MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
flexBench.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 /* ***************************************************
20 FLEXBENCH
21 Perform benchmark of insert, update and delete transactions
22 
23 Arguments:
24  -t Number of threads to start, default 1
25  -o Number of operations per loop, default 500
26  -l Number of loops to run, default 1, 0=infinite
27  -a Number of attributes, default 25
28  -c Number of tables, default 1
29  -s Size of each attribute, default 1 (Primary Key is always of size 1,
30  independent of this value)
31  -lkn Number of long primary keys, default 1
32  -lks Size of each long primary key, default 1
33  -simple Use simple read to read from database
34  -write Use writeTuple in insert and update
35  -stdtables Use standard table names
36  -no_table_create Don't create tables in db
37  -sleep Sleep a number of seconds before running the test; this
38  can be used so that another flexBench has time to create tables
39  -temp Use tables without logging
40  -verify Verify inserts, updates and deletes
41 #ifdef CEBIT_STAT
42  -statserv host:port statistics server to report to
43  -statfreq ops report every ops operations (default 100)
44 #endif
45  Returns:
46  0 - Test passed
47  1 - Test failed
48  2 - Invalid arguments
49 
50 * *************************************************** */
51 
52 #include <ndb_global.h>
53 #include "NdbApi.hpp"
54 
55 #include <NdbMain.h>
56 #include <NdbOut.hpp>
57 #include <NdbSleep.h>
58 #include <NdbTick.h>
59 #include <NdbTimer.hpp>
60 #include <NdbThread.h>
61 
62 #include <NdbTest.hpp>
63 
64 #define MAXSTRLEN 16
65 #define MAXATTR 128
66 #define MAXTABLES 128
67 #define MAXATTRSIZE 1000
68 #define MAXNOLONGKEY 16 // Max number of long keys.
69 #define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes
70 
71 extern "C" { static void* flexBenchThread(void*); }
72 static int readArguments(int argc, const char** argv);
73 static int createTables(Ndb*);
74 static int dropTables(Ndb*);
75 static void sleepBeforeStartingTest(int seconds);
76 static void input_error();
77 
// Command that the main thread posts to each worker through
// ThreadData::threadStart.
enum StartType {
  stIdle = 0,       // no command pending; workers sleep-poll while they see this
  stInsert,         // insert (or write) the rows
  stVerify,         // read the rows back and compare with the reference values
  stRead,           // read the rows
  stUpdate,         // update (or write) the rows
  stDelete,         // delete the rows
  stTryDelete,      // not issued or handled by the code visible in this file
  stVerifyDelete,   // read the rows again, expecting them to be gone
  stStop            // leave the worker loop
};
89 
90 struct ThreadData
91 {
92  int threadNo;
93  NdbThread* threadLife;
94  int threadReady;
95  StartType threadStart;
96  int threadResult;
97 };
98 
99 static int tNodeId = 0 ;
100 static char tableName[MAXTABLES][MAXSTRLEN+1];
101 static char attrName[MAXATTR][MAXSTRLEN+1];
102 static char** longKeyAttrName;
103 
104 // Program Parameters
105 static int tNoOfLoops = 1;
106 static int tAttributeSize = 1;
107 static unsigned int tNoOfThreads = 1;
108 static unsigned int tNoOfTables = 1;
109 static unsigned int tNoOfAttributes = 25;
110 static unsigned int tNoOfOperations = 500;
111 static unsigned int tSleepTime = 0;
112 static unsigned int tNoOfLongPK = 1;
113 static unsigned int tSizeOfLongPK = 1;
114 
115 //Program Flags
116 static int theSimpleFlag = 0;
117 static int theWriteFlag = 0;
118 static int theStdTableNameFlag = 0;
119 static int theTableCreateFlag = 0;
120 static bool theTempTable = false;
121 static bool VerifyFlag = true;
122 static bool useLongKeys = false;
123 
124 
125 static ErrorData theErrorData; // Part of flexBench-program
126 
// Timing helpers.  START_TIMER opens a new scope that owns a running NdbTimer
// called `timer`; PRINT_TIMER prints its statistics and closes that scope.
// The three macros therefore have to be used together, in this order, inside
// one function body.
#define START_TIMER  { NdbTimer timer; timer.doStart();
#define STOP_TIMER   timer.doStop();
#define PRINT_TIMER(text, trans, opertrans) \
  timer.printTransactionStatistics(text, trans, opertrans); };
130 
131 #include <NdbTCP.h>
132 
133 #ifdef CEBIT_STAT
134 #include <NdbMutex.h>
135 static bool statEnable = false;
136 static char statHost[100];
137 static int statFreq = 100;
138 static int statPort = 0;
139 static int statSock = -1;
140 static enum { statError = -1, statClosed, statOpen } statState;
141 static NdbMutex statMutex = NDB_MUTEX_INITIALIZER;
142 #endif
143 
144 //-------------------------------------------------------------------
145 // Statistical Reporting routines
146 //-------------------------------------------------------------------
147 #ifdef CEBIT_STAT
148 // Experimental client-side statistic for CeBIT
149 
150 static void
151 statReport(enum StartType st, int ops)
152 {
153  if (!statEnable)
154  return;
155  if (NdbMutex_Lock(&statMutex) < 0) {
156  if (statState != statError) {
157  ndbout_c("stat: lock mutex failed: %s", strerror(errno));
158  statState = statError;
159  }
160  return;
161  }
162  static int nodeid;
163  // open connection
164  if (statState != statOpen) {
165  char *p = getenv("NDB_NODEID");
166  nodeid = p == 0 ? 0 : atoi(p);
167  if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
168  if (statState != statError) {
169  ndbout_c("stat: create socket failed: %s", strerror(socket_errno));
170  statState = statError;
171  }
172  (void)NdbMutex_Unlock(&statMutex);
173  return;
174  }
175  struct sockaddr_in saddr;
176  memset(&saddr, 0, sizeof(saddr));
177  saddr.sin_family = AF_INET;
178  saddr.sin_port = htons(statPort);
179  if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) {
180  if (statState != statError) {
181  ndbout_c("stat: host %s not found", statHost);
182  statState = statError;
183  }
184  (void)close(statSock);
185  (void)NdbMutex_Unlock(&statMutex);
186  return;
187  }
188  if (connect(statSock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
189  if (statState != statError) {
190  ndbout_c("stat: connect failed: %s", strerror(socket_errno));
191  statState = statError;
192  }
193  (void)close(statSock);
194  (void)NdbMutex_Unlock(&statMutex);
195  return;
196  }
197  statState = statOpen;
198  ndbout_c("stat: connection to %s:%d opened", statHost, (int)statPort);
199  }
200  const char *text;
201  switch (st) {
202  case stInsert:
203  text = "insert";
204  break;
205  case stVerify:
206  text = "verify";
207  break;
208  case stRead:
209  text = "read";
210  break;
211  case stUpdate:
212  text = "update";
213  break;
214  case stDelete:
215  text = "delete";
216  break;
217  case stVerifyDelete:
218  text = "verifydelete";
219  break;
220  default:
221  text = "unknown";
222  break;
223  }
224  char buf[100];
225  sprintf(buf, "%d %s %d\n", nodeid, text, ops);
226  int len = strlen(buf);
227  // assume SIGPIPE already ignored
228  if (send(statSock, buf, len, 0) != len) {
229  if (statState != statError) {
230  ndbout_c("stat: write failed: %s", strerror(socket_errno));
231  statState = statError;
232  }
233  (void)close(statSock);
234  (void)NdbMutex_Unlock(&statMutex);
235  return;
236  }
237  (void)NdbMutex_Unlock(&statMutex);
238 }
239 #endif // CEBIT_STAT
240 
241 static void
242 resetThreads(ThreadData* pt){
243  for (unsigned int i = 0; i < tNoOfThreads; i++){
244  pt[i].threadReady = 0;
245  pt[i].threadResult = 0;
246  pt[i].threadStart = stIdle;
247  }
248 }
249 
250 static int
251 checkThreadResults(ThreadData* pt){
252  for (unsigned int i = 0; i < tNoOfThreads; i++){
253  if(pt[i].threadResult != 0){
254  ndbout_c("Thread%d reported fatal error %d", i, pt[i].threadResult);
255  return -1;
256  }
257  }
258  return 0;
259 }
260 
261 static
262 void
263 waitForThreads(ThreadData* pt)
264 {
265  int cont = 1;
266  while (cont){
267  NdbSleep_MilliSleep(100);
268  cont = 0;
269  for (unsigned int i = 0; i < tNoOfThreads; i++){
270  if (pt[i].threadReady == 0)
271  cont = 1;
272  }
273  }
274 }
275 
276 static void
277 tellThreads(ThreadData* pt, StartType what)
278 {
279  for (unsigned int i = 0; i < tNoOfThreads; i++)
280  pt[i].threadStart = what;
281 }
282 
283 static Ndb_cluster_connection *g_cluster_connection= 0;
284 
285 NDB_COMMAND(flexBench, "flexBench", "flexBench", "flexbench", 65535)
286 {
287  ndb_init();
288  ThreadData* pThreadsData;
289  int tLoops = 0;
290  int returnValue = NDBT_OK;
291 
292  if (readArguments(argc, argv) != 0){
293  input_error();
294  return NDBT_ProgramExit(NDBT_WRONGARGS);
295  }
296 
297  if(useLongKeys){
298  longKeyAttrName = (char **) malloc(sizeof(char*) * tNoOfLongPK);
299  for (Uint32 i = 0; i < tNoOfLongPK; i++) {
300  longKeyAttrName[i] = (char *) malloc(strlen("KEYATTR ") + 1);
301  memset(longKeyAttrName[i], 0, strlen("KEYATTR ") + 1);
302  sprintf(longKeyAttrName[i], "KEYATTR%i", i);
303  }
304  }
305 
306  pThreadsData = new ThreadData[tNoOfThreads];
307 
308  ndbout << endl << "FLEXBENCH - Starting normal mode" << endl;
309  ndbout << "Perform benchmark of insert, update and delete transactions"<< endl;
310  ndbout << " " << tNoOfThreads << " thread(s) " << endl;
311  ndbout << " " << tNoOfLoops << " iterations " << endl;
312  ndbout << " " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " <<endl;
313  ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
314  ndbout << " " << tNoOfOperations << " transaction(s) per thread and round " << endl;
315  ndbout << " " << tAttributeSize << " is the number of 32 bit words per attribute "<< endl;
316  ndbout << " " << "Table(s) without logging: " << (Uint32)theTempTable << endl;
317 
318  if(useLongKeys)
319  ndbout << " " << "Using long keys with " << tNoOfLongPK << " keys a' " <<
320  tSizeOfLongPK * 4 << " bytes each." << endl;
321 
322  ndbout << " " << "Verification is " ;
323  if(VerifyFlag) {
324  ndbout << "enabled" << endl ;
325  }else{
326  ndbout << "disabled" << endl ;
327  }
328  theErrorData.printSettings(ndbout);
329 
330  NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);
331 
333  if(con.connect(12, 5, 1) != 0)
334  {
335  return NDBT_ProgramExit(NDBT_FAILED);
336  }
337 
338  g_cluster_connection= &con;
339 
340  Ndb* pNdb;
341  pNdb = new Ndb(&con, "TEST_DB" );
342  pNdb->init();
343 
344  tNodeId = pNdb->getNodeId();
345  ndbout << " NdbAPI node with id = " << tNodeId << endl;
346  ndbout << endl;
347 
348  ndbout << "Waiting for ndb to become ready..." <<endl;
349  if (pNdb->waitUntilReady(2000) != 0){
350  ndbout << "NDB is not ready" << endl;
351  ndbout << "Benchmark failed!" << endl;
352  returnValue = NDBT_FAILED;
353  }
354 
355  if(returnValue == NDBT_OK){
356  if (createTables(pNdb) != 0){
357  returnValue = NDBT_FAILED;
358  }
359  }
360 
361  if(returnValue == NDBT_OK){
362 
363  sleepBeforeStartingTest(tSleepTime);
364 
365  /****************************************************************
366  * Create threads. *
367  ****************************************************************/
368  resetThreads(pThreadsData);
369 
370  for (int i = 0; i < (int)tNoOfThreads; i++){
371  pThreadsData[i].threadNo = i;
372  pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread,
373  (void**)&pThreadsData[i],
374  32768,
375  "flexBenchThread",
376  NDB_THREAD_PRIO_LOW);
377  }
378 
379  waitForThreads(pThreadsData);
380 
381  ndbout << endl << "All threads started" << endl << endl;
382 
383  /****************************************************************
384  * Execute program. *
385  ****************************************************************/
386 
387  for(;;){
388 
389  int loopCount = tLoops + 1;
390  ndbout << endl << "Loop # " << loopCount << endl << endl;
391 
392  /****************************************************************
393  * Perform inserts. *
394  ****************************************************************/
395  // Reset and start timer
396  START_TIMER;
397  // Give insert-command to all threads
398  resetThreads(pThreadsData);
399  tellThreads(pThreadsData, stInsert);
400  waitForThreads(pThreadsData);
401  if (checkThreadResults(pThreadsData) != 0){
402  ndbout << "Error: Threads failed in performing insert" << endl;
403  returnValue = NDBT_FAILED;
404  break;
405  }
406  // stop timer and print results.
407  STOP_TIMER;
408  PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables);
409  /****************************************************************
410  * Verify inserts. *
411  ****************************************************************/
412  if (VerifyFlag) {
413  resetThreads(pThreadsData);
414  ndbout << "Verifying inserts...\t" ;
415  tellThreads(pThreadsData, stVerify);
416  waitForThreads(pThreadsData);
417  if (checkThreadResults(pThreadsData) != 0){
418  ndbout << "Error: Threads failed while verifying inserts" << endl;
419  returnValue = NDBT_FAILED;
420  break;
421  }else{
422  ndbout << "\t\tOK" << endl << endl ;
423  }
424  }
425 
426  /****************************************************************
427  * Perform read. *
428  ****************************************************************/
429  // Reset and start timer
430  START_TIMER;
431  // Give read-command to all threads
432  resetThreads(pThreadsData);
433  tellThreads(pThreadsData, stRead);
434  waitForThreads(pThreadsData);
435  if (checkThreadResults(pThreadsData) != 0){
436  ndbout << "Error: Threads failed in performing read" << endl;
437  returnValue = NDBT_FAILED;
438  break;
439  }
440  // stop timer and print results.
441  STOP_TIMER;
442  PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
443 
444  /****************************************************************
445  * Perform update. *
446  ****************************************************************/
447  // Reset and start timer
448  START_TIMER;
449  // Give insert-command to all threads
450  resetThreads(pThreadsData);
451  tellThreads(pThreadsData, stUpdate);
452  waitForThreads(pThreadsData);
453  if (checkThreadResults(pThreadsData) != 0){
454  ndbout << "Error: Threads failed in performing update" << endl;
455  returnValue = NDBT_FAILED;
456  break;
457  }
458  // stop timer and print results.
459  STOP_TIMER;
460  PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables);
461 
462  /****************************************************************
463  * Verify updates. *
464  ****************************************************************/
465  if (VerifyFlag) {
466  resetThreads(pThreadsData);
467  ndbout << "Verifying updates...\t" ;
468  tellThreads(pThreadsData, stVerify);
469  waitForThreads(pThreadsData);
470  if (checkThreadResults(pThreadsData) != 0){
471  ndbout << "Error: Threads failed while verifying updates" << endl;
472  returnValue = NDBT_FAILED;
473  break;
474  }else{
475  ndbout << "\t\tOK" << endl << endl ;
476  }
477  }
478 
479  /****************************************************************
480  * Perform read. *
481  ****************************************************************/
482  // Reset and start timer
483  START_TIMER;
484  // Give insert-command to all threads
485  resetThreads(pThreadsData);
486  tellThreads(pThreadsData, stRead);
487  waitForThreads(pThreadsData);
488  if (checkThreadResults(pThreadsData) != 0){
489  ndbout << "Error: Threads failed in performing read" << endl;
490  returnValue = NDBT_FAILED;
491  break;
492  }
493  // stop timer and print results.
494  STOP_TIMER;
495  PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
496 
497  /****************************************************************
498  * Perform delete. *
499  ****************************************************************/
500  // Reset and start timer
501  START_TIMER;
502  // Give insert-command to all threads
503  resetThreads(pThreadsData);
504  tellThreads(pThreadsData, stDelete);
505  waitForThreads(pThreadsData);
506  if (checkThreadResults(pThreadsData) != 0){
507  ndbout << "Error: Threads failed in performing delete" << endl;
508  returnValue = NDBT_FAILED;
509  break;
510  }
511  // stop timer and print results.
512  STOP_TIMER;
513  PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables);
514 
515  /****************************************************************
516  * Verify deletes. *
517  ****************************************************************/
518  if (VerifyFlag) {
519  resetThreads(pThreadsData);
520  ndbout << "Verifying tuple deletion..." ;
521  tellThreads(pThreadsData, stVerifyDelete);
522  waitForThreads(pThreadsData);
523  if (checkThreadResults(pThreadsData) != 0){
524  ndbout << "Error: Threads failed in verifying deletes" << endl;
525  returnValue = NDBT_FAILED;
526  break;
527  }else{
528  ndbout << "\t\tOK" << endl << endl ;
529  }
530  }
531 
532  ndbout << "--------------------------------------------------" << endl;
533 
534  tLoops++;
535 
536  if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops )
537  break;
538  theErrorData.printErrorCounters();
539  }
540 
541  resetThreads(pThreadsData);
542  tellThreads(pThreadsData, stStop);
543  waitForThreads(pThreadsData);
544 
545  void * tmp;
546  for(int i = 0; i<(int)tNoOfThreads; i++){
547  NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp);
548  NdbThread_Destroy(&pThreadsData[i].threadLife);
549  }
550  }
551 
552  if (useLongKeys == true) {
553  // Only free these areas if they have been allocated
554  // Otherwise cores will happen
555  for (int i = 0; i < (int)tNoOfLongPK; i++)
556  free(longKeyAttrName[i]);
557  free(longKeyAttrName);
558  } // if
559 
560  dropTables(pNdb);
561 
562  delete [] pThreadsData;
563  delete pNdb;
564  theErrorData.printErrorCounters();
565  return NDBT_ProgramExit(returnValue);
566 }
568 
569 
/*
 * Hash an array of key words.
 *
 *   hash_key - array of `len` words; only the low 32 bits of each word
 *              contribute to the result
 *   len      - number of words to hash (0 or negative yields the seed)
 *
 * Starting from the seed 147, every word is folded in one byte at a time,
 * least significant byte first, using hash = hash * 33 + byte.
 * Arithmetic wraps modulo the width of unsigned long.
 */
unsigned long get_hash(unsigned long * hash_key, int len)
{
  unsigned long acc = 147;
  for (int idx = 0; idx < len; idx++)
  {
    unsigned word = hash_key[idx];
    for (int shift = 0; shift < 32; shift += 8)
    {
      acc = acc * 33 + ((word >> shift) & 255);
    }
  }
  return acc;
}
585 
586 // End of warming up phase
587 
588 
589 
590 static void* flexBenchThread(void* pArg)
591 {
592  ThreadData* pThreadData = (ThreadData*)pArg;
593  unsigned int threadNo, threadBase;
594  Ndb* pNdb = NULL ;
595  NdbConnection *pTrans = NULL ;
596  const NdbOperation**
597  pOps = NULL ;
598  StartType tType ;
599  StartType tSaveType ;
600  int* attrValue = NULL ;
601  int* attrRefValue = NULL ;
602  int check = 0 ;
603  int loopCountOps, loopCountTables, loopCountAttributes;
604  int tAttemptNo = 0;
605  int tRetryAttempts = 20;
606  int tResult = 0;
607  int tSpecialTrans = 0;
608  int nRefLocalOpOffset = 0 ;
609  int nReadBuffSize =
610  tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ;
611  int nRefBuffSize =
612  tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ;
613  unsigned** longKeyAttrValue;
614  NdbRecord** pRec= NULL;
615  unsigned char** pAttrSet= NULL;
616  int nRefOpOffset= 0;
617  NdbDictionary::Dictionary *dict= NULL;
618  NdbDictionary::RecordSpecification recSpec[MAXATTR+MAXNOLONGKEY];
619 
620  threadNo = pThreadData->threadNo ;
621 
622  /* Additional space in rows for long primary keys. */
623  if (useLongKeys)
624  nReadBuffSize+= tNoOfTables*sizeof(unsigned)*tSizeOfLongPK*tNoOfLongPK;
625 
626  attrValue = (int*)malloc(nReadBuffSize) ;
627  attrRefValue = (int*)malloc(nRefBuffSize) ;
628  pOps = (const NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
629  pNdb = new Ndb(g_cluster_connection, "TEST_DB" );
630  pRec= (NdbRecord **)calloc(tNoOfTables*3, sizeof(*pRec));
631  pAttrSet= (unsigned char **)calloc(tNoOfTables, sizeof(*pAttrSet));
632 
633  if (!attrValue || !attrRefValue || !pOps || !pNdb || !pRec || !pAttrSet)
634  {
635  // Check allocations to make sure we got all the memory we asked for
636  ndbout << "One or more memory allocations failed when starting thread #";
637  ndbout << threadNo << endl ;
638  ndbout << "Thread #" << threadNo << " will now exit" << endl ;
639  tResult = 13 ;
640  goto end;
641  }
642 
643  pNdb->init();
644  pNdb->waitUntilReady();
645 
646  // To make sure that two different threads doesn't operate on the same record
647  // Calculate an "unique" number to use as primary key
648  threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
649 
650  /* Set up NdbRecord's for the tables. */
651  dict= pNdb->getDictionary();
652  for (int tab= 0; tab<(int)tNoOfTables; tab++)
653  {
654  const NdbDictionary::Table *table= dict->getTable(tableName[tab]);
655  int numPKs= (useLongKeys ? tNoOfLongPK : 1);
656 
657  /* First create NdbRecord for just the primary key(s). */
658  if (!useLongKeys)
659  {
660  recSpec[0].column= table->getColumn(0);;
661  recSpec[0].offset= 0;
662  pRec[tab]= dict->createRecord(table,
663  recSpec,
664  1,
665  sizeof(recSpec[0]));
666  }
667  else
668  {
669  for (Uint32 i= 0; i<tNoOfLongPK; i++)
670  {
671  recSpec[i].column= table->getColumn(longKeyAttrName[i]);
672  recSpec[i].offset= sizeof(unsigned)*tSizeOfLongPK*i;
673  }
674  pRec[tab]= dict->createRecord(table,
675  recSpec,
676  tNoOfLongPK,
677  sizeof(recSpec[0]));
678  }
679 
680  /* Next NdbRecord for just the non-pk attributes. */
681  Uint32 count= 0;
682  for (Uint32 i= 1; i<tNoOfAttributes; i++)
683  {
684  recSpec[count].column= table->getColumn(i+numPKs-1);
685  recSpec[count].offset= sizeof(int)*tAttributeSize*i;
686  count++;
687  }
688  pRec[tab+tNoOfTables]= dict->createRecord(table,
689  recSpec,
690  count,
691  sizeof(recSpec[0]));
692 
693  /* And finally NdbRecord for all attributes (for insert). */
694  /* Also test here specifying NdbRecord columns out-of-order. */
695  count= 0;
696  for (Uint32 i= (useLongKeys?1:0); i<tNoOfAttributes; i++)
697  {
698  recSpec[count].column= table->getColumn(i-1+numPKs);
699  recSpec[count].offset= sizeof(int)*tAttributeSize*i;
700  count++;
701  }
702  if (useLongKeys)
703  {
704  for (Uint32 i= 0; i<tNoOfLongPK; i++)
705  {
706  recSpec[count].column= table->getColumn(longKeyAttrName[i]);
707  recSpec[count].offset= sizeof(int)*tAttributeSize*tNoOfAttributes +
708  sizeof(unsigned)*tSizeOfLongPK*i;
709  count++;
710  }
711  }
712  pRec[tab+2*tNoOfTables]= dict->createRecord(table,
713  recSpec,
714  count,
715  sizeof(recSpec[0]));
716 
717  if (pRec[tab]==NULL ||
718  pRec[tab+tNoOfTables]==NULL ||
719  pRec[tab+2*tNoOfTables]==NULL) {
720  // This is a fatal error, abort program
721  ndbout << "Failed to allocate NdbRecord in thread" << threadNo;
722  ndbout << endl;
723  tResult = 13;
724  goto end;
725  }
726 
727  /* Attribute set for reading just one attribute, when verifying delete. */
728  pAttrSet[tab]=
729  (unsigned char *)calloc(tNoOfAttributes-1+numPKs, sizeof(char));
730  if (pAttrSet[tab]==NULL) {
731  // This is a fatal error, abort program
732  ndbout << "Failed to allocate NdbRecAttrSet in thread" << threadNo;
733  ndbout << endl;
734  tResult = 13;
735  goto end;
736  }
737  pAttrSet[tab][0]|= 1; // Set bit for attrId 0
738  }
739 
740  if(useLongKeys){
741  // Allocate and populate the longkey array.
742  longKeyAttrValue= (unsigned **) calloc(tNoOfOperations, sizeof(unsigned*));
743  if (longKeyAttrValue==NULL) {
744  ndbout << "Memory allocation failed for longKeyAttrValue in thread"
745  << threadNo;
746  ndbout << endl;
747  tResult = 13;
748  goto end;
749  }
750  Uint32 n;
751  for (n = 0; n < tNoOfOperations; n++)
752  {
753  longKeyAttrValue[n]=
754  (unsigned *) malloc(sizeof(unsigned) * tSizeOfLongPK * tNoOfLongPK );
755  if (longKeyAttrValue[n]==NULL) {
756  ndbout << "Memory allocation failed for longKeyAttrValue in thread"
757  << threadNo;
758  ndbout << endl;
759  tResult = 13;
760  goto end;
761  }
762 
763  for (Uint32 i = 0; i < tNoOfLongPK ; i++) {
764  for(Uint32 j = 0; j < tSizeOfLongPK; j++) {
765  // Repeat the unique value to fill up the long key.
766  longKeyAttrValue[n][i*tSizeOfLongPK+j]= threadBase + n;
767  }
768  }
769  }
770  }
771 
772  nRefOpOffset = 0 ;
773  //Assign reference attribute values to memory
774  for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){
775  // Calculate offset value before going into the next loop
776  nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ;
777  for(Uint32 a = 0 ; a < tNoOfAttributes ; a++)
778  for(Uint32 b= 0; b<(Uint32)tAttributeSize; b++)
779  attrRefValue[nRefOpOffset + tAttributeSize*a + b] =
780  (int)(threadBase + ops + a) ;
781  }
782 
783 #ifdef CEBIT_STAT
784  // ops not yet reported
785  int statOps = 0;
786 #endif
787  for (;;) {
788  pThreadData->threadResult = tResult; // Report error to main thread,
789  // normally tResult is set to 0
790  pThreadData->threadReady = 1;
791 
792  while (pThreadData->threadStart == stIdle){
793  NdbSleep_MilliSleep(100);
794  }//while
795 
796  // Check if signal to exit is received
797  if (pThreadData->threadStart == stStop){
798  pThreadData->threadReady = 1;
799  // ndbout_c("Thread%d is stopping", threadNo);
800  // In order to stop this thread, the main thread has signaled
801  // stStop, break out of the for loop so that destructors
802  // and the proper exit functions are called
803  break;
804  }//if
805 
806  tType = pThreadData->threadStart;
807  tSaveType = tType;
808  pThreadData->threadStart = stIdle;
809 
810  // Start transaction, type of transaction
811  // is received in the array ThreadStart
812  loopCountOps = tNoOfOperations;
813  loopCountTables = tNoOfTables;
814  loopCountAttributes = tNoOfAttributes;
815 
816  /* Hm, I wonder why we do one operation less that tNoOfAttributes here? */
817  for (int count = 1; count < loopCountOps && tResult == 0;){
818 
819  pTrans = pNdb->startTransaction();
820  if (pTrans == NULL) {
821  // This is a fatal error, abort program
822  ndbout << "Could not start transaction in thread" << threadNo;
823  ndbout << endl;
824  ndbout << pNdb->getNdbError() << endl;
825  tResult = 1; // Indicate fatal error
826  break; // Break out of for loop
827  }
828 
829  // Calculate the current operation offset in the reference array
830  nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ;
831 
832  for (int countTables = 0;
833  countTables < loopCountTables && tResult == 0;
834  countTables++) {
835  int nTableOffset= tAttributeSize*tNoOfAttributes*countTables;
836  int *pRow= &attrValue[nTableOffset];
837  char *pRowAttr= (char *)(&attrRefValue[nRefLocalOpOffset]);
838  char *pRowPK= (useLongKeys ?
839  (char *)longKeyAttrValue[count-1] :
840  (char *)(&attrRefValue[nRefLocalOpOffset]));
841 
842  /* For insert, we need a single row with both pk and non-pk attrs. */
843  if (tType==stInsert && theWriteFlag!=1)
844  {
845  /* Copy the non-PK columns to send to the server. */
846  if (tNoOfAttributes>1)
847  memcpy(&pRow[tAttributeSize],
848  &attrRefValue[nRefLocalOpOffset+tAttributeSize],
849  (tNoOfAttributes-1)*tAttributeSize*sizeof(int));
850  /* Copy the primary key(s). */
851  if (useLongKeys)
852  {
853  memcpy(pRow+tAttributeSize*tNoOfAttributes,
854  longKeyAttrValue[count-1],
855  tNoOfLongPK*tSizeOfLongPK*sizeof(unsigned));
856  }
857  else
858  {
859  pRow[0]= attrRefValue[nRefLocalOpOffset];
860  }
861  }
862 
863  const NdbRecord *pk_record= pRec[countTables];
864  const NdbRecord *attr_record= pRec[countTables+tNoOfTables];
865  const NdbRecord *all_record= pRec[countTables+2*tNoOfTables];
866 
867  switch (tType) {
868  case stInsert: // Insert case
869  if (theWriteFlag == 1)
870  pOps[countTables]= pTrans->writeTuple(pk_record, pRowPK,
871  attr_record, pRowAttr);
872  else
873  pOps[countTables]= pTrans->insertTuple(all_record, (char *)pRow);
874  break;
875  case stRead: // Read Case
876  if (theSimpleFlag == 1)
877  /* Apparently simpleRead is identical to normal read currently. */
878  pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
879  attr_record, (char *)pRow,
881  else
882  pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
883  attr_record, (char *)pRow);
884  break;
885  case stUpdate: // Update Case
886  if (theWriteFlag == 1)
887  pOps[countTables]= pTrans->writeTuple(pk_record, pRowPK,
888  attr_record, pRowAttr);
889  else
890  pOps[countTables]= pTrans->updateTuple(pk_record, pRowPK,
891  attr_record, pRowAttr);
892  break;
893  case stDelete: // Delete Case
894  pOps[countTables]= pTrans->deleteTuple(pk_record, pRowPK,
895  attr_record);
896  break;
897  case stVerify:
898  pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
899  attr_record, (char *)pRow);
900  break;
901  case stVerifyDelete:
902  pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
903  pk_record, (char *)pRow,
905  pAttrSet[countTables]);
906  break;
907  default:
908  assert(false);
909  }//switch
910 
911  if (pOps[countTables] == NULL) {
912  // This is a fatal error, abort program
913  ndbout << "getNdbOperation: " << pTrans->getNdbError();
914  tResult = 2; // Indicate fatal error
915  break;
916  }//if
917 
918  }//for Tables loop
919 
920  if (tResult != 0)
921  break;
922  check = pTrans->execute(Commit);
923 
924  // Decide what kind of error this is
925  if ((tSpecialTrans == 1) &&
926  (check == -1)) {
927  // --------------------------------------------------------------------
928  // A special transaction have been executed, change to check = 0 in
929  // certain situations.
930  // --------------------------------------------------------------------
931  switch (tType) {
932  case stInsert: // Insert case
933  if (630 == pTrans->getNdbError().code ) {
934  check = 0;
935  ndbout << "Insert with 4007 was successful" << endl;
936  }//if
937  break;
938  case stDelete: // Delete Case
939  if (626 == pTrans->getNdbError().code ) {
940  check = 0;
941  ndbout << "Delete with 4007 was successful" << endl;
942  }//if
943  break;
944  default:
945  assert(false);
946  }//switch
947  }//if
948  tSpecialTrans = 0;
949  if (check == -1) {
950  if ((stVerifyDelete == tType) &&
951  (626 == pTrans->getNdbError().code)) {
952  // ----------------------------------------------
953  // It's good news - the deleted tuple is gone,
954  // so reset "check" flag
955  // ----------------------------------------------
956  check = 0 ;
957  } else {
958  int retCode =
959  theErrorData.handleErrorCommon(pTrans->getNdbError());
960  if (retCode == 1) {
961  ndbout_c("execute: %d, %d, %s", count, tType,
962  pTrans->getNdbError().message );
963  ndbout_c("Error code = %d", pTrans->getNdbError().code );
964  tResult = 20;
965  } else if (retCode == 2) {
966  ndbout << "4115 should not happen in flexBench" << endl;
967  tResult = 20;
968  } else if (retCode == 3) {
969  // --------------------------------------------------------------------
970  // We are not certain if the transaction was successful or not.
971  // We must reexecute but might very well find that the transaction
972  // actually was updated. Updates and Reads are no problem here. Inserts
973  // will not cause a problem if error code 630 arrives. Deletes will
974  // not cause a problem if 626 arrives.
975  // --------------------------------------------------------------------
976  if ((tType == stInsert) || (tType == stDelete)) {
977  tSpecialTrans = 1;
978  }//if
979  }//if
980  }//if
981  }//if
982  // Check if retries should be made
983  if (check == -1 && tResult == 0) {
984  if (tAttemptNo < tRetryAttempts){
985  tAttemptNo++;
986  } else {
987  // --------------------------------------------------------------------
988  // Too many retries have been made, report error and break out of loop
989  // --------------------------------------------------------------------
990  ndbout << "Thread" << threadNo;
991  ndbout << ": too many errors reported" << endl;
992  tResult = 10;
993  break;
994  }//if
995  }//if
996 
997  if (check == 0){
998  // Go to the next record
999  count++;
1000  tAttemptNo = 0;
1001 #ifdef CEBIT_STAT
1002  // report successful ops
1003  if (statEnable) {
1004  statOps += loopCountTables;
1005  if (statOps >= statFreq) {
1006  statReport(tType, statOps);
1007  statOps = 0;
1008  }//if
1009  }//if
1010 #endif
1011  }//if
1012 
1013  if (stVerify == tType && 0 == check){
1014  int nTableOffset = 0 ;
1015  for (int a = 1 ; a < loopCountAttributes ; a++){
1016  for (int tables = 0 ; tables < loopCountTables ; tables++){
1017  nTableOffset = tables*loopCountAttributes*tAttributeSize ;
1018  if (*(int*)&attrValue[nTableOffset + tAttributeSize*a] != *(int*)&attrRefValue[nRefLocalOpOffset + tAttributeSize*a]){
1019  ndbout << "Error in verify:" << endl ;
1020  ndbout << "attrValue[" << nTableOffset + tAttributeSize*a << "] = " << attrValue[a] << endl ;
1021  ndbout << "attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a << "]" << attrRefValue[nRefLocalOpOffset + tAttributeSize*a] << endl ;
1022  tResult = 11 ;
1023  break ;
1024  }//if
1025  }//for
1026  }//for
1027  }// if(stVerify ... )
1028  pNdb->closeTransaction(pTrans) ;
1029  }// operations loop
1030 #ifdef CEBIT_STAT
1031  // report remaining successful ops
1032  if (statEnable) {
1033  if (statOps > 0) {
1034  statReport(tType, statOps);
1035  statOps = 0;
1036  }//if
1037  }//if
1038 #endif
1039  }
1040 
1041  end:
1042  if(pAttrSet)
1043  {
1044  for (Uint32 i= 0; i<tNoOfTables; i++)
1045  if (pAttrSet[i])
1046  free(pAttrSet[i]);
1047  free(pAttrSet);
1048  }
1049  if(pRec)
1050  {
1051  for (Uint32 i= 0; i<tNoOfTables*3; i++)
1052  if (pRec[i])
1053  dict->releaseRecord(pRec[i]);
1054  free(pRec);
1055  }
1056  delete pNdb;
1057  if(attrValue)
1058  free(attrValue);
1059  if(attrRefValue)
1060  free(attrRefValue);
1061  if(pOps)
1062  free(pOps);
1063 
1064  if (useLongKeys == true) {
1065  // Only free these areas if they have been allocated
1066  // Otherwise cores will occur
1067  for (Uint32 n = 0; n < tNoOfOperations; n++)
1068  if (longKeyAttrValue[n])
1069  free(longKeyAttrValue[n]);
1070  free(longKeyAttrValue);
1071  } // if
1072 
1073  return NULL; // Thread exits
1074 }
1075 
1076 
1077 static int readArguments(int argc, const char** argv)
1078 {
1079 
1080  int i = 1;
1081  while (argc > 1){
1082  if (strcmp(argv[i], "-t") == 0){
1083  tNoOfThreads = atoi(argv[i+1]);
1084  if ((tNoOfThreads < 1))
1085  return -1;
1086  argc -= 1;
1087  i++;
1088  }else if (strcmp(argv[i], "-o") == 0){
1089  tNoOfOperations = atoi(argv[i+1]);
1090  if (tNoOfOperations < 1)
1091  return -1;;
1092  argc -= 1;
1093  i++;
1094  }else if (strcmp(argv[i], "-a") == 0){
1095  tNoOfAttributes = atoi(argv[i+1]);
1096  if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR))
1097  return -1;
1098  argc -= 1;
1099  i++;
1100  }else if (strcmp(argv[i], "-lkn") == 0){
1101  tNoOfLongPK = atoi(argv[i+1]);
1102  useLongKeys = true;
1103  if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) ||
1104  (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1105  ndbout << "Argument -lkn is not in the proper range." << endl;
1106  return -1;
1107  }
1108  argc -= 1;
1109  i++;
1110  }else if (strcmp(argv[i], "-lks") == 0){
1111  tSizeOfLongPK = atoi(argv[i+1]);
1112  useLongKeys = true;
1113  if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1114  ndbout << "Argument -lks is not in the proper range 1 to " <<
1115  MAXLONGKEYTOTALSIZE << endl;
1116  return -1;
1117  }
1118  argc -= 1;
1119  i++;
1120  }else if (strcmp(argv[i], "-c") == 0){
1121  tNoOfTables = atoi(argv[i+1]);
1122  if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES))
1123  return -1;
1124  argc -= 1;
1125  i++;
1126  }else if (strcmp(argv[i], "-stdtables") == 0){
1127  theStdTableNameFlag = 1;
1128  }else if (strcmp(argv[i], "-l") == 0){
1129  tNoOfLoops = atoi(argv[i+1]);
1130  if ((tNoOfLoops < 0) || (tNoOfLoops > 100000))
1131  return -1;
1132  argc -= 1;
1133  i++;
1134  }else if (strcmp(argv[i], "-s") == 0){
1135  tAttributeSize = atoi(argv[i+1]);
1136  if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE))
1137  return -1;
1138  argc -= 1;
1139  i++;
1140  }else if (strcmp(argv[i], "-sleep") == 0){
1141  tSleepTime = atoi(argv[i+1]);
1142  if ((tSleepTime < 1) || (tSleepTime > 3600))
1143  return -1;
1144  argc -= 1;
1145  i++;
1146  }else if (strcmp(argv[i], "-simple") == 0){
1147  theSimpleFlag = 1;
1148  }else if (strcmp(argv[i], "-write") == 0){
1149  theWriteFlag = 1;
1150  }else if (strcmp(argv[i], "-no_table_create") == 0){
1151  theTableCreateFlag = 1;
1152  }else if (strcmp(argv[i], "-temp") == 0){
1153  theTempTable = true;
1154  }else if (strcmp(argv[i], "-noverify") == 0){
1155  VerifyFlag = false ;
1156  }else if (theErrorData.parseCmdLineArg(argv, i) == true){
1157  ; //empty, updated in errorArg(..)
1158  }else if (strcmp(argv[i], "-verify") == 0){
1159  VerifyFlag = true ;
1160 #ifdef CEBIT_STAT
1161  }else if (strcmp(argv[i], "-statserv") == 0){
1162  if (! (argc > 2))
1163  return -1;
1164  const char *p = argv[i+1];
1165  const char *q = strrchr(p, ':');
1166  if (q == 0)
1167  return -1;
1168  BaseString::snprintf(statHost, sizeof(statHost), "%.*s", q-p, p);
1169  statPort = atoi(q+1);
1170  statEnable = true;
1171  argc -= 1;
1172  i++;
1173  }else if (strcmp(argv[i], "-statfreq") == 0){
1174  if (! (argc > 2))
1175  return -1;
1176  statFreq = atoi(argv[i+1]);
1177  if (statFreq < 1)
1178  return -1;
1179  argc -= 1;
1180  i++;
1181 #endif
1182  }else{
1183  return -1;
1184  }
1185  argc -= 1;
1186  i++;
1187  }
1188  return 0;
1189 }
1190 
1191 static void sleepBeforeStartingTest(int seconds){
1192  if (seconds > 0){
1193  ndbout << "Sleeping(" <<seconds << ")...";
1194  NdbSleep_SecSleep(seconds);
1195  ndbout << " done!" << endl;
1196  }
1197 }
1198 
1199 
1200 static int
1201 createTables(Ndb* pMyNdb){
1202  for (Uint32 i = 0; i < tNoOfAttributes; i++){
1203  BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%u", i);
1204  }
1205 
1206  // Note! Uses only uppercase letters in table name's
1207  // so that we can look at the tables with SQL
1208  for (Uint32 i = 0; i < tNoOfTables; i++){
1209  if (theStdTableNameFlag == 0){
1210  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i,
1211  (int)(NdbTick_CurrentMillisecond() / 1000));
1212  } else {
1213  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
1214  }
1215  }
1216 
1217  for(Uint32 i = 0; i < tNoOfTables; i++){
1218  ndbout << "Creating " << tableName[i] << "... ";
1219 
1220  NdbDictionary::Table tmpTable(tableName[i]);
1221 
1222  tmpTable.setStoredTable(!theTempTable);
1223 
1224  if(useLongKeys){
1225  for(Uint32 i = 0; i < tNoOfLongPK; i++) {
1226  NdbDictionary::Column col(longKeyAttrName[i]);
1227  col.setType(NdbDictionary::Column::Unsigned);
1228  col.setLength(tSizeOfLongPK);
1229  col.setPrimaryKey(true);
1230  tmpTable.addColumn(col);
1231  }
1232  } else {
1233  NdbDictionary::Column col(attrName[0]);
1234  col.setType(NdbDictionary::Column::Unsigned);
1235  col.setLength(1);
1236  col.setPrimaryKey(true);
1237  tmpTable.addColumn(col);
1238  }
1239 
1240 
1243  col.setLength(tAttributeSize);
1244  for (unsigned j = 1; j < tNoOfAttributes; j++){
1245  col.setName(attrName[j]);
1246  tmpTable.addColumn(col);
1247  }
1248 
1249  if(pMyNdb->getDictionary()->createTable(tmpTable) == -1){
1250  return -1;
1251  }
1252  ndbout << "done" << endl;
1253  }
1254 
1255  return 0;
1256 }
1257 
1258 static int
1259 dropTables(Ndb* pMyNdb){
1260  unsigned int i;
1261 
1262  // Note! Uses only uppercase letters in table name's
1263  // so that we can look at the tables with SQL
1264  for(i = 0; i < tNoOfTables; i++){
1265  ndbout << "Dropping " << tableName[i] << "... ";
1266  pMyNdb->getDictionary()->dropTable(tableName[i]);
1267  ndbout << "done" << endl;
1268  }
1269 
1270  return 0;
1271 }
1272 
1273 
1274 static void input_error(){
1275  ndbout << endl << "Invalid argument!" << endl;
1276  ndbout << endl << "Arguments:" << endl;
1277  ndbout << " -t Number of threads to start, default 1" << endl;
1278  ndbout << " -o Number of operations per loop, default 500" << endl;
1279  ndbout << " -l Number of loops to run, default 1, 0=infinite" << endl;
1280  ndbout << " -a Number of attributes, default 25" << endl;
1281  ndbout << " -c Number of tables, default 1" << endl;
1282  ndbout << " -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl;
1283  ndbout << " independent of this value)" << endl;
1284  ndbout << " -lkn Number of long primary keys, default 1" << endl;
1285  ndbout << " -lks Size of each long primary key, default 1" << endl;
1286 
1287  ndbout << " -simple Use simple read to read from database" << endl;
1288  ndbout << " -write Use writeTuple in insert and update" << endl;
1289  ndbout << " -stdtables Use standard table names" << endl;
1290  ndbout << " -no_table_create Don't create tables in db" << endl;
1291  ndbout << " -sleep Sleep a number of seconds before running the test, this" << endl;
1292  ndbout << " can be used so that another flexBench have time to create tables" << endl;
1293  ndbout << " -temp Use tables without logging" << endl;
1294  ndbout << " -verify Verify inserts, updates and deletes" << endl ;
1295  theErrorData.printCmdLineArgs(ndbout);
1296  ndbout << endl <<"Returns:" << endl;
1297  ndbout << "\t 0 - Test passed" << endl;
1298  ndbout << "\t 1 - Test failed" << endl;
1299  ndbout << "\t 2 - Invalid arguments" << endl << endl;
1300 }
1301 
1302 // vim: set sw=2: