MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
flexTT.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 
21 #include <NdbApi.hpp>
22 #include <NdbSchemaCon.hpp>
23 #include <NdbMain.h>
24 #include <md5_hash.hpp>
25 
26 #include <NdbThread.h>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 #include <NdbOut.hpp>
30 #include <NdbTimer.hpp>
31 
32 #include <NdbTest.hpp>
33 #include <NDBT_Error.hpp>
34 
// Number of partitions assumed by getKey(); used there as a bit mask
// (MAX_PARTS - 1), so it has to stay a power of two.
35 #define MAX_PARTS 4
// Key stride per transaction slot in "-local" mode, and the number of
// candidate keys getKey() probes for one slot.
36 #define MAX_SEEK 16
// Capacity (excluding the terminating NUL) of table and attribute names.
37 #define MAXSTRLEN 16
// NOTE(review): MAXATTR and MAXTABLES are not referenced in the visible code.
38 #define MAXATTR 64
39 #define MAXTABLES 64
// Upper bound for "-t"; also sizes the per-thread bookkeeping arrays.
40 #define NDB_MAXTHREADS 128
41 /*
42  NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
43  #define from <sys/thread.h> on AIX (IBM compiler). We explicitly
44  #undef it here lest someone use it by habit and get really funny
45  results. K&R says we may #undef non-existent symbols, so let's go.
46 */
47 #undef MAXTHREADS
// Upper bound for "-p" (parallel transactions per thread).
48 #define MAXPAR 1024
// NOTE(review): MAXATTRSIZE and PKSIZE are not referenced in the visible code.
49 #define MAXATTRSIZE 1000
50 #define PKSIZE 1
51 
52 
#ifdef NDB_WIN32
/* Stand-in for lrand48() on NDB_WIN32 builds: forwards to rand(). */
inline long lrand48(void)
{
  return rand();
}
#endif
56 
57 
/*
 * Command posted to a worker thread through ThreadStart[]; the same values
 * are also used as the operation type of a single transaction.
 */
enum StartType {
  stIdle   = 0,  /* nothing to do; workers sleep-poll until this changes  */
  stInsert = 1,  /* insert phase                                          */
  stRead   = 2,  /* read/update phase (see random_choice())               */
  stUpdate = 3,  /* single-operation type chosen by random_choice()       */
  stDelete = 4,  /* delete phase                                          */
  stStop   = 5   /* worker leaves its command loop and exits              */
};
66 
67 struct ThreadNdb
68 {
69  int threadNo;
70  Ndb* threadNdb;
71  Uint32 threadBase;
72  Uint32 threadLoopCounter;
73  Uint32 threadNextStart;
74  Uint32 threadStop;
75  Uint32 threadLoopStop;
76  Uint32 threadIncrement;
77  Uint32 threadNoCompleted;
78  bool threadCompleted;
79  StartType threadStartType;
80 };
81 
82 struct TransNdb
83 {
84  char transRecord[128];
85  Ndb* transNdb;
86  StartType transStartType;
87  Uint32 vpn_number;
88  Uint32 vpn_identity;
89  Uint32 transErrorCount;
90  NdbOperation* transOperation;
91  ThreadNdb* transThread;
92 };
93 
94 extern "C" { static void* threadLoop(void*); }
95 static void setAttrNames(void);
96 static void setTableNames(void);
97 static int readArguments(int argc, const char** argv);
98 static int createTables(Ndb*);
99 static bool defineOperation(NdbConnection* aTransObject, TransNdb*,
100  Uint32 vpn_nb, Uint32 vpn_id);
101 static bool executeTransaction(TransNdb* transNdbRef);
102 static StartType random_choice();
103 static void execute(StartType aType);
104 static bool executeThread(ThreadNdb*, TransNdb*);
105 static void executeCallback(int result, NdbConnection* NdbObject,
106  void* aObject);
107 static bool error_handler(const NdbError & err) ;
108 static Uint32 getKey(Uint32, Uint32) ;
109 static void input_error();
110 
// Error statistics object; allocated in main and used by executeCallback().
111 ErrorData * flexTTErrorData;
112
// Handles of the worker threads, joined and destroyed at the end of main.
113 static NdbThread* threadLife[NDB_MAXTHREADS];
// Node id of the main Ndb object; mixed into every thread's key base.
114 static int tNodeId;
// Handshake between main and the workers: main writes a command into
// ThreadStart[i] and waits until worker i sets ThreadReady[i] to 1.
// NOTE(review): these are plain ints/enums accessed from several threads
// without any visible synchronisation.
115 static int ThreadReady[NDB_MAXTHREADS];
116 static StartType ThreadStart[NDB_MAXTHREADS];
// Name buffers filled by setTableNames() / setAttrNames().
117 static char tableName[1][MAXSTRLEN+1];
118 static char attrName[5][MAXSTRLEN+1];
119
120 // Program Parameters
// Set by "-i" (and "-table_create"): run the insert phase.
121 static bool tInsert = false;
// Set by "-d": run the delete phase.
122 static bool tDelete = false;
// Run the read/update phase; main clears it when the insert phase runs.
123 static bool tReadUpdate = true;
// "-ufreq": percentage threshold used by random_choice() to pick updates.
124 static int tUpdateFreq = 20;
// "-local": restrict keys to the partition number tLocalPart (see getKey()).
125 static bool tLocal = false;
126 static int tLocalPart = 0;
// "-e": passed as the second argument of sendPollNdb().
127 static Uint32 tMinEvents = 0;
// 0 = adaptive, 1 = force send, 2 = non adaptive; third argument of sendPollNdb().
128 static int tSendForce = 0;
// "-l", "-t", "-p", "-o": loops, threads, parallel transactions per thread,
// transactions per loop.
129 static int tNoOfLoops = 1;
130 static Uint32 tNoOfThreads = 1;
131 static Uint32 tNoOfParallelTrans = 32;
132 static Uint32 tNoOfTransactions = 500;
// "-load_factor": passed to createTable() in createTables().
133 static Uint32 tLoadFactor = 80;
// "-temp": createTables() passes !tempTable as the last createTable() argument.
134 static bool tempTable = false;
// Cleared by "-no_hint": start transactions without a key hint.
135 static bool startTransGuess = true;
136
137 //Program Flags
// "-simple", "-dirty", "-write": select the NdbOperation variant in defineOperation().
138 static int theSimpleFlag = 0;
139 static int theDirtyFlag = 0;
140 static int theWriteFlag = 0;
// NOTE: inverted sense - createTables() creates the table only when this is 0,
// which is what "-table_create" sets it to.
141 static int theTableCreateFlag = 1;
142 
// START_REAL_TIME / STOP_REAL_TIME expand to nothing.
143 #define START_REAL_TIME
144 #define STOP_REAL_TIME
// START_TIMER opens a brace scope holding a local NdbTimer named `timer`;
// STOP_TIMER stops it and PRINT_TIMER prints the statistics and closes that
// scope again. The three macros therefore have to be used together, in this
// order, inside one block.
145 #define START_TIMER { NdbTimer timer; timer.doStart();
146 #define STOP_TIMER timer.doStop();
147 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
148 
149 static void
150 resetThreads(){
151 
152  for (int i = 0; i < (int)tNoOfThreads ; i++) {
153  ThreadReady[i] = 0;
154  ThreadStart[i] = stIdle;
155  }//for
156 }
157 
158 static void
159 waitForThreads(void)
160 {
161  int cont = 0;
162  do {
163  cont = 0;
164  NdbSleep_MilliSleep(20);
165  for (int i = 0; i < (int)tNoOfThreads ; i++) {
166  if (ThreadReady[i] == 0) {
167  cont = 1;
168  }//if
169  }//for
170  } while (cont == 1);
171 }
172 
173 static void
174 tellThreads(StartType what)
175 {
176  for (int i = 0; i < (int)tNoOfThreads ; i++)
177  ThreadStart[i] = what;
178 }
179 
// Cluster connection shared by main and all worker threads; set in main
// once the connection has been established.
180 static Ndb_cluster_connection *g_cluster_connection= 0;
181
// Program entry point: parses the command line, connects to the cluster,
// optionally creates the table, starts the worker threads and runs the
// selected phases (insert, read/update, delete), timing each of them.
// Returns through NDBT_ProgramExit() with NDBT_OK, NDBT_FAILED or
// NDBT_WRONGARGS.
182 NDB_COMMAND(flexTT, "flexTT", "flexTT", "flexTT", 65535)
183 {
184  ndb_init();
185  ThreadNdb* pThreadData;
186  int returnValue = NDBT_OK;
187  flexTTErrorData = new ErrorData;
188  flexTTErrorData->resetErrorCounters();
189
190  if (readArguments(argc, argv) != 0){
191  input_error();
192  return NDBT_ProgramExit(NDBT_WRONGARGS);
193  }
194
// Sized for the maximum thread count; only the first tNoOfThreads entries
// are used below.
195  pThreadData = new ThreadNdb[NDB_MAXTHREADS];
196
// Echo the effective settings.
197  ndbout << endl << "FLEXTT - Starting normal mode" << endl;
198  ndbout << "Perform TimesTen benchmark" << endl;
199  ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
200  ndbout << " " << tNoOfParallelTrans;
201  ndbout << " number of parallel transaction per thread " << endl;
202  ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
203  ndbout << " " << tNoOfLoops << " iterations " << endl;
204  ndbout << " " << "Update Frequency is " << tUpdateFreq << "%" << endl;
205  ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
206  if (tLocal == true) {
207  ndbout << " " << "We only use Local Part = ";
208  ndbout << tLocalPart << endl;
209  }//if
210  if (tempTable == true) {
211  ndbout << " Tables are without logging " << endl;
212  } else {
213  ndbout << " Tables are with logging " << endl;
214  }//if
215  if (startTransGuess == true) {
216  ndbout << " Transactions are executed with hint provided" << endl;
217  } else {
218  ndbout << " Transactions are executed with round robin scheme" << endl;
219  }//if
220  if (tSendForce == 0) {
221  ndbout << " No force send is used, adaptive algorithm used" << endl;
222  } else if (tSendForce == 1) {
223  ndbout << " Force send used" << endl;
224  } else {
225  ndbout << " No force send is used, adaptive algorithm disabled" << endl;
226  }//if
227
228  ndbout << endl;
229
230  /* print Setting */
231  flexTTErrorData->printSettings(ndbout);
232
233  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
234
235  setAttrNames();
236  setTableNames();
237
// NOTE(review): `con` is used below but its declaration is not visible here
// (listing line 238 is absent) - presumably a local Ndb_cluster_connection
// object; confirm against the original file.
// NOTE(review): the early return on a failed connect skips the deletes of
// pThreadData and flexTTErrorData.
239  if(con.connect(12, 5, 1) != 0)
240  {
241  return NDBT_ProgramExit(NDBT_FAILED);
242  }
243  g_cluster_connection= &con;
244
245  Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");
246  pNdb->init();
247  tNodeId = pNdb->getNodeId();
248
249  ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
250  ndbout << endl;
251
252  ndbout << "Waiting for ndb to become ready..." <<endl;
253  if (pNdb->waitUntilReady(2000) != 0){
254  ndbout << "NDB is not ready" << endl;
255  ndbout << "Benchmark failed!" << endl;
256  returnValue = NDBT_FAILED;
257  }
258
259  if(returnValue == NDBT_OK){
260  if (createTables(pNdb) != 0){
261  returnValue = NDBT_FAILED;
262  }
263  }
264
265  if(returnValue == NDBT_OK){
266  /****************************************************************
267  * Create NDB objects. *
268  ****************************************************************/
269  resetThreads();
// Each worker receives a pointer to its own ThreadNdb entry; threadLoop()
// casts the argument back to ThreadNdb*.
270  for (int i = 0; i < (int)tNoOfThreads ; i++) {
271  pThreadData[i].threadNo = i;
272  threadLife[i] = NdbThread_Create(threadLoop,
273  (void**)&pThreadData[i],
274  32768,
275  "flexAsynchThread",
276  NDB_THREAD_PRIO_LOW);
277  }//for
278  ndbout << endl << "All NDB objects and table created" << endl << endl;
// Total number of transactions reported to the timer statistics.
// NOTE(review): the product is stored in an int and is not range checked.
279  int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *
280  tNoOfThreads * tNoOfLoops;
281  /****************************************************************
282  * Execute program. *
283  ****************************************************************/
284  /****************************************************************
285  * Perform inserts. *
286  ****************************************************************/
287
// Running the insert phase disables the read/update phase below.
288  if (tInsert == true) {
289  tInsert = false;
290  tReadUpdate = false;
291  START_TIMER;
292  execute(stInsert);
293  STOP_TIMER;
294  PRINT_TIMER("insert", noOfTransacts, 1);
295  }//if
296  /****************************************************************
297  * Perform read + updates. *
298  ****************************************************************/
299
300  if (tReadUpdate == true) {
301  START_TIMER;
302  execute(stRead);
303  STOP_TIMER;
304  PRINT_TIMER("update + read", noOfTransacts, 1);
305  }//if
306  /****************************************************************
307  * Perform delete. *
308  ****************************************************************/
309
310  if (tDelete == true) {
311  tDelete = false;
312  START_TIMER;
313  execute(stDelete);
314  STOP_TIMER;
315  PRINT_TIMER("delete", noOfTransacts, 1);
316  }//if
317  ndbout << "--------------------------------------------------" << endl;
318
// Ask every worker to exit, then join and destroy the thread handles.
319  execute(stStop);
320  void * tmp;
321  for(int i = 0; i<(int)tNoOfThreads; i++){
322  NdbThread_WaitFor(threadLife[i], &tmp);
323  NdbThread_Destroy(&threadLife[i]);
324  }
325  }
326  delete [] pThreadData;
327  delete pNdb;
328
329  //printing errorCounters
330  flexTTErrorData->printErrorCounters(ndbout);
331
332  return NDBT_ProgramExit(returnValue);
333 }//main()
334 
335 
336 static void execute(StartType aType)
337 {
338  resetThreads();
339  tellThreads(aType);
340  waitForThreads();
341 }//execute()
342 
343 static void*
344 threadLoop(void* ThreadData)
345 {
346  Ndb* localNdb;
347  ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
348  int loc_threadNo = tabThread->threadNo;
349 
350  void * mem = malloc(sizeof(TransNdb)*tNoOfParallelTrans);
351  TransNdb* pTransData = (TransNdb*)mem;
352 
353  localNdb = new Ndb(g_cluster_connection, "TEST_DB");
354  localNdb->init(1024);
355  localNdb->waitUntilReady();
356 
357  if (tLocal == false) {
358  tabThread->threadIncrement = 1;
359  } else {
360  tabThread->threadIncrement = MAX_SEEK;
361  }//if
362  tabThread->threadBase = (loc_threadNo << 16) + tNodeId;
363  tabThread->threadNdb = localNdb;
364  tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;
365  tabThread->threadStop *= tabThread->threadIncrement;
366  tabThread->threadLoopStop = tNoOfLoops;
367  Uint32 i, j;
368  for (i = 0; i < tNoOfParallelTrans; i++) {
369  pTransData[i].transNdb = localNdb;
370  pTransData[i].transThread = tabThread;
371  pTransData[i].transOperation = NULL;
372  pTransData[i].transStartType = stIdle;
373  pTransData[i].vpn_number = tabThread->threadBase;
374  pTransData[i].vpn_identity = 0;
375  pTransData[i].transErrorCount = 0;
376  for (j = 0; j < 128; j++) {
377  pTransData[i].transRecord[j] = 0x30;
378  }//for
379  }//for
380 
381  for (;;){
382  while (ThreadStart[loc_threadNo] == stIdle) {
383  NdbSleep_MilliSleep(10);
384  }//while
385 
386  // Check if signal to exit is received
387  if (ThreadStart[loc_threadNo] == stStop) {
388  break;
389  }//if
390 
391  tabThread->threadStartType = ThreadStart[loc_threadNo];
392  tabThread->threadLoopCounter = 0;
393  tabThread->threadCompleted = false;
394  tabThread->threadNoCompleted = 0;
395  tabThread->threadNextStart = 0;
396 
397  ThreadStart[loc_threadNo] = stIdle;
398  if(!executeThread(tabThread, pTransData)){
399  break;
400  }
401  ThreadReady[loc_threadNo] = 1;
402  }//for
403 
404  free(mem);
405  delete localNdb;
406  ThreadReady[loc_threadNo] = 1;
407 
408  return NULL; // Thread exits
409 }//threadLoop()
410 
411 static
412 bool
413 executeThread(ThreadNdb* tabThread, TransNdb* atransDataArrayPtr) {
414  Uint32 i;
415  for (i = 0; i < tNoOfParallelTrans; i++) {
416  TransNdb* transNdbPtr = &atransDataArrayPtr[i];
417  transNdbPtr->vpn_identity = i * tabThread->threadIncrement;
418  transNdbPtr->transStartType = tabThread->threadStartType;
419  if (executeTransaction(transNdbPtr) == false) {
420  return false;
421  }//if
422  }//for
423  tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;
424  do {
425  tabThread->threadNdb->sendPollNdb(3000, tMinEvents, tSendForce);
426  } while (tabThread->threadCompleted == false);
427  return true;
428 }//executeThread()
429 
430 static
431 bool executeTransaction(TransNdb* transNdbRef)
432 {
433  NdbConnection* MyTrans;
434  ThreadNdb* tabThread = transNdbRef->transThread;
435  Ndb* aNdbObject = transNdbRef->transNdb;
436  Uint32 threadBase = tabThread->threadBase;
437  Uint32 startKey = transNdbRef->vpn_identity;
438  if (tLocal == true) {
439  startKey = getKey(startKey, threadBase);
440  }//if
441  if (startTransGuess == true) {
442  Uint32 tKey[2];
443  tKey[0] = startKey;
444  tKey[1] = threadBase;
445  MyTrans = aNdbObject->startTransaction((Uint32)0, //Priority
446  (const char*)&tKey[0], //Main PKey
447  (Uint32)8); //Key Length
448  } else {
449  MyTrans = aNdbObject->startTransaction();
450  }//if
451  if (MyTrans == NULL) {
452  error_handler(aNdbObject->getNdbError());
453  ndbout << endl << "Unable to recover! Quiting now" << endl ;
454  return false;
455  }//if
456  //-------------------------------------------------------
457  // Define the operation, but do not execute it yet.
458  //-------------------------------------------------------
459  if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))
460  return false;
461 
462  return true;
463 }//executeTransaction()
464 
465 
466 static
467 Uint32
468 getKey(Uint32 aBase, Uint32 aThreadBase) {
469  Uint32 Tfound = aBase;
470  Uint32 hash;
471  union {
472  Uint64 Tkey64;
473  Uint32 tKey32[2];
474  };
475  tKey32[0] = aThreadBase;
476  for (Uint32 i = aBase; i < (aBase + MAX_SEEK); i++) {
477  tKey32[1] = i;
478  hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
479  hash = (hash >> 6) & (MAX_PARTS - 1);
480  if (hash == (Uint32)tLocalPart) {
481  Tfound = i;
482  break;
483  }//if
484  }//for
485  return Tfound;
486 }//getKey()
487 
// Completion callback registered by defineOperation() for every
// asynchronous transaction.
//   result    -1 on failure (the only value tested here)
//   NdbObject the transaction that finished
//   aObject   the TransNdb slot passed to executeAsynchPrepare()
// On success the slot is given the next key and a new transaction is
// started. On failure the error class decides between retrying the same key
// and retiring the slot. A retired slot is counted in threadNoCompleted, and
// when all slots are retired threadCompleted is set, which ends the polling
// loop in executeThread().
488 static void
489 executeCallback(int result, NdbConnection* NdbObject, void* aObject)
490 {
491  TransNdb* transNdbRef = (TransNdb*)aObject;
492  ThreadNdb* tabThread = transNdbRef->transThread;
493  Ndb* tNdb = transNdbRef->transNdb;
494  Uint32 vpn_id = transNdbRef->vpn_identity;
495  Uint32 vpn_nb = tabThread->threadBase;
496
497  if (result == -1) {
498 // Add complete error handling here
499  int retCode = flexTTErrorData->handleErrorCommon(NdbObject->getNdbError());
500  if (retCode == 1) {
// Error codes 626 and 630 are not printed.
501  if (NdbObject->getNdbError().code != 626 &&
502  NdbObject->getNdbError().code != 630) {
503  ndbout_c("execute: %s", NdbObject->getNdbError().message);
504  ndbout_c("Error code = %d", NdbObject->getNdbError().code);
505  }
506  } else if (retCode == 2) {
507  ndbout << "4115 should not happen in flexTT" << endl;
508  } else if (retCode == 3) {
509  /* What can we do here? */
510  ndbout_c("execute: %s", NdbObject->getNdbError().message);
511  }//if(retCode == 3)
512  transNdbRef->transErrorCount++;
513  const NdbError & err = NdbObject->getNdbError();
// NOTE(review): listing lines 515-516, 521 and 523-525 are absent from this
// view - presumably the `case` labels of this switch. The error classes that
// select each branch cannot be read from here; confirm against the original.
514  switch (err.classification) {
// Branch 1: report the failing key and retire the slot.
517  ndbout << "Error with vpn_id = " << vpn_id << " and vpn_nb = ";
518  ndbout << vpn_nb << endl;
519  ndbout << err << endl;
520  goto checkCompleted;
// Branch 2: back off for 10 ms; leaving the switch leads to a retry below.
522  NdbSleep_MilliSleep(10);
526  break;
527  default:
528  goto checkCompleted;
529  }//if
// Give up on this slot after more than 10 errors, or once any slot of the
// thread has already been retired.
530  if ((transNdbRef->transErrorCount > 10) ||
531  (tabThread->threadNoCompleted > 0)) {
532  goto checkCompleted;
533  }//if
534  } else {
535  if (tabThread->threadNoCompleted == 0) {
// Success: take the next unused key for this slot.
536  transNdbRef->transErrorCount = 0;
537  transNdbRef->vpn_identity = tabThread->threadNextStart;
538  if (tabThread->threadNextStart == tabThread->threadStop) {
// End of one pass over the key range: restart at key 0, or finish when the
// requested number of loops has been run.
539  tabThread->threadLoopCounter++;
540  transNdbRef->vpn_identity = 0;
541  tabThread->threadNextStart = 0;
542  if (tabThread->threadLoopCounter == (Uint32)tNoOfLoops) {
543  goto checkCompleted;
544  }//if
545  }//if
546  tabThread->threadNextStart += tabThread->threadIncrement;
547  } else {
548  goto checkCompleted;
549  }//if
550  }//if
// Continue path: close the finished transaction and start the next one for
// this slot. NOTE(review): the result of executeTransaction() is ignored.
551  tNdb->closeTransaction(NdbObject);
552  executeTransaction(transNdbRef);
553  return;
554
// Retire path: close the transaction and count the slot as completed.
555 checkCompleted:
556  tNdb->closeTransaction(NdbObject);
557  tabThread->threadNoCompleted++;
558  if (tabThread->threadNoCompleted == tNoOfParallelTrans) {
559  tabThread->threadCompleted = true;
560  }//if
561  return;
562 }//executeCallback()
563 
564 static
565 StartType
566 random_choice()
567 {
568 //----------------------------------------------------
569 // Generate a random key between 0 and tNoOfRecords - 1
570 //----------------------------------------------------
571  UintR random_number = lrand48() % 100;
572  if ((int)random_number < tUpdateFreq)
573  return stUpdate;
574  else
575  return stRead;
576 }//random_choice()
577 
578 static bool
579 defineOperation(NdbConnection* localNdbConnection, TransNdb* transNdbRef,
580  unsigned int vpn_id, unsigned int vpn_nb)
581 {
582  NdbOperation* localNdbOperation;
583  StartType TType = transNdbRef->transStartType;
584 
585  //-------------------------------------------------------
586  // Set-up the attribute values for this operation.
587  //-------------------------------------------------------
588  localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
589  if (localNdbOperation == NULL) {
590  error_handler(localNdbConnection->getNdbError());
591  return false;
592  }//if
593  switch (TType) {
594  case stInsert: // Insert case
595  if (theWriteFlag == 1 && theDirtyFlag == 1) {
596  localNdbOperation->dirtyWrite();
597  } else if (theWriteFlag == 1) {
598  localNdbOperation->writeTuple();
599  } else {
600  localNdbOperation->insertTuple();
601  }//if
602  break;
603  case stRead: // Read Case
604  TType = random_choice();
605  if (TType == stRead) {
606  if (theSimpleFlag == 1) {
607  localNdbOperation->simpleRead();
608  } else if (theDirtyFlag == 1) {
609  localNdbOperation->dirtyRead();
610  } else {
611  localNdbOperation->readTuple();
612  }//if
613  } else {
614  if (theWriteFlag == 1 && theDirtyFlag == 1) {
615  localNdbOperation->dirtyWrite();
616  } else if (theWriteFlag == 1) {
617  localNdbOperation->writeTuple();
618  } else if (theDirtyFlag == 1) {
619  localNdbOperation->dirtyUpdate();
620  } else {
621  localNdbOperation->updateTuple();
622  }//if
623  }//if
624  break;
625  case stDelete: // Delete Case
626  localNdbOperation->deleteTuple();
627  break;
628  default:
629  error_handler(localNdbOperation->getNdbError());
630  }//switch
631  localNdbOperation->equal((Uint32)0,vpn_id);
632  localNdbOperation->equal((Uint32)1,vpn_nb);
633  char* attrValue = &transNdbRef->transRecord[0];
634  switch (TType) {
635  case stInsert: // Insert case
636  localNdbOperation->setValue((Uint32)2, attrValue);
637  localNdbOperation->setValue((Uint32)3, attrValue);
638  localNdbOperation->setValue((Uint32)4, attrValue);
639  break;
640  case stUpdate: // Update Case
641  localNdbOperation->setValue((Uint32)3, attrValue);
642  break;
643  case stRead: // Read Case
644  localNdbOperation->getValue((Uint32)2, attrValue);
645  localNdbOperation->getValue((Uint32)3, attrValue);
646  localNdbOperation->getValue((Uint32)4, attrValue);
647  break;
648  case stDelete: // Delete Case
649  break;
650  default:
651  error_handler(localNdbOperation->getNdbError());
652  }//switch
653  localNdbConnection->executeAsynchPrepare(Commit, &executeCallback,
654  (void*)transNdbRef);
655  return true;
656 }//defineOperation()
657 
658 
659 static void setAttrNames()
660 {
661  BaseString::snprintf(attrName[0], MAXSTRLEN, "VPN_ID");
662  BaseString::snprintf(attrName[1], MAXSTRLEN, "VPN_NB");
663  BaseString::snprintf(attrName[2], MAXSTRLEN, "DIRECTORY_NB");
664  BaseString::snprintf(attrName[3], MAXSTRLEN, "LAST_CALL_PARTY");
665  BaseString::snprintf(attrName[4], MAXSTRLEN, "DESCR");
666 }
667 
668 
669 static void setTableNames()
670 {
671  BaseString::snprintf(tableName[0], MAXSTRLEN, "VPN_USERS");
672 }
673 
674 static
675 int
676 createTables(Ndb* pMyNdb){
677 
678  NdbSchemaCon *MySchemaTransaction;
679  NdbSchemaOp *MySchemaOp;
680  int check;
681 
682  if (theTableCreateFlag == 0) {
683  ndbout << "Creating Table: vpn_users " << "..." << endl;
684  MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
685 
686  if(MySchemaTransaction == NULL &&
687  (!error_handler(MySchemaTransaction->getNdbError())))
688  return -1;
689 
690  MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
691  if(MySchemaOp == NULL &&
692  (!error_handler(MySchemaTransaction->getNdbError())))
693  return -1;
694 
695  check = MySchemaOp->createTable( tableName[0]
696  ,8 // Table Size
697  ,TupleKey // Key Type
698  ,40 // Nr of Pages
699  ,All
700  ,6
701  ,(tLoadFactor - 5)
702  ,tLoadFactor
703  ,1
704  ,!tempTable
705  );
706 
707  if (check == -1 &&
708  (!error_handler(MySchemaTransaction->getNdbError())))
709  return -1;
710 
711  check = MySchemaOp->createAttribute( (char*)attrName[0],
712  TupleKey,
713  32,
714  1,
715  UnSigned,
716  MMBased,
717  NotNullAttribute );
718 
719  if (check == -1 &&
720  (!error_handler(MySchemaTransaction->getNdbError())))
721  return -1;
722  check = MySchemaOp->createAttribute( (char*)attrName[1],
723  TupleKey,
724  32,
725  1,
726  UnSigned,
727  MMBased,
728  NotNullAttribute );
729 
730  if (check == -1 &&
731  (!error_handler(MySchemaTransaction->getNdbError())))
732  return -1;
733  check = MySchemaOp->createAttribute( (char*)attrName[2],
734  NoKey,
735  8,
736  10,
737  UnSigned,
738  MMBased,
739  NotNullAttribute );
740  if (check == -1 &&
741  (!error_handler(MySchemaTransaction->getNdbError())))
742  return -1;
743 
744  check = MySchemaOp->createAttribute( (char*)attrName[3],
745  NoKey,
746  8,
747  10,
748  UnSigned,
749  MMBased,
750  NotNullAttribute );
751  if (check == -1 &&
752  (!error_handler(MySchemaTransaction->getNdbError())))
753  return -1;
754 
755  check = MySchemaOp->createAttribute( (char*)attrName[4],
756  NoKey,
757  8,
758  100,
759  UnSigned,
760  MMBased,
761  NotNullAttribute );
762  if (check == -1 &&
763  (!error_handler(MySchemaTransaction->getNdbError())))
764  return -1;
765 
766  if (MySchemaTransaction->execute() == -1 &&
767  (!error_handler(MySchemaTransaction->getNdbError())))
768  return -1;
769 
770  NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
771  }//if
772 
773  return 0;
774 }
775 
// Print the error and tell the caller whether to carry on.
// Returns true when the error classification is one of those treated as
// recoverable, false for every other classification.
776 bool error_handler(const NdbError& err){
777  ndbout << err << endl ;
778  switch(err.classification){
// NOTE(review): listing lines 779-781 are absent from this view - presumably
// the `case` labels naming the recoverable error classes; confirm against
// the original file.
782  ndbout << endl << "Attempting to recover and continue now..." << endl ;
783  return true ; // return true to retry
784  default:
785  break;
786  }
787  return false;
788 }
// Disabled legacy variant that decided on numeric error codes; it is
// excluded from compilation by the #if 0.
789 #if 0
790 bool error_handler(const char* error_string, int error_int) {
791  ndbout << error_string << endl ;
792  if ((4008 == error_int) ||
793  (677 == error_int) ||
794  (891 == error_int) ||
795  (1221 == error_int) ||
796  (721 == error_int) ||
797  (266 == error_int)) {
798  ndbout << endl << "Attempting to recover and continue now..." << endl ;
799  return true ; // return true to retry
800  }
801  return false ; // return false to abort
802 }
803 #endif
804 
805 static
806 int
807 readArguments(int argc, const char** argv){
808 
809  int i = 1;
810  while (argc > 1){
811  if (strcmp(argv[i], "-t") == 0){
812  tNoOfThreads = atoi(argv[i+1]);
813  if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
814  ndbout_c("Invalid no of threads");
815  return -1;
816  }
817  } else if (strcmp(argv[i], "-p") == 0){
818  tNoOfParallelTrans = atoi(argv[i+1]);
819  if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
820  ndbout_c("Invalid no of parallell transactions");
821  return -1;
822  }
823  } else if (strcmp(argv[i], "-o") == 0) {
824  tNoOfTransactions = atoi(argv[i+1]);
825  if (tNoOfTransactions < 1){
826  ndbout_c("Invalid no of transactions");
827  return -1;
828  }
829  } else if (strcmp(argv[i], "-l") == 0){
830  tNoOfLoops = atoi(argv[i+1]);
831  if (tNoOfLoops < 1) {
832  ndbout_c("Invalid no of loops");
833  return -1;
834  }
835  } else if (strcmp(argv[i], "-e") == 0){
836  tMinEvents = atoi(argv[i+1]);
837  if ((tMinEvents < 1) || (tMinEvents > tNoOfParallelTrans)) {
838  ndbout_c("Invalid no of loops");
839  return -1;
840  }
841  } else if (strcmp(argv[i], "-local") == 0){
842  tLocalPart = atoi(argv[i+1]);
843  tLocal = true;
844  startTransGuess = true;
845  if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
846  ndbout_c("Invalid local part");
847  return -1;
848  }
849  } else if (strcmp(argv[i], "-ufreq") == 0){
850  tUpdateFreq = atoi(argv[i+1]);
851  if ((tUpdateFreq < 0) || (tUpdateFreq > 100)){
852  ndbout_c("Invalid Update Frequency");
853  return -1;
854  }
855  } else if (strcmp(argv[i], "-load_factor") == 0){
856  tLoadFactor = atoi(argv[i+1]);
857  if ((tLoadFactor < 40) || (tLoadFactor >= 100)){
858  ndbout_c("Invalid LoadFactor");
859  return -1;
860  }
861  } else if (strcmp(argv[i], "-d") == 0){
862  tDelete = true;
863  argc++;
864  i--;
865  } else if (strcmp(argv[i], "-i") == 0){
866  tInsert = true;
867  argc++;
868  i--;
869  } else if (strcmp(argv[i], "-simple") == 0){
870  theSimpleFlag = 1;
871  argc++;
872  i--;
873  } else if (strcmp(argv[i], "-adaptive") == 0){
874  tSendForce = 0;
875  argc++;
876  i--;
877  } else if (strcmp(argv[i], "-force") == 0){
878  tSendForce = 1;
879  argc++;
880  i--;
881  } else if (strcmp(argv[i], "-non_adaptive") == 0){
882  tSendForce = 2;
883  argc++;
884  i--;
885  } else if (strcmp(argv[i], "-write") == 0){
886  theWriteFlag = 1;
887  argc++;
888  i--;
889  } else if (strcmp(argv[i], "-dirty") == 0){
890  theDirtyFlag = 1;
891  argc++;
892  i--;
893  } else if (strcmp(argv[i], "-table_create") == 0){
894  theTableCreateFlag = 0;
895  tInsert = true;
896  argc++;
897  i--;
898  } else if (strcmp(argv[i], "-temp") == 0){
899  tempTable = true;
900  argc++;
901  i--;
902  } else if (strcmp(argv[i], "-no_hint") == 0){
903  startTransGuess = false;
904  argc++;
905  i--;
906  } else {
907  return -1;
908  }
909 
910  argc -= 2;
911  i = i + 2;
912  }//while
913  if (tLocal == true) {
914  if (startTransGuess == false) {
915  ndbout_c("Not valid to use no_hint with local");
916  }//if
917  }//if
918  return 0;
919 }
920 
921 static
922 void
923 input_error(){
924 
925  ndbout_c("FLEXTT");
926  ndbout_c(" Perform benchmark of insert, update and delete transactions");
927  ndbout_c(" ");
928  ndbout_c("Arguments:");
929  ndbout_c(" -t Number of threads to start, default 1");
930  ndbout_c(" -p Number of parallel transactions per thread, default 32");
931  ndbout_c(" -o Number of transactions per loop, default 500");
932  ndbout_c(" -ufreq Number Update Frequency in percent (0 -> 100), rest is read");
933  ndbout_c(" -load_factor Number Fill level in index in percent (40 -> 99)");
934  ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
935  ndbout_c(" -i Start by inserting all records");
936  ndbout_c(" -d End by deleting all records (only one loop)");
937  ndbout_c(" -simple Use simple read to read from database");
938  ndbout_c(" -dirty Use dirty read to read from database");
939  ndbout_c(" -write Use writeTuple in insert and update");
940  ndbout_c(" -n Use standard table names");
941  ndbout_c(" -table_create Create tables in db");
942  ndbout_c(" -temp Create table(s) without logging");
943  ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
944  ndbout_c(" -adaptive Use adaptive send algorithm (default)");
945  ndbout_c(" -force Force send when communicating");
946  ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
947  ndbout_c(" -local Number of part, only use keys in one part out of 16");
948 }