MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
flexAsynch.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 
20 #include <ndb_global.h>
21 #include "NdbApi.hpp"
22 #include <NdbSchemaCon.hpp>
23 #include <NdbMain.h>
24 #include <md5_hash.hpp>
25 
26 #include <NdbThread.h>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 #include <NdbOut.hpp>
30 #include <NdbTimer.hpp>
31 #include <NDBT_Error.hpp>
32 
33 #include <NdbTest.hpp>
34 #include <NDBT_Stats.hpp>
35 
36 #define MAX_PARTS 4
37 #define MAX_SEEK 16
38 #define MAXSTRLEN 16
39 #define MAXATTR 64
40 #define MAXTABLES 64
41 #define NDB_MAXTHREADS 128
42 /*
43  NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
44  #define from <sys/thread.h> on AIX (IBM compiler). We explicitly
45  #undef it here lest someone use it by habit and get really funny
46  results. K&R says we may #undef non-existent symbols, so let's go.
47 */
48 #undef MAXTHREADS
49 #define MAXPAR 1024
50 #define MAXATTRSIZE 1000
51 #define PKSIZE 2
52 
/*
 * Command posted by the main thread into ThreadStart[] and consumed by each
 * worker in threadLoop().
 */
enum StartType {
  stIdle   = 0,  /* nothing pending; the worker keeps polling */
  stInsert = 1,  /* run the insert phase */
  stRead   = 2,  /* run the read phase */
  stUpdate = 3,  /* run the update phase */
  stDelete = 4,  /* run the delete phase */
  stStop   = 5   /* leave the worker loop and terminate the thread */
};
61 
/* Per-worker context; a pointer to one of these is passed to threadLoop(). */
struct ThreadNdb
{
  int   NoOfOps;   /* operation counter slot */
  int   ThreadNo;  /* worker index, used to address ThreadReady[]/ThreadStart[] */
  char *record;    /* row buffer used by the NdbRecord code path */
};
68 
69 extern "C" { static void* threadLoop(void*); }
70 static void setAttrNames(void);
71 static void setTableNames(void);
72 static int readArguments(int argc, const char** argv);
73 static int createTables(Ndb*);
74 static void defineOperation(NdbConnection* aTransObject, StartType aType,
75  Uint32 base, Uint32 aIndex);
76 static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType,
77  Uint32 base, Uint32 aIndex);
78 static void execute(StartType aType);
79 static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
80 static void executeCallback(int result, NdbConnection* NdbObject,
81  void* aObject);
82 static bool error_handler(const NdbError & err);
83 static Uint32 getKey(Uint32, Uint32) ;
84 static void input_error();
85 
86 
87 static int retry_opt = 3 ;
88 static int failed = 0 ;
89 
90 ErrorData * flexAsynchErrorData;
91 
92 static NdbThread* threadLife[NDB_MAXTHREADS];
93 static int tNodeId;
94 static int ThreadReady[NDB_MAXTHREADS];
95 static StartType ThreadStart[NDB_MAXTHREADS];
96 static char tableName[MAXTABLES][MAXSTRLEN+1];
97 static char attrName[MAXATTR][MAXSTRLEN+1];
98 
99 // Program Parameters
100 static NdbRecord * g_record[MAXTABLES];
101 static bool tNdbRecord = false;
102 
103 static bool tLocal = false;
104 static int tLocalPart = 0;
105 static int tSendForce = 0;
106 static int tNoOfLoops = 1;
107 static int tAttributeSize = 1;
108 static unsigned int tNoOfThreads = 1;
109 static unsigned int tNoOfParallelTrans = 32;
110 static unsigned int tNoOfAttributes = 25;
111 static unsigned int tNoOfTransactions = 500;
112 static unsigned int tNoOfOpsPerTrans = 1;
113 static unsigned int tLoadFactor = 80;
114 static bool tempTable = false;
115 static bool startTransGuess = true;
116 static int tExtraReadLoop = 0;
117 
118 //Program Flags
119 static int theTestFlag = 0;
120 static int theSimpleFlag = 0;
121 static int theDirtyFlag = 0;
122 static int theWriteFlag = 0;
123 static int theStdTableNameFlag = 0;
124 static int theTableCreateFlag = 0;
125 static int tConnections = 1;
126 
127 #define START_REAL_TIME
128 #define STOP_REAL_TIME
129 #define START_TIMER { NdbTimer timer; timer.doStart();
130 #define STOP_TIMER timer.doStop();
131 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
132 
133 NDBT_Stats a_i, a_u, a_d, a_r;
134 
135 static
136 void
137 print(const char * name, NDBT_Stats& s)
138 {
139  printf("%s average: %u/s min: %u/s max: %u/s stddev: %u%%\n",
140  name,
141  (unsigned)s.getMean(),
142  (unsigned)s.getMin(),
143  (unsigned)s.getMax(),
144  (unsigned)(100*s.getStddev() / s.getMean()));
145 }
146 
147 static void
148 resetThreads(){
149 
150  for (unsigned i = 0; i < tNoOfThreads ; i++) {
151  ThreadReady[i] = 0;
152  ThreadStart[i] = stIdle;
153  }//for
154 }
155 
156 static void
157 waitForThreads(void)
158 {
159  int cont = 0;
160  do {
161  cont = 0;
162  NdbSleep_MilliSleep(20);
163  for (unsigned i = 0; i < tNoOfThreads ; i++) {
164  if (ThreadReady[i] == 0) {
165  cont = 1;
166  }//if
167  }//for
168  } while (cont == 1);
169 }
170 
171 static void
172 tellThreads(StartType what)
173 {
174  for (unsigned i = 0; i < tNoOfThreads ; i++)
175  ThreadStart[i] = what;
176 }
177 
178 static Ndb_cluster_connection * g_cluster_connection = 0;
179 
180 NDB_COMMAND(flexAsynch, "flexAsynch", "flexAsynch", "flexAsynch", 65535)
181 {
182  ndb_init();
183  ThreadNdb* pThreadData;
184  int tLoops=0;
185  int returnValue = NDBT_OK;
186 
187  flexAsynchErrorData = new ErrorData;
188  flexAsynchErrorData->resetErrorCounters();
189 
190  if (readArguments(argc, argv) != 0){
191  input_error();
192  return NDBT_ProgramExit(NDBT_WRONGARGS);
193  }
194 
195  pThreadData = new ThreadNdb[NDB_MAXTHREADS];
196 
197  ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl;
198  ndbout << "Perform benchmark of insert, update and delete transactions";
199  ndbout << endl;
200  ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
201  ndbout << " " << tNoOfParallelTrans;
202  ndbout << " number of parallel operation per thread " << endl;
203  ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
204  ndbout << " " << tNoOfLoops << " iterations " << endl;
205  ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
206  ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
207  ndbout << " " << tAttributeSize;
208  ndbout << " is the number of 32 bit words per attribute " << endl;
209  if (tempTable == true) {
210  ndbout << " Tables are without logging " << endl;
211  } else {
212  ndbout << " Tables are with logging " << endl;
213  }//if
214  if (startTransGuess == true) {
215  ndbout << " Transactions are executed with hint provided" << endl;
216  } else {
217  ndbout << " Transactions are executed with round robin scheme" << endl;
218  }//if
219  if (tSendForce == 0) {
220  ndbout << " No force send is used, adaptive algorithm used" << endl;
221  } else if (tSendForce == 1) {
222  ndbout << " Force send used" << endl;
223  } else {
224  ndbout << " No force send is used, adaptive algorithm disabled" << endl;
225  }//if
226 
227  ndbout << endl;
228 
229  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
230 
231  /* print Setting */
232  flexAsynchErrorData->printSettings(ndbout);
233 
234  setAttrNames();
235  setTableNames();
236 
237  g_cluster_connection = new Ndb_cluster_connection [tConnections];
238  if (tConnections > 1)
239  {
240  printf("Creating %u connections...", tConnections);
241  fflush(stdout);
242  }
243  for (int i = 0; i < tConnections; i++)
244  {
245  if(g_cluster_connection[i].connect(12, 5, 1) != 0)
246  return NDBT_ProgramExit(NDBT_FAILED);
247  }
248  if (tConnections > 1)
249  {
250  printf("\n");
251  fflush(stdout);
252  }
253 
254  Ndb * pNdb = new Ndb(g_cluster_connection+0, "TEST_DB");
255  pNdb->init();
256  tNodeId = pNdb->getNodeId();
257 
258  ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
259  ndbout << endl;
260 
261  ndbout << "Waiting for ndb to become ready..." <<endl;
262  if (pNdb->waitUntilReady(10000) != 0){
263  ndbout << "NDB is not ready" << endl;
264  ndbout << "Benchmark failed!" << endl;
265  returnValue = NDBT_FAILED;
266  }
267 
268  if(returnValue == NDBT_OK){
269  if (createTables(pNdb) != 0){
270  returnValue = NDBT_FAILED;
271  }
272  }
273 
274  if (tNdbRecord)
275  {
276  Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
277  sz += 3;
278  for (Uint32 i = 0; i<tNoOfThreads; i++)
279  {
280  pThreadData[i].record = (char*)malloc(sz);
281  bzero(pThreadData[i].record, sz);
282  }
283  }
284 
285  if(returnValue == NDBT_OK){
286  /****************************************************************
287  * Create NDB objects. *
288  ****************************************************************/
289  resetThreads();
290  for (Uint32 i = 0; i < tNoOfThreads ; i++) {
291  pThreadData[i].ThreadNo = i
292 ;
293  threadLife[i] = NdbThread_Create(threadLoop,
294  (void**)&pThreadData[i],
295  32768,
296  "flexAsynchThread",
297  NDB_THREAD_PRIO_LOW);
298  }//for
299  ndbout << endl << "All NDB objects and table created" << endl << endl;
300  int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
301  /****************************************************************
302  * Execute program. *
303  ****************************************************************/
304 
305  for(;;) {
306 
307  int loopCount = tLoops + 1 ;
308  ndbout << endl << "Loop # " << loopCount << endl << endl ;
309 
310  /****************************************************************
311  * Perform inserts. *
312  ****************************************************************/
313 
314  failed = 0 ;
315 
316  START_TIMER;
317  execute(stInsert);
318  STOP_TIMER;
319  a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
320  PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
321 
322  if (0 < failed) {
323  int i = retry_opt ;
324  int ci = 1 ;
325  while (0 < failed && 0 < i){
326  ndbout << failed << " of the transactions returned errors!"
327  << endl << endl;
328  ndbout << "Attempting to redo the failed transactions now..."
329  << endl ;
330  ndbout << "Redo attempt " << ci <<" out of " << retry_opt
331  << endl << endl;
332  failed = 0 ;
333  START_TIMER;
334  execute(stInsert);
335  STOP_TIMER;
336  PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
337  i-- ;
338  ci++;
339  }
340  if(0 == failed ){
341  ndbout << endl <<"Redo attempt succeeded" << endl << endl;
342  }else{
343  ndbout << endl <<"Redo attempt failed, moving on now..." << endl
344  << endl;
345  }//if
346  }//if
347 
348  /****************************************************************
349  * Perform read. *
350  ****************************************************************/
351 
352  failed = 0 ;
353 
354  for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
355  {
356  START_TIMER;
357  execute(stRead);
358  STOP_TIMER;
359  a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
360  PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
361  }
362 
363  if (0 < failed) {
364  int i = retry_opt ;
365  int cr = 1;
366  while (0 < failed && 0 < i){
367  ndbout << failed << " of the transactions returned errors!"<<endl ;
368  ndbout << endl;
369  ndbout <<"Attempting to redo the failed transactions now..." << endl;
370  ndbout << endl;
371  ndbout <<"Redo attempt " << cr <<" out of ";
372  ndbout << retry_opt << endl << endl;
373  failed = 0 ;
374  START_TIMER;
375  execute(stRead);
376  STOP_TIMER;
377  PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
378  i-- ;
379  cr++ ;
380  }//while
381  if(0 == failed ) {
382  ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
383  }else{
384  ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
385  }//if
386  }//if
387 
388 
389  /****************************************************************
390  * Perform update. *
391  ****************************************************************/
392 
393  failed = 0 ;
394 
395  START_TIMER;
396  execute(stUpdate);
397  STOP_TIMER;
398  a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
399  PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
400 
401  if (0 < failed) {
402  int i = retry_opt ;
403  int cu = 1 ;
404  while (0 < failed && 0 < i){
405  ndbout << failed << " of the transactions returned errors!"<<endl ;
406  ndbout << endl;
407  ndbout <<"Attempting to redo the failed transactions now..." << endl;
408  ndbout << endl <<"Redo attempt " << cu <<" out of ";
409  ndbout << retry_opt << endl << endl;
410  failed = 0 ;
411  START_TIMER;
412  execute(stUpdate);
413  STOP_TIMER;
414  PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
415  i-- ;
416  cu++ ;
417  }//while
418  if(0 == failed ){
419  ndbout << endl <<"Redo attempt succeeded" << endl << endl;
420  } else {
421  ndbout << endl;
422  ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
423  }//if
424  }//if
425 
426  /****************************************************************
427  * Perform read. *
428  ****************************************************************/
429 
430  failed = 0 ;
431 
432  for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
433  {
434  START_TIMER;
435  execute(stRead);
436  STOP_TIMER;
437  a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
438  PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
439  }
440 
441  if (0 < failed) {
442  int i = retry_opt ;
443  int cr2 = 1 ;
444  while (0 < failed && 0 < i){
445  ndbout << failed << " of the transactions returned errors!"<<endl ;
446  ndbout << endl;
447  ndbout <<"Attempting to redo the failed transactions now..." << endl;
448  ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
449  ndbout << retry_opt << endl << endl;
450  failed = 0 ;
451  START_TIMER;
452  execute(stRead);
453  STOP_TIMER;
454  PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
455  i-- ;
456  cr2++ ;
457  }//while
458  if(0 == failed ){
459  ndbout << endl <<"Redo attempt succeeded" << endl << endl;
460  }else{
461  ndbout << endl;
462  ndbout << "Redo attempt failed, moving on now..." << endl << endl;
463  }//if
464  }//if
465 
466 
467  /****************************************************************
468  * Perform delete. *
469  ****************************************************************/
470 
471  failed = 0 ;
472 
473  START_TIMER;
474  execute(stDelete);
475  STOP_TIMER;
476  a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
477  PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
478 
479  if (0 < failed) {
480  int i = retry_opt ;
481  int cd = 1 ;
482  while (0 < failed && 0 < i){
483  ndbout << failed << " of the transactions returned errors!"<< endl ;
484  ndbout << endl;
485  ndbout <<"Attempting to redo the failed transactions now:" << endl ;
486  ndbout << endl <<"Redo attempt " << cd <<" out of ";
487  ndbout << retry_opt << endl << endl;
488  failed = 0 ;
489  START_TIMER;
490  execute(stDelete);
491  STOP_TIMER;
492  PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
493  i-- ;
494  cd++ ;
495  }//while
496  if(0 == failed ){
497  ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
498  }else{
499  ndbout << endl;
500  ndbout << "Redo attempt failed, moving on now..." << endl << endl;
501  }//if
502  }//if
503 
504  tLoops++;
505  ndbout << "--------------------------------------------------" << endl;
506 
507  if(tNoOfLoops != 0){
508  if(tNoOfLoops <= tLoops)
509  break ;
510  }
511  }//for
512 
513  execute(stStop);
514  void * tmp;
515  for(Uint32 i = 0; i<tNoOfThreads; i++){
516  NdbThread_WaitFor(threadLife[i], &tmp);
517  NdbThread_Destroy(&threadLife[i]);
518  }
519  }
520  delete [] pThreadData;
521  delete pNdb;
522 
523  //printing errorCounters
524  flexAsynchErrorData->printErrorCounters(ndbout);
525 
526  print("insert", a_i);
527  print("update", a_u);
528  print("delete", a_d);
529  print("read ", a_r);
530 
531  delete [] g_cluster_connection;
532 
533  return NDBT_ProgramExit(returnValue);
534 }//main()
535 
536 
537 static void execute(StartType aType)
538 {
539  resetThreads();
540  tellThreads(aType);
541  waitForThreads();
542 }//execute()
543 
544 static void*
545 threadLoop(void* ThreadData)
546 {
547  Ndb* localNdb;
548  StartType tType;
549  ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
550  int threadNo = tabThread->ThreadNo;
551  localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
552  localNdb->init(1024);
553  localNdb->waitUntilReady(10000);
554  unsigned int threadBase = (threadNo << 16) + tNodeId ;
555 
556  for (;;){
557  while (ThreadStart[threadNo] == stIdle) {
558  NdbSleep_MilliSleep(10);
559  }//while
560 
561  // Check if signal to exit is received
562  if (ThreadStart[threadNo] == stStop) {
563  break;
564  }//if
565 
566  tType = ThreadStart[threadNo];
567  ThreadStart[threadNo] = stIdle;
568  if(!executeThread(tabThread, tType, localNdb, threadBase)){
569  break;
570  }
571  ThreadReady[threadNo] = 1;
572  }//for
573 
574  delete localNdb;
575  ThreadReady[threadNo] = 1;
576 
577  return NULL;
578 }//threadLoop()
579 
580 static
581 bool
582 executeThread(ThreadNdb* pThread,
583  StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
584 
585  NdbConnection* tConArray[1024];
586  unsigned int tBase;
587  unsigned int tBase2;
588 
589  unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
590 
591  for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
592  {
593  for (unsigned int i = 0; i < tNoOfTransactions; i++) {
594  if (tLocal == false) {
595  tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
596  } else {
597  tBase = i * tNoOfParallelTrans * MAX_SEEK;
598  }//if
599  START_REAL_TIME;
600  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
601  if (tLocal == false) {
602  tBase2 = tBase + (j * tNoOfOpsPerTrans);
603  } else {
604  tBase2 = tBase + (j * MAX_SEEK);
605  tBase2 = getKey(threadBase, tBase2);
606  }//if
607  if (startTransGuess == true) {
608  union {
609  Uint64 Tkey64;
610  Uint32 Tkey32[2];
611  };
612  Tkey32[0] = threadBase;
613  Tkey32[1] = tBase2;
614  tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
615  (const char*)&Tkey64, //Main PKey
616  (Uint32)4); //Key Length
617  } else {
618  tConArray[j] = aNdbObject->startTransaction();
619  }//if
620  if (tConArray[j] == NULL &&
621  !error_handler(aNdbObject->getNdbError()) ){
622  ndbout << endl << "Unable to recover! Quiting now" << endl ;
623  return false;
624  }//if
625 
626  for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
627  //-------------------------------------------------------
628  // Define the operation, but do not execute it yet.
629  //-------------------------------------------------------
630  if (tNdbRecord)
631  defineNdbRecordOperation(pThread,
632  tConArray[j], aType, threadBase,(tBase2+k));
633  else
634  defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
635  }//for
636 
637  tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
638  }//for
639  STOP_REAL_TIME;
640  //-------------------------------------------------------
641  // Now we have defined a set of operations, it is now time
642  // to execute all of them.
643  //-------------------------------------------------------
644  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
645  while (unsigned(Tcomp) < tNoOfParallelTrans) {
646  int TlocalComp = aNdbObject->pollNdb(3000, 0);
647  Tcomp += TlocalComp;
648  }//while
649  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
650  aNdbObject->closeTransaction(tConArray[j]);
651  }//for
652  }//for
653  } // for
654  return true;
655 }//executeThread()
656 
657 static
658 Uint32
659 getKey(Uint32 aBase, Uint32 anIndex) {
660  Uint32 Tfound = anIndex;
661  union {
662  Uint64 Tkey64;
663  Uint32 Tkey32[2];
664  };
665  Tkey32[0] = aBase;
666  Uint32 hash;
667  for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
668  Tkey32[1] = (Uint32)i;
669  hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
670  hash = (hash >> 6) & (MAX_PARTS - 1);
671  if (hash == unsigned(tLocalPart)) {
672  Tfound = i;
673  break;
674  }//if
675  }//for
676  return Tfound;
677 }//getKey()
678 
679 static void
680 executeCallback(int result, NdbConnection* NdbObject, void* aObject)
681 {
682  if (result == -1) {
683 
684  // Add complete error handling here
685 
686  int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
687  if (retCode == 1) {
688  if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
689  ndbout_c("execute: %s", NdbObject->getNdbError().message);
690  ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
691  } else if (retCode == 2) {
692  ndbout << "4115 should not happen in flexAsynch" << endl;
693  } else if (retCode == 3) {
694  /* What can we do here? */
695  ndbout_c("execute: %s", NdbObject->getNdbError().message);
696  }//if(retCode == 3)
697 
698  // ndbout << "Error occured in poll:" << endl;
699  // ndbout << NdbObject->getNdbError() << endl;
700  failed++ ;
701  return;
702  }//if
703  return;
704 }//executeCallback()
705 
706 
707 
708 static void
709 defineOperation(NdbConnection* localNdbConnection, StartType aType,
710  Uint32 threadBase, Uint32 aIndex)
711 {
712  NdbOperation* localNdbOperation;
713  unsigned int loopCountAttributes = tNoOfAttributes;
714  unsigned int countAttributes;
715  Uint32 attrValue[MAXATTRSIZE];
716 
717  //-------------------------------------------------------
718  // Set-up the attribute values for this operation.
719  //-------------------------------------------------------
720  attrValue[0] = threadBase;
721  attrValue[1] = aIndex;
722  for (unsigned k = 2; k < loopCountAttributes; k++) {
723  attrValue[k] = aIndex;
724  }//for
725  localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
726  if (localNdbOperation == NULL) {
727  error_handler(localNdbConnection->getNdbError());
728  }//if
729  switch (aType) {
730  case stInsert: { // Insert case
731  if (theWriteFlag == 1 && theDirtyFlag == 1) {
732  localNdbOperation->dirtyWrite();
733  } else if (theWriteFlag == 1) {
734  localNdbOperation->writeTuple();
735  } else {
736  localNdbOperation->insertTuple();
737  }//if
738  break;
739  }//case
740  case stRead: { // Read Case
741  if (theSimpleFlag == 1) {
742  localNdbOperation->simpleRead();
743  } else if (theDirtyFlag == 1) {
744  localNdbOperation->dirtyRead();
745  } else {
746  localNdbOperation->readTuple();
747  }//if
748  break;
749  }//case
750  case stUpdate: { // Update Case
751  if (theWriteFlag == 1 && theDirtyFlag == 1) {
752  localNdbOperation->dirtyWrite();
753  } else if (theWriteFlag == 1) {
754  localNdbOperation->writeTuple();
755  } else if (theDirtyFlag == 1) {
756  localNdbOperation->dirtyUpdate();
757  } else {
758  localNdbOperation->updateTuple();
759  }//if
760  break;
761  }//case
762  case stDelete: { // Delete Case
763  localNdbOperation->deleteTuple();
764  break;
765  }//case
766  default: {
767  error_handler(localNdbOperation->getNdbError());
768  }//default
769  }//switch
770  localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
771  switch (aType) {
772  case stInsert: // Insert case
773  case stUpdate: // Update Case
774  {
775  for (countAttributes = 1;
776  countAttributes < loopCountAttributes; countAttributes++) {
777  localNdbOperation->setValue(countAttributes,
778  (char*)&attrValue[0]);
779  }//for
780  break;
781  }//case
782  case stRead: { // Read Case
783  for (countAttributes = 1;
784  countAttributes < loopCountAttributes; countAttributes++) {
785  localNdbOperation->getValue(countAttributes,
786  (char*)&attrValue[0]);
787  }//for
788  break;
789  }//case
790  case stDelete: { // Delete Case
791  break;
792  }//case
793  default: {
794  //goto error_handler; < epaulsa
795  error_handler(localNdbOperation->getNdbError());
796  }//default
797  }//switch
798  return;
799 }//defineOperation()
800 
801 
802 static void
803 defineNdbRecordOperation(ThreadNdb* pThread,
804  NdbConnection* pTrans, StartType aType,
805  Uint32 threadBase, Uint32 aIndex)
806 {
807  char * record = pThread->record;
808  Uint32 offset;
809  NdbDictionary::getOffset(g_record[0], 0, offset);
810  * (Uint32*)(record + offset) = threadBase;
811  * (Uint32*)(record + offset + 4) = aIndex;
812 
813  //-------------------------------------------------------
814  // Set-up the attribute values for this operation.
815  //-------------------------------------------------------
816  if (aType != stRead && aType != stDelete)
817  {
818  for (unsigned k = 1; k < tNoOfAttributes; k++) {
819  NdbDictionary::getOffset(g_record[0], k, offset);
820  * (Uint32*)(record + offset) = aIndex;
821  }//for
822  }
823 
824  const NdbOperation* op;
825  switch (aType) {
826  case stInsert: { // Insert case
827  if (theWriteFlag == 1)
828  {
829  op = pTrans->writeTuple(g_record[0],record,g_record[0],record);
830  }
831  else
832  {
833  op = pTrans->insertTuple(g_record[0],record,g_record[0],record);
834  }
835  break;
836  }//case
837  case stRead: { // Read Case
838  op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead);
839  break;
840  }//case
841  case stUpdate:{ // Update Case
842  op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
843  break;
844  }//case
845  case stDelete: { // Delete Case
846  op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
847  break;
848  }//case
849  default: {
850  abort();
851  }//default
852  }//switch
853 
854  if (op == NULL)
855  {
856  ndbout << "Operation is null " << pTrans->getNdbError() << endl;
857  abort();
858  }
859 
860  assert(op != 0);
861 }
862 
863 static void setAttrNames()
864 {
865  int i;
866 
867  for (i = 0; i < MAXATTR ; i++){
868  BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
869  }
870 }
871 
872 
873 static void setTableNames()
874 {
875  // Note! Uses only uppercase letters in table name's
876  // so that we can look at the tables wits SQL
877  int i;
878  for (i = 0; i < MAXTABLES ; i++){
879  if (theStdTableNameFlag==0){
880  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i,
881  (unsigned)(NdbTick_CurrentMillisecond()+rand()));
882  } else {
883  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
884  }
885  }
886 }
887 
888 static
889 int
890 createTables(Ndb* pMyNdb){
891 
892  NdbSchemaCon *MySchemaTransaction;
893  NdbSchemaOp *MySchemaOp;
894  int check;
895 
896  if (theTableCreateFlag == 0) {
897  for(int i=0; i < 1 ;i++) {
898  ndbout << "Creating " << tableName[i] << "..." << endl;
899  MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
900 
901  if(MySchemaTransaction == NULL &&
902  (!error_handler(MySchemaTransaction->getNdbError())))
903  return -1;
904 
905  MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
906  if(MySchemaOp == NULL &&
907  (!error_handler(MySchemaTransaction->getNdbError())))
908  return -1;
909 
910 
911  check = MySchemaOp->createTable( tableName[i]
912  ,8 // Table Size
913  ,TupleKey // Key Type
914  ,40 // Nr of Pages
915  ,All
916  ,6
917  ,(tLoadFactor - 5)
918  ,(tLoadFactor)
919  ,1
920  ,!tempTable
921  );
922 
923  if (check == -1 &&
924  (!error_handler(MySchemaTransaction->getNdbError())))
925  return -1;
926 
927  check = MySchemaOp->createAttribute( (char*)attrName[0],
928  TupleKey,
929  32,
930  PKSIZE,
931  UnSigned,
932  MMBased,
933  NotNullAttribute );
934 
935  if (check == -1 &&
936  (!error_handler(MySchemaTransaction->getNdbError())))
937  return -1;
938  for (unsigned j = 1; j < tNoOfAttributes ; j++){
939  check = MySchemaOp->createAttribute( (char*)attrName[j],
940  NoKey,
941  32,
942  tAttributeSize,
943  UnSigned,
944  MMBased,
945  NotNullAttribute );
946  if (check == -1 &&
947  (!error_handler(MySchemaTransaction->getNdbError())))
948  return -1;
949  }
950 
951  if (MySchemaTransaction->execute() == -1 &&
952  (!error_handler(MySchemaTransaction->getNdbError())))
953  return -1;
954 
955  NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
956 
957  if (tNdbRecord)
958  {
959  NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
960  const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
961 
962  int off = 0;
964  for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
965  {
967  r0.column = pTab->getColumn(j);
968  r0.offset = off;
969  off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
970  spec.push_back(r0);
971  }
972  g_record[i] =
973  pDict->createRecord(pTab, spec.getBase(),
974  spec.size(),
976  assert(g_record[i]);
977  }
978  }
979  }
980 
981  return 0;
982 }
983 
// Print `err` to ndbout and tell the caller whether it may carry on.
// Returns true after announcing a recovery attempt for the classifications
// selected by the switch below, and false ("abort") for everything else.
// NOTE(review): the case labels that belong between the switch line and the
// recovery message are absent from this listing (the embedded numbering jumps
// from 987 to 991), so which classifications count as recoverable cannot be
// determined here; restore them from the upstream file before compiling.
984 static
985 bool error_handler(const NdbError & err){
986  ndbout << err << endl ;
987  switch(err.classification){
991  ndbout << endl << "Attempting to recover and continue now..." << endl ;
992  return true;
993  default:
994  break;
995  }
996  return false ; // return false to abort
997 }
998 
999 static
1000 int
1001 readArguments(int argc, const char** argv){
1002 
1003  int i = 1;
1004  while (argc > 1){
1005  if (strcmp(argv[i], "-t") == 0){
1006  tNoOfThreads = atoi(argv[i+1]);
1007  if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
1008  ndbout_c("Invalid no of threads");
1009  return -1;
1010  }
1011  } else if (strcmp(argv[i], "-p") == 0){
1012  tNoOfParallelTrans = atoi(argv[i+1]);
1013  if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
1014  ndbout_c("Invalid no of parallell transactions");
1015  return -1;
1016  }
1017  } else if (strcmp(argv[i], "-load_factor") == 0){
1018  tLoadFactor = atoi(argv[i+1]);
1019  if ((tLoadFactor < 40) || (tLoadFactor > 99)){
1020  ndbout_c("Invalid load factor");
1021  return -1;
1022  }
1023  } else if (strcmp(argv[i], "-c") == 0) {
1024  tNoOfOpsPerTrans = atoi(argv[i+1]);
1025  if (tNoOfOpsPerTrans < 1){
1026  ndbout_c("Invalid no of operations per transaction");
1027  return -1;
1028  }
1029  } else if (strcmp(argv[i], "-o") == 0) {
1030  tNoOfTransactions = atoi(argv[i+1]);
1031  if (tNoOfTransactions < 1){
1032  ndbout_c("Invalid no of transactions");
1033  return -1;
1034  }
1035  } else if (strcmp(argv[i], "-a") == 0){
1036  tNoOfAttributes = atoi(argv[i+1]);
1037  if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
1038  ndbout_c("Invalid no of attributes");
1039  return -1;
1040  }
1041  } else if (strcmp(argv[i], "-n") == 0){
1042  theStdTableNameFlag = 1;
1043  argc++;
1044  i--;
1045  } else if (strcmp(argv[i], "-l") == 0){
1046  tNoOfLoops = atoi(argv[i+1]);
1047  if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
1048  ndbout_c("Invalid no of loops");
1049  return -1;
1050  }
1051  } else if (strcmp(argv[i], "-s") == 0){
1052  tAttributeSize = atoi(argv[i+1]);
1053  if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
1054  ndbout_c("Invalid attributes size");
1055  return -1;
1056  }
1057  } else if (strcmp(argv[i], "-local") == 0){
1058  tLocalPart = atoi(argv[i+1]);
1059  tLocal = true;
1060  startTransGuess = true;
1061  if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
1062  ndbout_c("Invalid local part");
1063  return -1;
1064  }
1065  } else if (strcmp(argv[i], "-simple") == 0){
1066  theSimpleFlag = 1;
1067  argc++;
1068  i--;
1069  } else if (strcmp(argv[i], "-adaptive") == 0){
1070  tSendForce = 0;
1071  argc++;
1072  i--;
1073  } else if (strcmp(argv[i], "-force") == 0){
1074  tSendForce = 1;
1075  argc++;
1076  i--;
1077  } else if (strcmp(argv[i], "-non_adaptive") == 0){
1078  tSendForce = 2;
1079  argc++;
1080  i--;
1081  } else if (strcmp(argv[i], "-write") == 0){
1082  theWriteFlag = 1;
1083  argc++;
1084  i--;
1085  } else if (strcmp(argv[i], "-dirty") == 0){
1086  theDirtyFlag = 1;
1087  argc++;
1088  i--;
1089  } else if (strcmp(argv[i], "-test") == 0){
1090  theTestFlag = 1;
1091  argc++;
1092  i--;
1093  } else if (strcmp(argv[i], "-no_table_create") == 0){
1094  theTableCreateFlag = 1;
1095  argc++;
1096  i--;
1097  } else if (strcmp(argv[i], "-temp") == 0){
1098  tempTable = true;
1099  argc++;
1100  i--;
1101  } else if (strcmp(argv[i], "-no_hint") == 0){
1102  startTransGuess = false;
1103  argc++;
1104  i--;
1105  } else if (strcmp(argv[i], "-ndbrecord") == 0){
1106  tNdbRecord = true;
1107  argc++;
1108  i--;
1109  } else if (strcmp(argv[i], "-r") == 0){
1110  tExtraReadLoop = atoi(argv[i+1]);
1111  } else if (strcmp(argv[i], "-con") == 0){
1112  tConnections = atoi(argv[i+1]);
1113  } else {
1114  return -1;
1115  }
1116 
1117  argc -= 2;
1118  i = i + 2;
1119  }//while
1120  if (tLocal == true) {
1121  if (tNoOfOpsPerTrans != 1) {
1122  ndbout_c("Not valid to have more than one op per trans with local");
1123  }//if
1124  if (startTransGuess == false) {
1125  ndbout_c("Not valid to use no_hint with local");
1126  }//if
1127  }//if
1128  return 0;
1129 }
1130 
1131 static
1132 void
1133 input_error(){
1134 
1135  ndbout_c("FLEXASYNCH");
1136  ndbout_c(" Perform benchmark of insert, update and delete transactions");
1137  ndbout_c(" ");
1138  ndbout_c("Arguments:");
1139  ndbout_c(" -t Number of threads to start, default 1");
1140  ndbout_c(" -p Number of parallel transactions per thread, default 32");
1141  ndbout_c(" -o Number of transactions per loop, default 500");
1142  ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
1143  ndbout_c(" -load_factor Number Load factor in index in percent (40 -> 99)");
1144  ndbout_c(" -a Number of attributes, default 25");
1145  ndbout_c(" -c Number of operations per transaction");
1146  ndbout_c(" -s Size of each attribute, default 1 ");
1147  ndbout_c(" (PK is always of size 1, independent of this value)");
1148  ndbout_c(" -simple Use simple read to read from database");
1149  ndbout_c(" -dirty Use dirty read to read from database");
1150  ndbout_c(" -write Use writeTuple in insert and update");
1151  ndbout_c(" -n Use standard table names");
1152  ndbout_c(" -no_table_create Don't create tables in db");
1153  ndbout_c(" -temp Create table(s) without logging");
1154  ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
1155  ndbout_c(" -adaptive Use adaptive send algorithm (default)");
1156  ndbout_c(" -force Force send when communicating");
1157  ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
1158  ndbout_c(" -local Number of part, only use keys in one part out of 16");
1159  ndbout_c(" -ndbrecord");
1160 }
1161