MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mainAsyncGenerator.cpp
1 /*
2  Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include <NdbHost.h>
21 #include <NdbSleep.h>
22 #include <NdbThread.h>
23 #include <NdbMain.h>
24 #include <NdbOut.hpp>
25 #include <NdbEnv.h>
26 #include <NdbTest.hpp>
27 
28 #include "userInterface.h"
29 #include "dbGenerator.h"
30 #include "ndb_schema.hpp"
31 
32 
33 static int numProcesses;
34 static int numSeconds;
35 static int numWarmSeconds;
36 static int parallellism;
37 static int millisSendPoll;
38 static int minEventSendPoll;
39 static int forceSendPoll;
40 static bool useNdbRecord;
41 static bool useCombUpd;
42 
43 static ThreadData *data;
44 static Ndb_cluster_connection *g_cluster_connection= 0;
45 
46 
47 static void usage(const char *prog)
48 {
49  const char *progname;
50 
51  /*--------------------------------------------*/
52  /* Get the name of the program (without path) */
53  /*--------------------------------------------*/
54  progname = strrchr(prog, '/');
55 
56  if (progname == 0)
57  progname = prog;
58  else
59  ++progname;
60 
61  ndbout_c(
62  "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]"
63  "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
64  " -proc <num> Specifies that <num> is the number of\n"
65  " threads. The default is 1.\n"
66  " -time <num> Specifies that the test will run for <num> sec.\n"
67  " The default is 10 sec\n"
68  " -warm <num> Specifies the warm-up/cooldown period of <num> "
69  "sec.\n"
70  " The default is 10 sec\n"
71  " -p <num> The no of parallell transactions started by "
72  "one thread\n"
73  " -e <num> Minimum no of events before wake up in call to "
74  "sendPoll\n"
75  " Default is 1\n"
76  " -f <num> force parameter to sendPoll\n"
77  " Default is 0\n"
78  " -ndbrecord Use NdbRecord Api.\n"
79  " Default is to use old Api\n"
80  " -combupdread Use update pre-read operation where possible\n"
81  " Default is to use separate read+update ops\n",
82  progname);
83 }
84 
85 static
86 int
87 parse_args(int argc, const char **argv)
88 {
89  int i;
90 
91  numProcesses = 1;
92  numSeconds = 10;
93  numWarmSeconds = 10;
94  parallellism = 1;
95  millisSendPoll = 10000;
96  minEventSendPoll = 1;
97  forceSendPoll = 0;
98  useNdbRecord = false;
99  useCombUpd = false;
100 
101  i = 1;
102  while (i < argc){
103  if (strcmp("-proc",argv[i]) == 0) {
104  if (i + 1 >= argc) {
105  return 1;
106  }
107  if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
108  numProcesses <= 0 || numProcesses > 127) {
109  ndbout_c("-proc flag requires a positive integer argument [1..127]");
110  return 1;
111  }
112  i += 2;
113  } else if (strcmp("-p", argv[i]) == 0){
114  if(i + 1 >= argc){
115  usage(argv[0]);
116  return 1;
117  }
118  if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
119  parallellism <= 0){
120  ndbout_c("-p flag requires a positive integer argument");
121  return 1;
122  }
123  i += 2;
124  }
125  else if (strcmp("-time",argv[i]) == 0) {
126  if (i + 1 >= argc) {
127  return 1;
128  }
129  if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
130  numSeconds < 0) {
131  ndbout_c("-time flag requires a positive integer argument");
132  return 1;
133  }
134  i += 2;
135  }
136  else if (strcmp("-warm",argv[i]) == 0) {
137  if (i + 1 >= argc) {
138  return 1;
139  }
140  if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
141  numWarmSeconds < 0) {
142  ndbout_c("-warm flag requires a positive integer argument");
143  return 1;
144  }
145  i += 2;
146  }
147  else if (strcmp("-e",argv[i]) == 0) {
148  if (i + 1 >= argc) {
149  return 1;
150  }
151  if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
152  minEventSendPoll < 0) {
153  ndbout_c("-e flag requires a positive integer argument");
154  return 1;
155  }
156  i += 2;
157  }
158  else if (strcmp("-f",argv[i]) == 0) {
159  if (i + 1 >= argc) {
160  usage(argv[0]);
161  return 1;
162  }
163  if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
164  forceSendPoll < 0) {
165  ndbout_c("-f flag requires a positive integer argument");
166  return 1;
167  }
168  i += 2;
169  }
170  else if (strcmp("-ndbrecord",argv[i]) == 0) {
171  useNdbRecord= true;
172  i++;
173  }
174  else if (strcmp("-combupdread",argv[i]) == 0) {
175  /* Comb up some dread */
176  useCombUpd= true;
177  i++;
178  }
179  else {
180  return 1;
181  }
182  }
183 
184  if(minEventSendPoll > parallellism){
185  ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
186  minEventSendPoll, parallellism);
187  ndbout_c("not very good...");
188  ndbout_c("very bad...");
189  ndbout_c("exiting...");
190  return 1;
191  }
192  if (useNdbRecord && useCombUpd){
193  ndbout_c("NdbRecord does not currently support combined update "
194  "and read. Using separate read and update ops");
195  }
196  return 0;
197 }
198 
199 static
200 void
201 print_transaction(const char *header,
202  unsigned long totalCount,
203  TransactionDefinition *trans,
204  unsigned int printBranch,
205  unsigned int printRollback)
206 {
207  double f;
208 
209  ndbout_c(" %s: %d (%.2f%%) "
210  "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
211  header,
212  trans->count,
213  (double)trans->count / (double)totalCount * 100.0,
214  (int)trans->latency.getMean(),
215  (int)trans->latency.getMin(),
216  (int)trans->latency.getMax(),
217  (int)trans->latency.getStddev(),
218  (int)trans->latency.getCount()
219  );
220 
221  if( printBranch ){
222  if( trans->count == 0 )
223  f = 0.0;
224  else
225  f = (double)trans->branchExecuted / (double)trans->count * 100.0;
226  ndbout_c(" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
227  }
228 
229  if( printRollback ){
230  if( trans->count == 0 )
231  f = 0.0;
232  else
233  f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
234  ndbout_c(" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
235  }
236 }
237 
238 void
239 print_stats(const char *title,
240  unsigned int length,
241  unsigned int transactionFlag,
242  GeneratorStatistics *gen,
243  int numProc, int parallellism)
244 {
245  int i;
246  char buf[10];
247  char name[MAXHOSTNAMELEN];
248 
249  name[0] = 0;
250  NdbHost_GetHostName(name);
251 
252  ndbout_c("\n------ %s ------",title);
253  ndbout_c("Length : %d %s",
254  length,
255  transactionFlag ? "Transactions" : "sec");
256  ndbout_c("Processor : %s", name);
257  ndbout_c("Number of Proc: %d",numProc);
258  ndbout_c("Parallellism : %d", parallellism);
259  ndbout_c("UseNdbRecord : %u", useNdbRecord);
260  ndbout_c("\n");
261 
262  if( gen->totalTransactions == 0 ) {
263  ndbout_c(" No Transactions for this test");
264  }
265  else {
266  for(i = 0; i < 5; i++) {
267  sprintf(buf, "T%d",i+1);
268  print_transaction(buf,
269  gen->totalTransactions,
270  &gen->transactions[i],
271  i >= 2,
272  i >= 3 );
273  }
274 
275  ndbout_c("\n");
276  ndbout_c(" Overall Statistics:");
277  ndbout_c(" Transactions: %d", gen->totalTransactions);
278  ndbout_c(" Outer : %.0f TPS",gen->outerTps);
279  ndbout_c("\n");
280  ndbout_c("NDBT_Observation;tps;%.0f", gen->outerTps);
281  }
282 }
283 
284 static
285 void *
286 threadRoutine(void *arg)
287 {
288  int i;
289  ThreadData *data = (ThreadData *)arg;
290  Ndb * pNDB;
291 
292  pNDB = asyncDbConnect(parallellism);
293  /* NdbSleep_MilliSleep(rand() % 10); */
294 
295  for(i = 0; i<parallellism; i++){
296  data[i].pNDB = pNDB;
297  }
298  millisSendPoll = 30000;
299  asyncGenerator(data, parallellism,
300  millisSendPoll, minEventSendPoll, forceSendPoll);
301 
302  asyncDbDisconnect(pNDB);
303 
304  return NULL;
305 }
306 
307 NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
308  "DbAsyncGenerator", "DbAsyncGenerator", 65535)
309 {
310  ndb_init();
311  int i;
312  int j;
313  int k;
314  struct NdbThread* pThread = NULL;
317  char threadName[32];
318  int rc = NDBT_OK;
319  void* tmp = NULL;
320  if(parse_args(argc,argv) != 0){
321  usage(argv[0]);
322  return NDBT_ProgramExit(NDBT_WRONGARGS);
323  }
324 
325 
326  ndbout_c("\nStarting Test with %d process(es) for %d %s parallellism %d",
327  numProcesses,
328  numSeconds,
329  "sec",
330  parallellism);
331 
332  ndbout_c(" WarmUp/coolDown = %d sec", numWarmSeconds);
333 
335  if(con.connect(12, 5, 1) != 0)
336  {
337  ndbout << "Unable to connect to management server." << endl;
338  return 0;
339  }
340  if (con.wait_until_ready(30,0) < 0)
341  {
342  ndbout << "Cluster nodes not ready in 30 seconds." << endl;
343  return 0;
344  }
345 
346  NdbRecordSharedData* ndbRecordSharedDataPtr= NULL;
347 
348  g_cluster_connection= &con;
349  data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
350 
351  NdbInterpretedCode* prog1= 0;
352  NdbInterpretedCode* prog2= 0;
353  NdbInterpretedCode* prog3= 0;
354 
355  if (useNdbRecord)
356  {
357  /* We'll create NdbRecord structures to match the TransactionData
358  * struct
359  */
360 
361  ndbRecordSharedDataPtr= (NdbRecordSharedData*)
362  malloc(sizeof(NdbRecordSharedData));
363  Ndb* tempNdb= asyncDbConnect(1);
364  NdbDictionary::Dictionary* dict= tempNdb->getDictionary();
365 
367 
368  const NdbDictionary::Table* tab= dict->getTable(SUBSCRIBER_TABLE);
369  cols[0].column= tab->getColumn((int) IND_SUBSCRIBER_NUMBER);
370  cols[0].offset= offsetof(TransactionData, number);
371  cols[0].nullbit_byte_offset= 0;
372  cols[0].nullbit_bit_in_byte= 0;
373  cols[1].column= tab->getColumn((int) IND_SUBSCRIBER_NAME);
374  cols[1].offset= offsetof(TransactionData, name);
375  cols[1].nullbit_byte_offset= 0;
376  cols[1].nullbit_bit_in_byte= 0;
377  cols[2].column= tab->getColumn((int) IND_SUBSCRIBER_GROUP);
378  cols[2].offset= offsetof(TransactionData, group_id);
379  cols[2].nullbit_byte_offset= 0;
380  cols[2].nullbit_bit_in_byte= 0;
381  cols[3].column= tab->getColumn((int) IND_SUBSCRIBER_LOCATION);
382  cols[3].offset= offsetof(TransactionData, location);
383  cols[3].nullbit_byte_offset= 0;
384  cols[3].nullbit_bit_in_byte= 0;
385  cols[4].column= tab->getColumn((int) IND_SUBSCRIBER_SESSIONS);
386  cols[4].offset= offsetof(TransactionData, sessions);
387  cols[4].nullbit_byte_offset= 0;
388  cols[4].nullbit_bit_in_byte= 0;
389  cols[5].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_BY);
390  cols[5].offset= offsetof(TransactionData, changed_by);
391  cols[5].nullbit_byte_offset= 0;
392  cols[5].nullbit_bit_in_byte= 0;
393  cols[6].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_TIME);
394  cols[6].offset= offsetof(TransactionData, changed_time);
395  cols[6].nullbit_byte_offset= 0;
396  cols[6].nullbit_bit_in_byte= 0;
397 
398  ndbRecordSharedDataPtr->subscriberTableNdbRecord=
399  dict->createRecord(tab, cols, 7, sizeof(cols[0]), 0);
400 
401  if (ndbRecordSharedDataPtr->subscriberTableNdbRecord == NULL)
402  {
403  ndbout << "Error creating record 1 : " << dict->getNdbError() << endl;
404  return -1;
405  }
406 
407  tab= dict->getTable(GROUP_TABLE);
408  cols[0].column= tab->getColumn((int) IND_GROUP_ID);
409  cols[0].offset= offsetof(TransactionData, group_id);
410  cols[0].nullbit_byte_offset= 0;
411  cols[0].nullbit_bit_in_byte= 0;
412  /* GROUP_NAME not used via NdbRecord */
413  cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_READ);
414  cols[1].offset= offsetof(TransactionData, permission);
415  cols[1].nullbit_byte_offset= 0;
416  cols[1].nullbit_bit_in_byte= 0;
417 
418  ndbRecordSharedDataPtr->groupTableAllowReadNdbRecord=
419  dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
420 
421  if (ndbRecordSharedDataPtr->groupTableAllowReadNdbRecord == NULL)
422  {
423  ndbout << "Error creating record 2.1: " << dict->getNdbError() << endl;
424  return -1;
425  }
426 
427  cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_INSERT);
428  cols[1].offset= offsetof(TransactionData, permission);
429  cols[1].nullbit_byte_offset= 0;
430  cols[1].nullbit_bit_in_byte= 0;
431 
432  ndbRecordSharedDataPtr->groupTableAllowInsertNdbRecord=
433  dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
434 
435  if (ndbRecordSharedDataPtr->groupTableAllowInsertNdbRecord == NULL)
436  {
437  ndbout << "Error creating record 2.2: " << dict->getNdbError() << endl;
438  return -1;
439  }
440 
441  cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_DELETE);
442  cols[1].offset= offsetof(TransactionData, permission);
443  cols[1].nullbit_byte_offset= 0;
444  cols[1].nullbit_bit_in_byte= 0;
445 
446  ndbRecordSharedDataPtr->groupTableAllowDeleteNdbRecord=
447  dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
448 
449  if (ndbRecordSharedDataPtr->groupTableAllowDeleteNdbRecord == NULL)
450  {
451  ndbout << "Error creating record 2.3: " << dict->getNdbError() << endl;
452  return -1;
453  }
454 
455  tab= dict->getTable(SESSION_TABLE);
456  cols[0].column= tab->getColumn((int) IND_SESSION_SUBSCRIBER);
457  cols[0].offset= offsetof(TransactionData, number);
458  cols[0].nullbit_byte_offset= 0;
459  cols[0].nullbit_bit_in_byte= 0;
460  cols[1].column= tab->getColumn((int) IND_SESSION_SERVER);
461  cols[1].offset= offsetof(TransactionData, server_id);
462  cols[1].nullbit_byte_offset= 0;
463  cols[1].nullbit_bit_in_byte= 0;
464  cols[2].column= tab->getColumn((int) IND_SESSION_DATA);
465  cols[2].offset= offsetof(TransactionData, session_details);
466  cols[2].nullbit_byte_offset= 0;
467  cols[2].nullbit_bit_in_byte= 0;
468 
469  ndbRecordSharedDataPtr->sessionTableNdbRecord=
470  dict->createRecord(tab, cols, 3, sizeof(cols[0]), 0);
471 
472  if (ndbRecordSharedDataPtr->sessionTableNdbRecord == NULL)
473  {
474  ndbout << "Error creating record 3 : " << dict->getNdbError() << endl;
475  return -1;
476  }
477 
478  tab= dict->getTable(SERVER_TABLE);
479  cols[0].column= tab->getColumn((int) IND_SERVER_SUBSCRIBER_SUFFIX);
480  cols[0].offset= offsetof(TransactionData, suffix);
481  cols[0].nullbit_byte_offset= 0;
482  cols[0].nullbit_bit_in_byte= 0;
483  cols[1].column= tab->getColumn((int) IND_SERVER_ID);
484  cols[1].offset= offsetof(TransactionData, server_id);
485  cols[1].nullbit_byte_offset= 0;
486  cols[1].nullbit_bit_in_byte= 0;
487  /* SERVER_NAME not used via NdbRecord*/
488  /* SERVER_READS not used via NdbRecord */
489  /* SERVER_INSERTS not used via NdbRecord */
490  /* SERVER_DELETES not used via NdbRecord */
491 
492  ndbRecordSharedDataPtr->serverTableNdbRecord=
493  dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
494 
495  if (ndbRecordSharedDataPtr->serverTableNdbRecord == NULL)
496  {
497  ndbout << "Error creating record 4 : " << dict->getNdbError() << endl;
498  return -1;
499  }
500 
501  /* Create program to increment server reads column */
502  prog1= new NdbInterpretedCode(tab);
503 
504  if (prog1->add_val(IND_SERVER_READS, (Uint32)1) ||
505  prog1->interpret_exit_ok() ||
506  prog1->finalise())
507  {
508  ndbout << "Program 1 definition failed, exiting." << endl;
509  return -1;
510  }
511 
512  prog2= new NdbInterpretedCode(tab);
513 
514  if (prog2->add_val(IND_SERVER_INSERTS, (Uint32)1) ||
515  prog2->interpret_exit_ok() ||
516  prog2->finalise())
517  {
518  ndbout << "Program 2 definition failed, exiting." << endl;
519  return -1;
520  }
521 
522  prog3= new NdbInterpretedCode(tab);
523 
524  if (prog3->add_val(IND_SERVER_DELETES, (Uint32)1) ||
525  prog3->interpret_exit_ok() ||
526  prog3->finalise())
527  {
528  ndbout << "Program 3 definition failed, exiting." << endl;
529  return -1;
530  }
531 
532  ndbRecordSharedDataPtr->incrServerReadsProg= prog1;
533  ndbRecordSharedDataPtr->incrServerInsertsProg= prog2;
534  ndbRecordSharedDataPtr->incrServerDeletesProg= prog3;
535 
536  asyncDbDisconnect(tempNdb);
537  }
538 
539  for(i = 0; i < numProcesses; i++) {
540  for(j = 0; j<parallellism; j++){
541  int tid= i*parallellism + j;
542  data[tid].warmUpSeconds = numWarmSeconds;
543  data[tid].testSeconds = numSeconds;
544  data[tid].coolDownSeconds = numWarmSeconds;
545  data[tid].randomSeed =
546  (unsigned long)(NdbTick_CurrentMillisecond()+i+j);
547  data[tid].changedTime = 0;
548  data[tid].runState = Runnable;
549  data[tid].ndbRecordSharedData = ndbRecordSharedDataPtr;
550  data[tid].useCombinedUpdate = useCombUpd;
551  }
552  sprintf(threadName, "AsyncThread[%d]", i);
553  pThread = NdbThread_Create(threadRoutine,
554  (void**)&data[i*parallellism],
555  65535,
556  threadName,
557  NDB_THREAD_PRIO_LOW);
558  if(pThread != 0 && pThread != NULL){
559  (&data[i*parallellism])->pThread = pThread;
560  } else {
561  perror("Failed to create thread");
562  rc = NDBT_FAILED;
563  }
564  }
565 
566  showTime();
567 
568  /*--------------------------------*/
569  /* Wait for all processes to exit */
570  /*--------------------------------*/
571  for(i = 0; i < numProcesses; i++) {
572  NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
573  NdbThread_Destroy(&data[i*parallellism].pThread);
574  }
575 
576  ndbout_c("All threads have finished");
577 
578  if (useNdbRecord)
579  {
580  free(ndbRecordSharedDataPtr);
581  delete(prog1);
582  delete(prog2);
583  delete(prog3);
584  }
585 
586  /*-------------------------------------------*/
587  /* Clear all structures for total statistics */
588  /*-------------------------------------------*/
589  stats.totalTransactions = 0;
590  stats.outerTps = 0.0;
591 
592  for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
593  stats.transactions[i].count = 0;
594  stats.transactions[i].branchExecuted = 0;
595  stats.transactions[i].rollbackExecuted = 0;
596  stats.transactions[i].latency.reset();
597  }
598 
599  /*--------------------------------*/
600  /* Add the values for all Threads */
601  /*--------------------------------*/
602  for(i = 0; i < numProcesses; i++) {
603  for(k = 0; k<parallellism; k++){
604  p = &data[i*parallellism+k].generator;
605 
606  stats.totalTransactions += p->totalTransactions;
607  stats.outerTps += p->outerTps;
608 
609  for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
610  stats.transactions[j].count +=
611  p->transactions[j].count;
612  stats.transactions[j].branchExecuted +=
613  p->transactions[j].branchExecuted;
614  stats.transactions[j].rollbackExecuted +=
615  p->transactions[j].rollbackExecuted;
616  stats.transactions[j].latency +=
617  p->transactions[j].latency;
618  }
619  }
620  }
621 
622  print_stats("Test Results",
623  numSeconds,
624  0,
625  &stats,
626  numProcesses,
627  parallellism);
628 
629  free(data);
630 
631  NDBT_ProgramExit(rc);
632 }
633 /***************************************************************
634 * I N C L U D E D F I L E S *
635 ***************************************************************/
636 
637 #include <stdio.h>
638 #include <stdlib.h>
639 #include <sys/types.h>
640 #include <time.h>
641 
642 #include "ndb_error.hpp"
643 #include "userInterface.h"
644 #include <NdbMutex.h>
645 #include <NdbThread.h>
646 #include <NdbTick.h>
647 #include <NdbApi.hpp>
648 #include <NdbOut.hpp>
649 
650 /***************************************************************
651 * L O C A L C O N S T A N T S *
652 ***************************************************************/
653 
654 /***************************************************************
655 * L O C A L D A T A S T R U C T U R E S *
656 ***************************************************************/
657 
658 /***************************************************************
659 * L O C A L F U N C T I O N S *
660 ***************************************************************/
661 
662 #ifndef NDB_WIN32
663 #include <unistd.h>
664 #endif
665 
666 Ndb*
667 asyncDbConnect(int parallellism){
668  Ndb * pNDB = new Ndb(g_cluster_connection, "TEST_DB");
669 
670  pNDB->init(parallellism + 1);
671 
672  while(pNDB->waitUntilReady() != 0){
673  }
674 
675  return pNDB;
676 }
677 
678 void
679 asyncDbDisconnect(Ndb* pNDB)
680 {
681  delete pNDB;
682 }
683 
684 double
685 userGetTime(void)
686 {
687  static bool initialized = false;
688  static NDB_TICKS initSecs = 0;
689  static Uint32 initMicros = 0;
690  double timeValue = 0;
691 
692  if ( !initialized ) {
693  initialized = true;
694  NdbTick_CurrentMicrosecond(&initSecs, &initMicros);
695  timeValue = 0.0;
696  } else {
697  NDB_TICKS secs = 0;
698  Uint32 micros = 0;
699 
700  NdbTick_CurrentMicrosecond(&secs, &micros);
701  double s = (double)secs - (double)initSecs;
702  double us = (double)micros - (double)initMicros;
703 
704  timeValue = s + (us / 1000000.0);
705  }
706  return timeValue;
707 }
708 
709 void showTime()
710 {
711  char buf[128];
712  struct tm* tm_now;
713  time_t now;
714  now = ::time((time_t*)NULL);
715  tm_now = ::gmtime(&now);
716 
717  BaseString::snprintf(buf, 128,
718  "%d-%.2d-%.2d %.2d:%.2d:%.2d",
719  tm_now->tm_year + 1900,
720  tm_now->tm_mon,
721  tm_now->tm_mday,
722  tm_now->tm_hour,
723  tm_now->tm_min,
724  tm_now->tm_sec);
725 
726  ndbout_c("Time: %s", buf);
727 }
728