MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
flex_bench_mysql.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 /* ***************************************************
20 FLEXBENCH
21 Perform benchmark of insert, update and delete transactions
22 
23 Arguments:
24  -t Number of threads to start, default 1
25  -o Number of operations per loop, default 500
26  -l Number of loops to run, default 1, 0=infinite
27  -a Number of attributes, default 25
28  -c Number of tables, default 1
29  -s Size of each attribute, default 1 (Primary Key is always of size 1,
30  independent of this value)
31  -lkn Number of long primary keys, default 1
32  -lks Size of each long primary key, default 1
33  -simple Use simple read to read from database
34  -dirty Use dirty read to read from database
35  -write Use writeTuple in insert and update
36  -stdtables Use standard table names
37  -no_table_create Don't create tables in db
38  -sleep Sleep a number of seconds before running the test; this
39  can be used so that another flexBench has time to create tables
40  -temp Use tables without logging
41  -verify Verify inserts, updates and deletes
42  -use_ndb Use NDB API, otherwise use mysql client
43 #ifdef CEBIT_STAT
44  -statserv host:port statistics server to report to
45  -statfreq ops report every ops operations (default 100)
46 #endif
47  Returns:
48  0 - Test passed
49  1 - Test failed
50  2 - Invalid arguments
51 
52 * *************************************************** */
53 
54 #define USE_MYSQL
55 #ifdef USE_MYSQL
56 #include <mysql.h>
57 #endif
58 
59 #include "NdbApi.hpp"
60 
61 #include <NdbMain.h>
62 #include <NdbOut.hpp>
63 #include <NdbSleep.h>
64 #include <NdbTick.h>
65 #include <NdbTimer.hpp>
66 #include <NdbThread.h>
67 #include <NdbAutoPtr.hpp>
68 
69 #include <NdbTest.hpp>
70 
// Compile-time limits. MAXSTRLEN, MAXATTR and MAXTABLES size the static
// tableName[] / attrName[] tables declared further down in this file.
71 #define MAXSTRLEN 16 // name buffers are declared with MAXSTRLEN+1 bytes
72 #define MAXATTR 64 // number of rows in attrName[]
73 #define MAXTABLES 128 // number of rows in tableName[]
74 #define MAXATTRSIZE 1000 // NOTE(review): presumably the upper bound for -s; not used in the visible code
75 #define MAXNOLONGKEY 16 // Max number of long keys.
76 #define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes
77 
// Forward declarations of functions that are defined later in this file.
// Worker entry point; it is handed to NdbThread_Create by the main function.
78 extern "C" { static void* flexBenchThread(void*); }
// A non-zero return makes the main function print the usage and exit with
// NDBT_WRONGARGS.
79 static int readArguments(int argc, const char** argv);
80 #ifdef USE_MYSQL
// MySQL client variants; a non-zero return from createTables fails the run.
81 static int createTables(MYSQL*);
82 static int dropTables(MYSQL*);
83 #endif
// NDB API variant; a non-zero return fails the run.
84 static int createTables(Ndb*);
85 static void sleepBeforeStartingTest(int seconds);
86 static void input_error();
87 
// Command posted by the main thread to every worker through
// ThreadData::threadStart (written by tellThreads, read by flexBenchThread).
88 enum StartType {
89  stIdle, // no command pending; workers sleep-poll while they see this value
90  stInsert,
91  stVerify,
92  stRead,
93  stUpdate,
94  stDelete,
95  stTryDelete, // not referenced in the visible part of this file
96  stVerifyDelete,
97  stStop // workers leave their command loop when they see this value
98 };
99 
// Per-worker control block shared between the main thread and one
// flexBenchThread instance.
100 struct ThreadData
101 {
102  int threadNo; // worker index, assigned by the main thread before the thread is created
103  NdbThread* threadLife; // handle returned by NdbThread_Create; waited for and destroyed at shutdown
104  int threadReady; // set to 1 by the worker, cleared by resetThreads, polled by waitForThreads
105  StartType threadStart; // pending command, written by tellThreads and resetThreads
106  int threadResult; // non-zero is reported as a fatal error by checkThreadResults
107 };
108 
109 static int tNodeId = 0 ;
110 static char tableName[MAXTABLES][MAXSTRLEN+1];
111 static char attrName[MAXATTR][MAXSTRLEN+1];
112 static char** longKeyAttrName;
113 
114 // Program Parameters
115 static int tNoOfLoops = 1;
116 static int tAttributeSize = 1;
117 static unsigned int tNoOfThreads = 1;
118 static unsigned int tNoOfTables = 1;
119 static unsigned int tNoOfAttributes = 25;
120 static unsigned int tNoOfOperations = 500;
121 static unsigned int tSleepTime = 0;
122 static unsigned int tNoOfLongPK = 1;
123 static unsigned int tSizeOfLongPK = 1;
124 static unsigned int t_instances = 1;
125 
126 //Program Flags
127 static int theSimpleFlag = 0;
128 static int theDirtyFlag = 0;
129 static int theWriteFlag = 0;
130 static int theStdTableNameFlag = 0;
131 static int theTableCreateFlag = 0;
132 static bool theTempTable = false;
133 static bool VerifyFlag = true;
134 static bool useLongKeys = false;
135 static bool verbose = false;
136 #ifdef USE_MYSQL
137 static bool use_ndb = false;
138 static int engine_id = 0;
139 static int sockets[16];
140 static int n_sockets = 0;
141 static char* engine[] =
142  {
143  " ENGINE = NDBCLUSTER ", // use default engine
144  " ENGINE = MEMORY ",
145  " ENGINE = MYISAM ",
146  " ENGINE = INNODB "
147  };
148 #else
149 static bool use_ndb = true;
150 #endif
151 
152 static ErrorData theErrorData; // Part of flexBench-program
153 
// Timing helpers built around NdbTimer. START_TIMER opens a scope that holds
// a started timer named "timer"; PRINT_TIMER prints the statistics and closes
// that scope again, so both must be used as a pair inside the same block and
// STOP_TIMER may only appear between them.
154 #define START_TIMER { NdbTimer timer; timer.doStart();
155 #define STOP_TIMER timer.doStop();
156 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
157 
158 #include <NdbTCP.h>
159 
160 #ifdef CEBIT_STAT
161 #include <NdbMutex.h>
// State of the optional CEBIT_STAT reporting; read and updated by statReport().
162 static bool statEnable = false; // statReport() returns immediately while this is false
163 static char statHost[100]; // host name resolved with Ndb_getInAddr()
164 static int statFreq = 100; // NOTE(review): presumably the -statfreq value; not read in the visible code
165 static int statPort = 0; // TCP port of the statistics server
166 static int statSock = -1; // socket descriptor of the current connection
167 static enum { statError = -1, statClosed, statOpen } statState; // connection state; zero-initialised, i.e. statClosed
168 static NdbMutex statMutex = NDB_MUTEX_INITIALIZER; // serialises statReport()
169 #endif
170 
171 //-------------------------------------------------------------------
172 // Statistical Reporting routines
173 //-------------------------------------------------------------------
174 #ifdef CEBIT_STAT
175 // Experimental client-side statistic for CeBIT
176 
177 static void
178 statReport(enum StartType st, int ops)
179 {
180  if (!statEnable)
181  return;
182  if (NdbMutex_Lock(&statMutex) < 0) {
183  if (statState != statError) {
184  ndbout_c("stat: lock mutex failed: %s", strerror(errno));
185  statState = statError;
186  }
187  return;
188  }
189  static int nodeid;
190  // open connection
191  if (statState != statOpen) {
192  char *p = getenv("NDB_NODEID");
193  nodeid = p == 0 ? 0 : atoi(p);
194  if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
195  if (statState != statError) {
196  ndbout_c("stat: create socket failed: %s", strerror(errno));
197  statState = statError;
198  }
199  (void)NdbMutex_Unlock(&statMutex);
200  return;
201  }
202  struct sockaddr_in saddr;
203  memset(&saddr, 0, sizeof(saddr));
204  saddr.sin_family = AF_INET;
205  saddr.sin_port = htons(statPort);
206  if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) {
207  if (statState != statError) {
208  ndbout_c("stat: host %s not found", statHost);
209  statState = statError;
210  }
211  (void)close(statSock);
212  (void)NdbMutex_Unlock(&statMutex);
213  return;
214  }
215  if (connect(statSock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
216  if (statState != statError) {
217  ndbout_c("stat: connect failed: %s", strerror(errno));
218  statState = statError;
219  }
220  (void)close(statSock);
221  (void)NdbMutex_Unlock(&statMutex);
222  return;
223  }
224  statState = statOpen;
225  ndbout_c("stat: connection to %s:%d opened", statHost, (int)statPort);
226  }
227  const char *text;
228  switch (st) {
229  case stInsert:
230  text = "insert";
231  break;
232  case stVerify:
233  text = "verify";
234  break;
235  case stRead:
236  text = "read";
237  break;
238  case stUpdate:
239  text = "update";
240  break;
241  case stDelete:
242  text = "delete";
243  break;
244  case stVerifyDelete:
245  text = "verifydelete";
246  break;
247  default:
248  text = "unknown";
249  break;
250  }
251  char buf[100];
252  sprintf(buf, "%d %s %d\n", nodeid, text, ops);
253  int len = strlen(buf);
254  // assume SIGPIPE already ignored
255  if (send(statSock, buf, len, 0) != len) {
256  if (statState != statError) {
257  ndbout_c("stat: write failed: %s", strerror(errno));
258  statState = statError;
259  }
260  (void)close(statSock);
261  (void)NdbMutex_Unlock(&statMutex);
262  return;
263  }
264  (void)NdbMutex_Unlock(&statMutex);
265 }
266 #endif // CEBIT_STAT
267 
268 static void
269 resetThreads(ThreadData* pt){
270  for (unsigned int i = 0; i < tNoOfThreads; i++){
271  pt[i].threadReady = 0;
272  pt[i].threadResult = 0;
273  pt[i].threadStart = stIdle;
274  }
275 }
276 
277 static int
278 checkThreadResults(ThreadData* pt){
279  for (unsigned int i = 0; i < tNoOfThreads; i++){
280  if(pt[i].threadResult != 0){
281  ndbout_c("Thread%d reported fatal error %d", i, pt[i].threadResult);
282  return -1;
283  }
284  }
285  return 0;
286 }
287 
288 static
289 void
290 waitForThreads(ThreadData* pt)
291 {
292  int cont = 1;
293  while (cont){
294  NdbSleep_MilliSleep(100);
295  cont = 0;
296  for (unsigned int i = 0; i < tNoOfThreads; i++){
297  if (pt[i].threadReady == 0)
298  cont = 1;
299  }
300  }
301 }
302 
303 static void
304 tellThreads(ThreadData* pt, StartType what)
305 {
306  for (unsigned int i = 0; i < tNoOfThreads; i++)
307  pt[i].threadStart = what;
308 }
309 
310 NDB_COMMAND(flexBench, "flexBench", "flexBench", "flexbench", 65535)
311 {
312  ndb_init();
313  ThreadData* pThreadsData;
314  int tLoops = 0;
315  int returnValue = NDBT_OK;
316  if (readArguments(argc, argv) != 0){
317  input_error();
318  return NDBT_ProgramExit(NDBT_WRONGARGS);
319  }
320  NdbAutoPtr<char> p10;
321  if(useLongKeys){
322  int e1 = sizeof(char*) * tNoOfLongPK;
323  int e2_1 = strlen("KEYATTR ") + 1;
324  int e2 = e2_1 * tNoOfLongPK;
325  char *tmp = (char *) malloc(e1 + e2);
326  p10.reset(tmp);
327  longKeyAttrName = (char **) tmp;
328  tmp += e1;
329  for (Uint32 i = 0; i < tNoOfLongPK; i++) {
330  // longKeyAttrName[i] = (char *) malloc(strlen("KEYATTR ") + 1);
331  longKeyAttrName[i] = tmp;
332  tmp += e2_1;
333  memset(longKeyAttrName[i], 0, e2_1);
334  sprintf(longKeyAttrName[i], "KEYATTR%i", i);
335  }
336  }
337 
339  p12( pThreadsData = new ThreadData[tNoOfThreads] );
340 
341 
342  ndbout << endl << "FLEXBENCH - Starting normal mode" << endl;
343  ndbout << "Perform benchmark of insert, update and delete transactions"<< endl;
344  ndbout << " " << tNoOfThreads << " thread(s) " << endl;
345  ndbout << " " << tNoOfLoops << " iterations " << endl;
346  ndbout << " " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " <<endl;
347  ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
348  ndbout << " " << tNoOfOperations << " transaction(s) per thread and round " << endl;
349  ndbout << " " << tAttributeSize << " is the number of 32 bit words per attribute "<< endl;
350  ndbout << " " << "Table(s) without logging: " << (Uint32)theTempTable << endl;
351 
352  if(useLongKeys)
353  ndbout << " " << "Using long keys with " << tNoOfLongPK << " keys a' " <<
354  tSizeOfLongPK * 4 << " bytes each." << endl;
355 
356  ndbout << " " << "Verification is " ;
357  if(VerifyFlag) {
358  ndbout << "enabled" << endl ;
359  }else{
360  ndbout << "disabled" << endl ;
361  }
362  if (use_ndb) {
363  ndbout << "Use NDB API with NdbPool in this test case" << endl;
364  ndbout << "Pool size = " << t_instances << endl;
365  } else {
366  ndbout << "Use mysql client with " << engine[engine_id];
367  ndbout << " as engine" << endl;
368  }
369  theErrorData.printSettings(ndbout);
370 
371  NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);
372 
373 #ifdef USE_MYSQL
374  MYSQL mysql;
375  if (!use_ndb) {
376  if ( mysql_thread_safe() == 0 ) {
377  ndbout << "Not thread safe mysql library..." << endl;
378  return NDBT_ProgramExit(NDBT_FAILED);
379  }
380 
381  ndbout << "Connecting to MySQL..." <<endl;
382 
383  mysql_init(&mysql);
384  {
385  int the_socket = sockets[0];
386  char the_socket_name[1024];
387  sprintf(the_socket_name, "%s%u%s", "/tmp/mysql.",the_socket,".sock");
388  // sprintf(the_socket_name, "%s", "/tmp/mysql.sock");
389  ndbout << the_socket_name << endl;
390  if ( mysql_real_connect(&mysql,
391  "localhost",
392  "root",
393  "",
394  "test",
395  the_socket,
396  the_socket_name,
397  0) == NULL ) {
398  ndbout << "Connect failed" <<endl;
399  returnValue = NDBT_FAILED;
400  }
401  mysql.reconnect= 1;
402  }
403  if(returnValue == NDBT_OK){
404  mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
405  if (createTables(&mysql) != 0){
406  returnValue = NDBT_FAILED;
407  }
408  }
409  }
410 #endif
411  if (use_ndb) {
412  Uint32 ndb_id = 0;
413  if (!create_instance(t_instances, 1, t_instances)) {
414  ndbout << "Creation of the NdbPool failed" << endl;
415  returnValue = NDBT_FAILED;
416  } else {
417  Ndb* pNdb = get_ndb_object(ndb_id, "test", "def");
418  if (pNdb == NULL) {
419  ndbout << "Failed to get a NDB object" << endl;
420  returnValue = NDBT_FAILED;
421  } else {
422  tNodeId = pNdb->getNodeId();
423  ndbout << " NdbAPI node with id = " << tNodeId << endl;
424  ndbout << endl;
425 
426  ndbout << "Waiting for ndb to become ready..." <<endl;
427  if (pNdb->waitUntilReady(2000) != 0){
428  ndbout << "NDB is not ready" << endl;
429  ndbout << "Benchmark failed!" << endl;
430  returnValue = NDBT_FAILED;
431  }
432  if(returnValue == NDBT_OK){
433  if (createTables(pNdb) != 0){
434  returnValue = NDBT_FAILED;
435  }
436  }
437  return_ndb_object(pNdb, ndb_id);
438  }
439  }
440  }
441  if(returnValue == NDBT_OK){
442 
443  sleepBeforeStartingTest(tSleepTime);
444 
445  /****************************************************************
446  * Create threads. *
447  ****************************************************************/
448  resetThreads(pThreadsData);
449 
450  for (unsigned int i = 0; i < tNoOfThreads; i++){
451  pThreadsData[i].threadNo = i;
452  pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread,
453  (void**)&pThreadsData[i],
454  32768,
455  "flexBenchThread",
456  NDB_THREAD_PRIO_LOW);
457  }
458 
459  waitForThreads(pThreadsData);
460 
461  ndbout << endl << "All threads started" << endl << endl;
462 
463  /****************************************************************
464  * Execute program. *
465  ****************************************************************/
466 
467  for(;;){
468 
469  int loopCount = tLoops + 1;
470  ndbout << endl << "Loop # " << loopCount << endl << endl;
471 
472  /****************************************************************
473  * Perform inserts. *
474  ****************************************************************/
475  // Reset and start timer
476  START_TIMER;
477  // Give insert-command to all threads
478  resetThreads(pThreadsData);
479  tellThreads(pThreadsData, stInsert);
480  waitForThreads(pThreadsData);
481  if (checkThreadResults(pThreadsData) != 0){
482  ndbout << "Error: Threads failed in performing insert" << endl;
483  returnValue = NDBT_FAILED;
484  break;
485  }
486  // stop timer and print results.
487  STOP_TIMER;
488  PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables);
489  /****************************************************************
490  * Verify inserts. *
491  ****************************************************************/
492  if (VerifyFlag) {
493  resetThreads(pThreadsData);
494  ndbout << "Verifying inserts...\t" ;
495  tellThreads(pThreadsData, stVerify);
496  waitForThreads(pThreadsData);
497  if (checkThreadResults(pThreadsData) != 0){
498  ndbout << "Error: Threads failed while verifying inserts" << endl;
499  returnValue = NDBT_FAILED;
500  break;
501  }else{
502  ndbout << "\t\tOK" << endl << endl ;
503  }
504  }
505 
506  /****************************************************************
507  * Perform read. *
508  ****************************************************************/
509  // Reset and start timer
510  START_TIMER;
511  // Give read-command to all threads
512  resetThreads(pThreadsData);
513  tellThreads(pThreadsData, stRead);
514  waitForThreads(pThreadsData);
515  if (checkThreadResults(pThreadsData) != 0){
516  ndbout << "Error: Threads failed in performing read" << endl;
517  returnValue = NDBT_FAILED;
518  break;
519  }
520  // stop timer and print results.
521  STOP_TIMER;
522  PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
523 
524  /****************************************************************
525  * Perform update. *
526  ****************************************************************/
527  // Reset and start timer
528  START_TIMER;
529  // Give update-command to all threads
530  resetThreads(pThreadsData);
531  tellThreads(pThreadsData, stUpdate);
532  waitForThreads(pThreadsData);
533  if (checkThreadResults(pThreadsData) != 0){
534  ndbout << "Error: Threads failed in performing update" << endl;
535  returnValue = NDBT_FAILED;
536  break;
537  }
538  // stop timer and print results.
539  STOP_TIMER;
540  PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables);
541 
542  /****************************************************************
543  * Verify updates. *
544  ****************************************************************/
545  if (VerifyFlag) {
546  resetThreads(pThreadsData);
547  ndbout << "Verifying updates...\t" ;
548  tellThreads(pThreadsData, stVerify);
549  waitForThreads(pThreadsData);
550  if (checkThreadResults(pThreadsData) != 0){
551  ndbout << "Error: Threads failed while verifying updates" << endl;
552  returnValue = NDBT_FAILED;
553  break;
554  }else{
555  ndbout << "\t\tOK" << endl << endl ;
556  }
557  }
558 
559  /****************************************************************
560  * Perform read. *
561  ****************************************************************/
562  // Reset and start timer
563  START_TIMER;
564  // Give read-command to all threads
565  resetThreads(pThreadsData);
566  tellThreads(pThreadsData, stRead);
567  waitForThreads(pThreadsData);
568  if (checkThreadResults(pThreadsData) != 0){
569  ndbout << "Error: Threads failed in performing read" << endl;
570  returnValue = NDBT_FAILED;
571  break;
572  }
573  // stop timer and print results.
574  STOP_TIMER;
575  PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
576 
577  /****************************************************************
578  * Perform delete. *
579  ****************************************************************/
580  // Reset and start timer
581  START_TIMER;
582  // Give delete-command to all threads
583  resetThreads(pThreadsData);
584  tellThreads(pThreadsData, stDelete);
585  waitForThreads(pThreadsData);
586  if (checkThreadResults(pThreadsData) != 0){
587  ndbout << "Error: Threads failed in performing delete" << endl;
588  returnValue = NDBT_FAILED;
589  break;
590  }
591  // stop timer and print results.
592  STOP_TIMER;
593  PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables);
594 
595  /****************************************************************
596  * Verify deletes. *
597  ****************************************************************/
598  if (VerifyFlag) {
599  resetThreads(pThreadsData);
600  ndbout << "Verifying tuple deletion..." ;
601  tellThreads(pThreadsData, stVerifyDelete);
602  waitForThreads(pThreadsData);
603  if (checkThreadResults(pThreadsData) != 0){
604  ndbout << "Error: Threads failed in verifying deletes" << endl;
605  returnValue = NDBT_FAILED;
606  break;
607  }else{
608  ndbout << "\t\tOK" << endl << endl ;
609  }
610  }
611 
612  ndbout << "--------------------------------------------------" << endl;
613 
614  tLoops++;
615 
616  if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops )
617  break;
618  theErrorData.printErrorCounters();
619  }
620 
621  resetThreads(pThreadsData);
622  tellThreads(pThreadsData, stStop);
623  waitForThreads(pThreadsData);
624 
625  void * tmp;
626  for(Uint32 i = 0; i<tNoOfThreads; i++){
627  NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp);
628  NdbThread_Destroy(&pThreadsData[i].threadLife);
629  }
630  }
631 #ifdef USE_MYSQL
632  if (!use_ndb) {
633  dropTables(&mysql);
634  mysql_close(&mysql);
635  }
636 #endif
637  if (use_ndb) {
638  drop_instance();
639  }
640  theErrorData.printErrorCounters();
641  return NDBT_ProgramExit(returnValue);
642 }
644 
645 
/*
 * Hash an array of words with a multiply-by-33 scheme.
 *
 * Starting from the seed 147, each of the "len" words contributes its four
 * low-order bytes, least significant byte first; every byte is folded in as
 * hash = hash * 33 + byte. Only the part of a word that fits in an unsigned
 * is looked at. With len <= 0 the seed is returned unchanged.
 */
unsigned long get_hash(unsigned long * hash_key, int len)
{
  unsigned long hash_value = 147;
  int word;
  for (word = 0; word < len; word++)
  {
    unsigned h_key = hash_key[word];
    int byteNo;
    for (byteNo = 0; byteNo < 4; byteNo++)
    {
      hash_value = hash_value * 33 + (h_key & 255);
      h_key >>= 8;
    }
  }
  return hash_value;
}
661 
662 // End of warming up phase
663 
664 
665 
666 static void* flexBenchThread(void* pArg)
667 {
668  ThreadData* pThreadData = (ThreadData*)pArg;
669  unsigned int threadNo, threadBase;
670  Ndb* pNdb = NULL ;
671  Uint32 ndb_id = 0;
672  NdbConnection *pTrans = NULL ;
673  NdbOperation** pOps = NULL ;
674  StartType tType ;
675  StartType tSaveType ;
676  NdbRecAttr* tTmp = NULL ;
677  int* attrValue = NULL ;
678  int* attrRefValue = NULL ;
679  int check = 0 ;
680  int loopCountOps, loopCountTables, loopCountAttributes;
681  int tAttemptNo = 0;
682  int tRetryAttempts = 20;
683  int tResult = 0;
684  int tSpecialTrans = 0;
685  int nRefLocalOpOffset = 0 ;
686  int nReadBuffSize =
687  tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ;
688  int nRefBuffSize =
689  tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ;
690  unsigned*** longKeyAttrValue = NULL;
691 
692 
693  threadNo = pThreadData->threadNo ;
694 
695 #ifdef USE_MYSQL
696  MYSQL mysql;
697  int the_socket = sockets[threadNo % n_sockets];
698  char the_socket_name[1024];
699  //sprintf(the_socket_name, "%s", "/tmp/mysql.sock");
700  sprintf(the_socket_name, "%s%u%s", "/tmp/mysql.",the_socket,".sock");
701  if (!use_ndb) {
702  ndbout << the_socket_name << endl;
703  ndbout << "Thread connecting to MySQL... " << endl;
704  mysql_init(&mysql);
705 
706  if ( mysql_real_connect(&mysql,
707  "localhost",
708  "root",
709  "",
710  "test",
711  the_socket,
712  the_socket_name,
713  0) == NULL ) {
714  ndbout << "failed" << endl;
715  return 0;
716  }
717  mysql.reconnect= 1;
718  ndbout << "ok" << endl;
719 
720  int r;
721  if (tNoOfTables > 1)
722  r = mysql_autocommit(&mysql, 0);
723  else
724  r = mysql_autocommit(&mysql, 1);
725 
726  if (r) {
727  ndbout << "autocommit on/off failed" << endl;
728  return 0;
729  }
730  }
731 #endif
732 
733  NdbAutoPtr<int> p00( attrValue= (int*)malloc(nReadBuffSize) ) ;
734  NdbAutoPtr<int> p01( attrRefValue= (int*)malloc(nRefBuffSize) );
735  if (use_ndb) {
736  pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
737  }
738  NdbAutoPtr<NdbOperation*> p02( pOps );
739 
740  if( !attrValue || !attrRefValue ||
741  ( use_ndb && ( !pOps) ) ){
742  // Check allocations to make sure we got all the memory we asked for
743  ndbout << "One or more memory allocations failed when starting thread #";
744  ndbout << threadNo << endl ;
745  ndbout << "Thread #" << threadNo << " will now exit" << endl ;
746  tResult = 13 ;
747  return 0;
748  }
749 
750  if (use_ndb) {
751  pNdb = get_ndb_object(ndb_id, "test", "def");
752  if (pNdb == NULL) {
753  ndbout << "Failed to get an NDB object" << endl;
754  ndbout << "Thread #" << threadNo << " will now exit" << endl ;
755  tResult = 13;
756  return 0;
757  }
758  pNdb->waitUntilReady();
759  return_ndb_object(pNdb, ndb_id);
760  pNdb = NULL;
761  }
762 
763  // To make sure that two different threads doesn't operate on the same record
764  // Calculate an "unique" number to use as primary key
765  threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
766 
767  NdbAutoPtr<char> p22;
768  if(useLongKeys){
769  // Allocate and populate the longkey array.
770  int e1 = sizeof(unsigned**) * tNoOfOperations;
771  int e2 = sizeof(unsigned*) * tNoOfLongPK * tNoOfOperations;
772  int e3 = sizeof(unsigned) * tSizeOfLongPK * tNoOfLongPK * tNoOfOperations;
773  char* tmp;
774  p22.reset(tmp = (char*)malloc(e1+e2+e3));
775 
776  longKeyAttrValue = (unsigned ***) tmp;
777  tmp += e1;
778  for (Uint32 n = 0; n < tNoOfOperations; n++) {
779  longKeyAttrValue[n] = (unsigned **) tmp;
780  tmp += sizeof(unsigned*) * tNoOfLongPK;
781  }
782 
783  for (Uint32 n = 0; n < tNoOfOperations; n++){
784  for (Uint32 i = 0; i < tNoOfLongPK ; i++) {
785  longKeyAttrValue[n][i] = (unsigned *) tmp;
786  tmp += sizeof(unsigned) * tSizeOfLongPK;
787  memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK);
788  for(Uint32 j = 0; j < tSizeOfLongPK; j++) {
789  // Repeat the unique value to fill up the long key.
790  longKeyAttrValue[n][i][j] = threadBase + n;
791  }
792  }
793  }
794  }
795 
796  int nRefOpOffset = 0 ;
797  //Assign reference attribute values to memory
798  for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){
799  // Calculate offset value before going into the next loop
800  nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ;
801  for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){
802  *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] =
803  (int)(threadBase + ops + a) ;
804  }
805  }
806 
807 #ifdef CEBIT_STAT
808  // ops not yet reported
809  int statOps = 0;
810 #endif
811 
812 #ifdef USE_MYSQL
813  // temporary buffer to store prepared statement text
814  char buf[2048];
815  MYSQL_STMT** prep_read = NULL;
816  MYSQL_STMT** prep_delete = NULL;
817  MYSQL_STMT** prep_update = NULL;
818  MYSQL_STMT** prep_insert = NULL;
819  MYSQL_BIND* bind_delete = NULL;
820  MYSQL_BIND* bind_read = NULL;
821  MYSQL_BIND* bind_update = NULL;
822  MYSQL_BIND* bind_insert = NULL;
823  int* mysql_data = NULL;
824 
825  NdbAutoPtr<char> p21;
826 
827  if (!use_ndb) {
828  // data array to which prepared statements are bound
829  char* tmp;
830  int e1 = sizeof(int)*tAttributeSize*tNoOfAttributes;
831  int e2 = sizeof(MYSQL_BIND)*tNoOfAttributes;
832  int e3 = sizeof(MYSQL_BIND)*tNoOfAttributes;
833  int e4 = sizeof(MYSQL_BIND)*tNoOfAttributes;
834  int e5 = sizeof(MYSQL_BIND)*1;
835  int e6 = sizeof(MYSQL_STMT*)*tNoOfTables;
836  int e7 = sizeof(MYSQL_STMT*)*tNoOfTables;
837  int e8 = sizeof(MYSQL_STMT*)*tNoOfTables;
838  int e9 = sizeof(MYSQL_STMT*)*tNoOfTables;
839  p21.reset(tmp = (char*)malloc(e1+e2+e3+e4+e5+e6+e7+e8+e9));
840 
841  mysql_data = (int*)tmp; tmp += e1;
842  bind_insert = (MYSQL_BIND*)tmp; tmp += e2;
843  bind_update = (MYSQL_BIND*)tmp; tmp += e3;
844  bind_read = (MYSQL_BIND*)tmp; tmp += e4;
845  bind_delete = (MYSQL_BIND*)tmp; tmp += e5;
846  prep_insert = (MYSQL_STMT**)tmp; tmp += e6;
847  prep_update = (MYSQL_STMT**)tmp; tmp += e7;
848  prep_read = (MYSQL_STMT**)tmp; tmp += e8;
849  prep_delete = (MYSQL_STMT**)tmp;
850 
851  for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
852  MYSQL_BIND& bi = bind_insert[ca];
853  bi.buffer_type = MYSQL_TYPE_LONG;
854  bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
855  bi.buffer_length = 0;
856  bi.length = NULL;
857  bi.is_null = NULL;
858  }//for
859 
860  for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
861  MYSQL_BIND& bi = bind_update[ca];
862  bi.buffer_type = MYSQL_TYPE_LONG;
863  if ( ca == tNoOfAttributes-1 ) // the primary key comes last in statement
864  bi.buffer = (char*)&mysql_data[0];
865  else
866  bi.buffer = (char*)&mysql_data[(ca+1)*tAttributeSize];
867  bi.buffer_length = 0;
868  bi.length = NULL;
869  bi.is_null = NULL;
870  }//for
871 
872  for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
873  MYSQL_BIND& bi = bind_read[ca];
874  bi.buffer_type = MYSQL_TYPE_LONG;
875  bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
876  bi.buffer_length = 4;
877  bi.length = NULL;
878  bi.is_null = NULL;
879  }//for
880 
881  for (Uint32 ca = 0; ca < 1; ca++){
882  MYSQL_BIND& bi = bind_delete[ca];
883  bi.buffer_type = MYSQL_TYPE_LONG;
884  bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
885  bi.buffer_length = 0;
886  bi.length = NULL;
887  bi.is_null = NULL;
888  }//for
889 
890  for (Uint32 i = 0; i < tNoOfTables; i++) {
891  int pos = 0;
892  pos += sprintf(buf+pos, "%s%s%s",
893  "INSERT INTO ",
894  tableName[i],
895  " VALUES(");
896  pos += sprintf(buf+pos, "%s", "?");
897  for (Uint32 j = 1; j < tNoOfAttributes; j++) {
898  pos += sprintf(buf+pos, "%s", ",?");
899  }
900  pos += sprintf(buf+pos, "%s", ")");
901  if (verbose)
902  ndbout << buf << endl;
903  prep_insert[i] = mysql_prepare(&mysql, buf, pos);
904  if (prep_insert[i] == 0) {
905  ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
906  return 0;
907  }
908  if (mysql_bind_param(prep_insert[i], bind_insert)) {
909  ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
910  return 0;
911  }
912  }
913 
914  for (Uint32 i = 0; i < tNoOfTables; i++) {
915  int pos = 0;
916  pos += sprintf(buf+pos, "%s%s%s",
917  "UPDATE ",
918  tableName[i],
919  " SET ");
920  for (Uint32 j = 1; j < tNoOfAttributes; j++) {
921  if (j != 1)
922  pos += sprintf(buf+pos, "%s", ",");
923  pos += sprintf(buf+pos, "%s%s", attrName[j],"=?");
924  }
925  pos += sprintf(buf+pos, "%s%s%s", " WHERE ", attrName[0], "=?");
926 
927  if (verbose)
928  ndbout << buf << endl;
929  prep_update[i] = mysql_prepare(&mysql, buf, pos);
930  if (prep_update[i] == 0) {
931  ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
932  return 0;
933  }
934  if (mysql_bind_param(prep_update[i], bind_update)) {
935  ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
936  return 0;
937  }
938  }
939 
940  for (Uint32 i = 0; i < tNoOfTables; i++) {
941  int pos = 0;
942  pos += sprintf(buf+pos, "%s", "SELECT ");
943  for (Uint32 j = 1; j < tNoOfAttributes; j++) {
944  if (j != 1)
945  pos += sprintf(buf+pos, "%s", ",");
946  pos += sprintf(buf+pos, "%s", attrName[j]);
947  }
948  pos += sprintf(buf+pos, "%s%s%s%s%s",
949  " FROM ",
950  tableName[i],
951  " WHERE ",
952  attrName[0],
953  "=?");
954  if (verbose)
955  ndbout << buf << endl;
956  prep_read[i] = mysql_prepare(&mysql, buf, pos);
957  if (prep_read[i] == 0) {
958  ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
959  return 0;
960  }
961  if (mysql_bind_param(prep_read[i], bind_read)) {
962  ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
963  return 0;
964  }
965  if (mysql_bind_result(prep_read[i], &bind_read[1])) {
966  ndbout << "mysql_bind_result: " << mysql_error(&mysql) << endl;
967  return 0;
968  }
969  }
970 
971  for (Uint32 i = 0; i < tNoOfTables; i++) {
972  int pos = 0;
973  pos += sprintf(buf+pos, "%s%s%s%s%s",
974  "DELETE FROM ",
975  tableName[i],
976  " WHERE ",
977  attrName[0],
978  "=?");
979  if (verbose)
980  ndbout << buf << endl;
981  prep_delete[i] = mysql_prepare(&mysql, buf, pos);
982  if (prep_delete[i] == 0) {
983  ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
984  return 0;
985  }
986  if (mysql_bind_param(prep_delete[i], bind_delete)) {
987  ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
988  return 0;
989  }
990  }
991  }
992 #endif
993 
994  for (;;) {
995  pThreadData->threadResult = tResult; // Report error to main thread,
996  // normally tResult is set to 0
997  pThreadData->threadReady = 1;
998 
999  while (pThreadData->threadStart == stIdle){
1000  NdbSleep_MilliSleep(100);
1001  }//while
1002 
1003  // Check if signal to exit is received
1004  if (pThreadData->threadStart == stStop){
1005  pThreadData->threadReady = 1;
1006  // ndbout_c("Thread%d is stopping", threadNo);
1007  // In order to stop this thread, the main thread has signaled
1008  // stStop, break out of the for loop so that destructors
1009  // and the proper exit functions are called
1010  break;
1011  }//if
1012 
1013  tType = pThreadData->threadStart;
1014  tSaveType = tType;
1015  pThreadData->threadStart = stIdle;
1016 
1017  // Start transaction, type of transaction
1018  // is received in the array ThreadStart
1019  loopCountOps = tNoOfOperations;
1020  loopCountTables = tNoOfTables;
1021  loopCountAttributes = tNoOfAttributes;
1022  for (int count = 1; count < loopCountOps && tResult == 0;){
1023 
1024  if (use_ndb) {
1025  pNdb = get_ndb_object(ndb_id, "test", "def");
1026  if (pNdb == NULL) {
1027  ndbout << "Could not get Ndb object in thread" << threadNo;
1028  ndbout << endl;
1029  tResult = 1; //Indicate fatal error
1030  break;
1031  }
1032  pTrans = pNdb->startTransaction();
1033  if (pTrans == NULL) {
1034  // This is a fatal error, abort program
1035  ndbout << "Could not start transaction in thread" << threadNo;
1036  ndbout << endl;
1037  ndbout << pNdb->getNdbError() << endl;
1038  tResult = 1; // Indicate fatal error
1039  break; // Break out of for loop
1040  }
1041  }
1042 
1043  // Calculate the current operation offset in the reference array
1044  nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ;
1045  int* tmpAttrRefValue = attrRefValue + nRefLocalOpOffset;
1046 
1047  for (int countTables = 0;
1048  countTables < loopCountTables && tResult == 0;
1049  countTables++) {
1050 
1051  int nTableOffset = tAttributeSize *
1052  loopCountAttributes *
1053  countTables ;
1054 
1055  int* tmpAttrValue = attrValue + nTableOffset;
1056 
1057  if (use_ndb) {
1058  pOps[countTables] = pTrans->getNdbOperation(tableName[countTables]);
1059  if (pOps[countTables] == NULL) {
1060  // This is a fatal error, abort program
1061  ndbout << "getNdbOperation: " << pTrans->getNdbError();
1062  tResult = 2; // Indicate fatal error
1063  break;
1064  }//if
1065 
1066  switch (tType) {
1067  case stInsert: // Insert case
1068  if (theWriteFlag == 1 && theDirtyFlag == 1)
1069  pOps[countTables]->dirtyWrite();
1070  else if (theWriteFlag == 1)
1071  pOps[countTables]->writeTuple();
1072  else
1073  pOps[countTables]->insertTuple();
1074  break;
1075  case stRead: // Read Case
1076  if (theSimpleFlag == 1)
1077  pOps[countTables]->simpleRead();
1078  else if (theDirtyFlag == 1)
1079  pOps[countTables]->dirtyRead();
1080  else
1081  pOps[countTables]->readTuple();
1082  break;
1083  case stUpdate: // Update Case
1084  if (theWriteFlag == 1 && theDirtyFlag == 1)
1085  pOps[countTables]->dirtyWrite();
1086  else if (theWriteFlag == 1)
1087  pOps[countTables]->writeTuple();
1088  else if (theDirtyFlag == 1)
1089  pOps[countTables]->dirtyUpdate();
1090  else
1091  pOps[countTables]->updateTuple();
1092  break;
1093  case stDelete: // Delete Case
1094  pOps[countTables]->deleteTuple();
1095  break;
1096  case stVerify:
1097  pOps[countTables]->readTuple();
1098  break;
1099  case stVerifyDelete:
1100  pOps[countTables]->readTuple();
1101  break;
1102  default:
1103  assert(false);
1104  }//switch
1105 
1106  if(useLongKeys){
1107  // Loop the equal call so the complete key is send to the kernel.
1108  for(Uint32 i = 0; i < tNoOfLongPK; i++)
1109  pOps[countTables]->equal(longKeyAttrName[i],
1110  (char *)longKeyAttrValue[count - 1][i],
1111  tSizeOfLongPK*4);
1112  }
1113  else
1114  pOps[countTables]->equal((char*)attrName[0],
1115  (char*)&tmpAttrRefValue[0]);
1116 
1117  if (tType == stInsert) {
1118  for (int ca = 1; ca < loopCountAttributes; ca++){
1119  pOps[countTables]->setValue((char*)attrName[ca],
1120  (char*)&tmpAttrRefValue[tAttributeSize*ca]);
1121  }//for
1122  } else if (tType == stUpdate) {
1123  for (int ca = 1; ca < loopCountAttributes; ca++){
1124  int* tmp = (int*)&tmpAttrRefValue[tAttributeSize*ca];
1125  if (countTables == 0)
1126  (*tmp)++;
1127  pOps[countTables]->setValue((char*)attrName[ca],(char*)tmp);
1128  }//for
1129  } else if (tType == stRead || stVerify == tType) {
1130  for (int ca = 1; ca < loopCountAttributes; ca++) {
1131  tTmp =
1132  pOps[countTables]->getValue((char*)attrName[ca],
1133  (char*)&tmpAttrValue[tAttributeSize*ca]);
1134  }//for
1135  } else if (stVerifyDelete == tType) {
1136  if(useLongKeys){
1137  tTmp = pOps[countTables]->getValue(longKeyAttrName[0],
1138  (char*)&tmpAttrValue[0]);
1139  } else {
1140  tTmp = pOps[countTables]->getValue((char*)attrName[0],
1141  (char*)&tmpAttrValue[0]);
1142  }
1143  }//if
1144  } else { // !use_ndb
1145 #ifndef USE_MYSQL
1146  assert(false);
1147 #else
1148  switch (tType)
1149  {
1150  case stInsert:
1151  for (int ca = 0; ca < loopCountAttributes; ca++){
1152  mysql_data[ca] = tmpAttrRefValue[tAttributeSize*ca];
1153  }//for
1154  if (mysql_execute(prep_insert[countTables])) {
1155  ndbout << tableName[countTables];
1156  ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
1157  tResult = 1 ;
1158  }
1159  break;
1160  case stUpdate: // Update Case
1161  mysql_data[0] = tmpAttrRefValue[0];
1162  for (int ca = 1; ca < loopCountAttributes; ca++){
1163  int* tmp = (int*)&tmpAttrRefValue[tAttributeSize*ca];
1164  if (countTables == 0)
1165  (*tmp)++;
1166  mysql_data[ca] = *tmp;
1167  }//for
1168  if (mysql_execute(prep_update[countTables])) {
1169  ndbout << tableName[countTables];
1170  ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
1171  tResult = 2 ;
1172  }
1173  break;
1174  case stVerify:
1175  case stRead: // Read Case
1176  mysql_data[0] = tmpAttrRefValue[0];
1177  if (mysql_execute(prep_read[countTables])) {
1178  ndbout << tableName[countTables];
1179  ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
1180  tResult = 3 ;
1181  break;
1182  }
1183  if (mysql_stmt_store_result(prep_read[countTables])) {
1184  ndbout << tableName[countTables];
1185  ndbout << " mysql_stmt_store_result: "
1186  << mysql_error(&mysql) << endl;
1187  tResult = 4 ;
1188  break;
1189  }
1190  {
1191  int rows= 0;
1192  int r;
1193  while ( (r= mysql_fetch(prep_read[countTables])) == 0 ){
1194  rows++;
1195  }
1196  if ( r == 1 ) {
1197  ndbout << tableName[countTables];
1198  ndbout << " mysql_fetch: " << mysql_error(&mysql) << endl;
1199  tResult = 5 ;
1200  break;
1201  }
1202  if ( rows != 1 ) {
1203  ndbout << tableName[countTables];
1204  ndbout << " mysql_fetch: rows = " << rows << endl;
1205  tResult = 6 ;
1206  break;
1207  }
1208  }
1209  {
1210  for (int ca = 1; ca < loopCountAttributes; ca++) {
1211  tmpAttrValue[tAttributeSize*ca] = mysql_data[ca];
1212  }
1213  }
1214  break;
1215  case stDelete: // Delete Case
1216  mysql_data[0] = tmpAttrRefValue[0];
1217  if (mysql_execute(prep_delete[countTables])) {
1218  ndbout << tableName[countTables];
1219  ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
1220  tResult = 7 ;
1221  break;
1222  }
1223  break;
1224  case stVerifyDelete:
1225  {
1226  sprintf(buf, "%s%s%s",
1227  "SELECT COUNT(*) FROM ",tableName[countTables],";");
1228  if (mysql_query(&mysql, buf)) {
1229  ndbout << buf << endl;
1230  ndbout << "Error: " << mysql_error(&mysql) << endl;
1231  tResult = 8 ;
1232  break;
1233  }
1234  MYSQL_RES *res = mysql_store_result(&mysql);
1235  if ( res == NULL ) {
1236  ndbout << "mysql_store_result: "
1237  << mysql_error(&mysql) << endl
1238  << "errno: " << mysql_errno(&mysql) << endl;
1239  tResult = 9 ;
1240  break;
1241  }
1242  int num_fields = mysql_num_fields(res);
1243  int num_rows = mysql_num_rows(res);
1244  if ( num_rows != 1 || num_fields != 1 ) {
1245  ndbout << tableName[countTables];
1246  ndbout << " mysql_store_result: num_rows = " << num_rows
1247  << " num_fields = " << num_fields << endl;
1248  tResult = 10 ;
1249  break;
1250  }
1251  MYSQL_ROW row = mysql_fetch_row(res);
1252  if ( row == NULL ) {
1253  ndbout << "mysql_fetch_row: "
1254  << mysql_error(&mysql) << endl;
1255  tResult = 11 ;
1256  break;
1257  }
1258  if ( *(char*)row[0] != '0' ) {
1259  ndbout << tableName[countTables];
1260  ndbout << " mysql_fetch_row: value = "
1261  << (char*)(row[0]) << endl;
1262  tResult = 12 ;
1263  break;
1264  }
1265  mysql_free_result(res);
1266  }
1267  break;
1268  default:
1269  assert(false);
1270  }
1271 #endif
1272  }
1273  }//for Tables loop
1274 
1275  if (tResult != 0)
1276  break;
1277 
1278  if (use_ndb){
1279  check = pTrans->execute(Commit);
1280  } else {
1281 #ifdef USE_MYSQL
1282  if (tNoOfTables > 1)
1283  if (mysql_commit(&mysql)) {
1284  ndbout << " mysql_commit: " << mysql_error(&mysql) << endl;
1285  tResult = 13;
1286  } else
1287  check = 0;
1288 #endif
1289  }
1290 
1291  if (use_ndb) {
1292  // Decide what kind of error this is
1293  if ((tSpecialTrans == 1) &&
1294  (check == -1)) {
1295 // --------------------------------------------------------------------
1296 // A special transaction have been executed, change to check = 0 in
1297 // certain situations.
1298 // --------------------------------------------------------------------
1299  switch (tType) {
1300  case stInsert: // Insert case
1301  if (630 == pTrans->getNdbError().code ) {
1302  check = 0;
1303  ndbout << "Insert with 4007 was successful" << endl;
1304  }//if
1305  break;
1306  case stDelete: // Delete Case
1307  if (626 == pTrans->getNdbError().code ) {
1308  check = 0;
1309  ndbout << "Delete with 4007 was successful" << endl;
1310  }//if
1311  break;
1312  default:
1313  assert(false);
1314  }//switch
1315  }//if
1316  tSpecialTrans = 0;
1317  if (check == -1) {
1318  if ((stVerifyDelete == tType) &&
1319  (626 == pTrans->getNdbError().code)) {
1320  // ----------------------------------------------
1321  // It's good news - the deleted tuple is gone,
1322  // so reset "check" flag
1323  // ----------------------------------------------
1324  check = 0 ;
1325  } else {
1326  int retCode =
1327  theErrorData.handleErrorCommon(pTrans->getNdbError());
1328  if (retCode == 1) {
1329  ndbout_c("execute: %d, %d, %s", count, tType,
1330  pTrans->getNdbError().message );
1331  ndbout_c("Error code = %d", pTrans->getNdbError().code );
1332  tResult = 20;
1333  } else if (retCode == 2) {
1334  ndbout << "4115 should not happen in flexBench" << endl;
1335  tResult = 20;
1336  } else if (retCode == 3) {
1337 // --------------------------------------------------------------------
1338 // We are not certain if the transaction was successful or not.
1339 // We must reexecute but might very well find that the transaction
1340 // actually was updated. Updates and Reads are no problem here. Inserts
1341 // will not cause a problem if error code 630 arrives. Deletes will
1342 // not cause a problem if 626 arrives.
1343 // --------------------------------------------------------------------
1344  if ((tType == stInsert) || (tType == stDelete)) {
1345  tSpecialTrans = 1;
1346  }//if
1347  }//if
1348  }//if
1349  }//if
1350  // Check if retries should be made
1351  if (check == -1 && tResult == 0) {
1352  if (tAttemptNo < tRetryAttempts){
1353  tAttemptNo++;
1354  } else {
1355 // --------------------------------------------------------------------
1356 // Too many retries have been made, report error and break out of loop
1357 // --------------------------------------------------------------------
1358  ndbout << "Thread" << threadNo;
1359  ndbout << ": too many errors reported" << endl;
1360  tResult = 10;
1361  break;
1362  }//if
1363  }//if
1364  }
1365 
1366  if (check == 0){
1367  // Go to the next record
1368  count++;
1369  tAttemptNo = 0;
1370 #ifdef CEBIT_STAT
1371  // report successful ops
1372  if (statEnable) {
1373  statOps += loopCountTables;
1374  if (statOps >= statFreq) {
1375  statReport(tType, statOps);
1376  statOps = 0;
1377  }//if
1378  }//if
1379 #endif
1380  }//if
1381 
1382  if (stVerify == tType && 0 == check){
1383  int nTableOffset = 0 ;
1384  for (int a = 1 ; a < loopCountAttributes ; a++){
1385  for (int tables = 0 ; tables < loopCountTables ; tables++){
1386  nTableOffset = tables*loopCountAttributes*tAttributeSize;
1387  int ov =*(int*)&attrValue[nTableOffset + tAttributeSize*a];
1388  int nv =*(int*)&tmpAttrRefValue[tAttributeSize*a];
1389  if (ov != nv){
1390  ndbout << "Error in verify ";
1391  ndbout << "pk = " << tmpAttrRefValue[0] << ":" << endl;
1392  ndbout << "attrValue[" << nTableOffset + tAttributeSize*a << "] = " << ov << endl ;
1393  ndbout << "attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a << "]" << nv << endl ;
1394  tResult = 11 ;
1395  break ;
1396  }//if
1397  }//for
1398  }//for
1399  }// if(stVerify ... )
1400  if (use_ndb) {
1401  pNdb->closeTransaction(pTrans);
1402  return_ndb_object(pNdb, ndb_id);
1403  pNdb = NULL;
1404  }
1405  }// operations loop
1406 #ifdef CEBIT_STAT
1407  // report remaining successful ops
1408  if (statEnable) {
1409  if (statOps > 0) {
1410  statReport(tType, statOps);
1411  statOps = 0;
1412  }//if
1413  }//if
1414 #endif
1415  if (pNdb) {
1416  pNdb->closeTransaction(pTrans);
1417  return_ndb_object(pNdb, ndb_id);
1418  pNdb = NULL;
1419  }
1420  }
1421 
1422 #ifdef USE_MYSQL
1423  if (!use_ndb) {
1424  mysql_close(&mysql);
1425  for (Uint32 i = 0; i < tNoOfTables; i++) {
1426  mysql_stmt_close(prep_insert[i]);
1427  mysql_stmt_close(prep_update[i]);
1428  mysql_stmt_close(prep_delete[i]);
1429  mysql_stmt_close(prep_read[i]);
1430  }
1431  }
1432 #endif
1433  if (use_ndb && pNdb) {
1434  ndbout << "I got here " << endl;
1435  return_ndb_object(pNdb, ndb_id);
1436  }
1437  return NULL;
1438 }
1439 
1440 
1441 static int readArguments(int argc, const char** argv)
1442 {
1443 
1444  int i = 1;
1445  while (argc > 1){
1446  if (strcmp(argv[i], "-t") == 0){
1447  tNoOfThreads = atoi(argv[i+1]);
1448  if ((tNoOfThreads < 1))
1449  return -1;
1450  argc -= 1;
1451  i++;
1452  }else if (strcmp(argv[i], "-o") == 0){
1453  tNoOfOperations = atoi(argv[i+1]);
1454  if (tNoOfOperations < 1)
1455  return -1;;
1456  argc -= 1;
1457  i++;
1458  }else if (strcmp(argv[i], "-a") == 0){
1459  tNoOfAttributes = atoi(argv[i+1]);
1460  if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR))
1461  return -1;
1462  argc -= 1;
1463  i++;
1464  }else if (strcmp(argv[i], "-c") == 0){
1465  tNoOfTables = atoi(argv[i+1]);
1466  if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES))
1467  return -1;
1468  argc -= 1;
1469  i++;
1470  }else if (strcmp(argv[i], "-stdtables") == 0){
1471  theStdTableNameFlag = 1;
1472  }else if (strcmp(argv[i], "-l") == 0){
1473  tNoOfLoops = atoi(argv[i+1]);
1474  if ((tNoOfLoops < 0) || (tNoOfLoops > 100000))
1475  return -1;
1476  argc -= 1;
1477  i++;
1478  }else if (strcmp(argv[i], "-pool_size") == 0){
1479  t_instances = atoi(argv[i+1]);
1480  if ((t_instances < 1) || (t_instances > 240))
1481  return -1;
1482  argc -= 1;
1483  i++;
1484 #ifdef USE_MYSQL
1485  }else if (strcmp(argv[i], "-engine") == 0){
1486  engine_id = atoi(argv[i+1]);
1487  if ((engine_id < 0) || (engine_id > 3))
1488  return -1;
1489  argc -= 1;
1490  i++;
1491  }else if (strcmp(argv[i], "-socket") == 0){
1492  sockets[n_sockets] = atoi(argv[i+1]);
1493  if (sockets[n_sockets] <= 0)
1494  return -1;
1495  n_sockets++;
1496  argc -= 1;
1497  i++;
1498  }else if (strcmp(argv[i], "-use_ndb") == 0){
1499  use_ndb = true;
1500 #endif
1501  }else if (strcmp(argv[i], "-s") == 0){
1502  tAttributeSize = atoi(argv[i+1]);
1503  if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE))
1504  return -1;
1505  argc -= 1;
1506  i++;
1507  }else if (strcmp(argv[i], "-lkn") == 0){
1508  tNoOfLongPK = atoi(argv[i+1]);
1509  useLongKeys = true;
1510  if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) ||
1511  (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1512  ndbout << "Argument -lkn is not in the proper range." << endl;
1513  return -1;
1514  }
1515  argc -= 1;
1516  i++;
1517  }else if (strcmp(argv[i], "-lks") == 0){
1518  tSizeOfLongPK = atoi(argv[i+1]);
1519  useLongKeys = true;
1520  if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1521  ndbout << "Argument -lks is not in the proper range 1 to " <<
1522  MAXLONGKEYTOTALSIZE << endl;
1523  return -1;
1524  }
1525  argc -= 1;
1526  i++;
1527  }else if (strcmp(argv[i], "-simple") == 0){
1528  theSimpleFlag = 1;
1529  }else if (strcmp(argv[i], "-write") == 0){
1530  theWriteFlag = 1;
1531  }else if (strcmp(argv[i], "-dirty") == 0){
1532  theDirtyFlag = 1;
1533  }else if (strcmp(argv[i], "-sleep") == 0){
1534  tSleepTime = atoi(argv[i+1]);
1535  if ((tSleepTime < 1) || (tSleepTime > 3600))
1536  return -1;
1537  argc -= 1;
1538  i++;
1539  }else if (strcmp(argv[i], "-no_table_create") == 0){
1540  theTableCreateFlag = 1;
1541  }else if (strcmp(argv[i], "-temp") == 0){
1542  theTempTable = true;
1543  }else if (strcmp(argv[i], "-noverify") == 0){
1544  VerifyFlag = false ;
1545  }else if (theErrorData.parseCmdLineArg(argv, i) == true){
1546  ; //empty, updated in errorArg(..)
1547  }else if (strcmp(argv[i], "-verify") == 0){
1548  VerifyFlag = true ;
1549 #ifdef CEBIT_STAT
1550  }else if (strcmp(argv[i], "-statserv") == 0){
1551  if (! (argc > 2))
1552  return -1;
1553  const char *p = argv[i+1];
1554  const char *q = strrchr(p, ':');
1555  if (q == 0)
1556  return -1;
1557  BaseString::snprintf(statHost, sizeof(statHost), "%.*s", q-p, p);
1558  statPort = atoi(q+1);
1559  statEnable = true;
1560  argc -= 1;
1561  i++;
1562  }else if (strcmp(argv[i], "-statfreq") == 0){
1563  if (! (argc > 2))
1564  return -1;
1565  statFreq = atoi(argv[i+1]);
1566  if (statFreq < 1)
1567  return -1;
1568  argc -= 1;
1569  i++;
1570 #endif
1571  }else{
1572  return -1;
1573  }
1574  argc -= 1;
1575  i++;
1576  }
1577 #ifdef USE_MYSQL
1578  if (n_sockets == 0) {
1579  n_sockets = 1;
1580  sockets[0] = 3306;
1581  }
1582 #endif
1583  return 0;
1584 }
1585 
1586 static void sleepBeforeStartingTest(int seconds){
1587  if (seconds > 0){
1588  ndbout << "Sleeping(" <<seconds << ")...";
1589  NdbSleep_SecSleep(seconds);
1590  ndbout << " done!" << endl;
1591  }
1592 }
1593 
1594 
1595 #ifdef USE_MYSQL
1596 static int
1597 dropTables(MYSQL* mysqlp){
1598  char buf[2048];
1599  for(unsigned i = 0; i < tNoOfTables; i++){
1600  int pos = 0;
1601  ndbout << "Dropping " << tableName[i] << "... ";
1602  pos += sprintf(buf+pos, "%s", "DROP TABLE ");
1603  pos += sprintf(buf+pos, "%s%s", tableName[i], ";");
1604  if (verbose)
1605  ndbout << endl << buf << endl;
1606  if (mysql_query(mysqlp, buf) != 0){
1607  ndbout << "Failed!"<<endl
1608  <<mysql_error(mysqlp)<<endl
1609  <<buf<<endl;
1610  } else
1611  ndbout << "OK!" << endl;
1612  }
1613 
1614  return 0;
1615 }
1616 #endif
1617 
1618 #ifdef USE_MYSQL
1619 static int
1620 createTables(MYSQL* mysqlp){
1621 
1622  for (Uint32 i = 0; i < tNoOfAttributes; i++){
1623  BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
1624  }
1625 
1626  // Note! Uses only uppercase letters in table name's
1627  // so that we can look at the tables with SQL
1628  for (Uint32 i = 0; i < tNoOfTables; i++){
1629  if (theStdTableNameFlag == 0){
1630  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i,
1631  (int)(NdbTick_CurrentMillisecond() / 1000));
1632  } else {
1633  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
1634  }
1635  }
1636 
1637  char buf[2048];
1638  for(unsigned i = 0; i < tNoOfTables; i++){
1639  int pos = 0;
1640  ndbout << "Creating " << tableName[i] << "... ";
1641 
1642  pos += sprintf(buf+pos, "%s", "CREATE TABLE ");
1643  pos += sprintf(buf+pos, "%s%s", tableName[i], " ");
1644  if(useLongKeys){
1645  for(Uint32 i = 0; i < tNoOfLongPK; i++) {
1646  }
1647  } else {
1648  pos += sprintf(buf+pos, "%s%s%s",
1649  "(", attrName[0], " int unsigned primary key");
1650  }
1651  for (unsigned j = 1; j < tNoOfAttributes; j++)
1652  pos += sprintf(buf+pos, "%s%s%s", ",", attrName[j], " int unsigned");
1653  pos += sprintf(buf+pos, "%s%s%s", ")", engine[engine_id], ";");
1654  if (verbose)
1655  ndbout << endl << buf << endl;
1656  if (mysql_query(mysqlp, buf) != 0)
1657  return -1;
1658  ndbout << "done" << endl;
1659  }
1660  return 0;
1661 }
1662 #endif
1663 
1664 static int
1665 createTables(Ndb* pMyNdb){
1666 
1667  for (Uint32 i = 0; i < tNoOfAttributes; i++){
1668  BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
1669  }
1670 
1671  // Note! Uses only uppercase letters in table name's
1672  // so that we can look at the tables with SQL
1673  for (Uint32 i = 0; i < tNoOfTables; i++){
1674  if (theStdTableNameFlag == 0){
1675  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i,
1676  (int)(NdbTick_CurrentMillisecond() / 1000));
1677  } else {
1678  BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
1679  }
1680  }
1681 
1682  for(unsigned i = 0; i < tNoOfTables; i++){
1683  ndbout << "Creating " << tableName[i] << "... ";
1684 
1685  NdbDictionary::Table tmpTable(tableName[i]);
1686 
1687  tmpTable.setStoredTable(!theTempTable);
1688 
1689  if(useLongKeys){
1690  for(Uint32 i = 0; i < tNoOfLongPK; i++) {
1691  NdbDictionary::Column col(longKeyAttrName[i]);
1692  col.setType(NdbDictionary::Column::Unsigned);
1693  col.setLength(tSizeOfLongPK);
1694  col.setPrimaryKey(true);
1695  tmpTable.addColumn(col);
1696  }
1697  } else {
1698  NdbDictionary::Column col(attrName[0]);
1699  col.setType(NdbDictionary::Column::Unsigned);
1700  col.setLength(1);
1701  col.setPrimaryKey(true);
1702  tmpTable.addColumn(col);
1703  }
1706  col.setLength(tAttributeSize);
1707  for (unsigned j = 1; j < tNoOfAttributes; j++){
1708  col.setName(attrName[j]);
1709  tmpTable.addColumn(col);
1710  }
1711  if(pMyNdb->getDictionary()->createTable(tmpTable) == -1){
1712  return -1;
1713  }
1714  ndbout << "done" << endl;
1715  }
1716 
1717  return 0;
1718 }
1719 
1720 
1721 static void input_error(){
1722  ndbout << endl << "Invalid argument!" << endl;
1723  ndbout << endl << "Arguments:" << endl;
1724  ndbout << " -t Number of threads to start, default 1" << endl;
1725  ndbout << " -o Number of operations per loop, default 500" << endl;
1726  ndbout << " -l Number of loops to run, default 1, 0=infinite" << endl;
1727  ndbout << " -a Number of attributes, default 25" << endl;
1728  ndbout << " -c Number of tables, default 1" << endl;
1729  ndbout << " -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl;
1730  ndbout << " independent of this value)" << endl;
1731  ndbout << " -lkn Number of long primary keys, default 1" << endl;
1732  ndbout << " -lks Size of each long primary key, default 1" << endl;
1733 
1734  ndbout << " -simple Use simple read to read from database" << endl;
1735  ndbout << " -dirty Use dirty read to read from database" << endl;
1736  ndbout << " -write Use writeTuple in insert and update" << endl;
1737  ndbout << " -stdtables Use standard table names" << endl;
1738  ndbout << " -no_table_create Don't create tables in db" << endl;
1739  ndbout << " -sleep Sleep a number of seconds before running the test, this" << endl;
1740  ndbout << " can be used so that another flexBench have time to create tables" << endl;
1741  ndbout << " -temp Use tables without logging" << endl;
1742  ndbout << " -verify Verify inserts, updates and deletes" << endl ;
1743  ndbout << " -use_ndb Use NDB API (otherwise use mysql client)" << endl ;
1744  ndbout << " -pool_size Number of Ndb objects in pool" << endl ;
1745  theErrorData.printCmdLineArgs(ndbout);
1746  ndbout << endl <<"Returns:" << endl;
1747  ndbout << "\t 0 - Test passed" << endl;
1748  ndbout << "\t 1 - Test failed" << endl;
1749  ndbout << "\t 2 - Invalid arguments" << endl << endl;
1750 }
1751 
1752 // vim: set sw=2: