MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
msa.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include <NdbApi.hpp>
21 #include <NdbSchemaCon.hpp>
22 #include <NdbCondition.h>
23 #include <NdbMutex.h>
24 #include <NdbSleep.h>
25 #include <NdbThread.h>
26 #include <NdbTick.h>
27 #include <NdbOut.hpp>
28 
29 const char* const c_szDatabaseName = "TEST_DB";
30 
31 const char* const c_szTableNameStored = "CCStored";
32 const char* const c_szTableNameTemp = "CCTemp";
33 
34 const char* const c_szContextId = "ContextId";
35 const char* const c_szVersion = "Version";
36 const char* const c_szLockFlag = "LockFlag";
37 const char* const c_szLockTime = "LockTime";
38 const char* const c_szLockTimeUSec = "LockTimeUSec";
39 const char* const c_szContextData = "ContextData";
40 
41 const char* g_szTableName = c_szTableNameStored;
42 
43 
44 #ifdef NDB_WIN32
45 HANDLE hShutdownEvent = 0;
46 #else
47 bool bShutdownEvent = false;
48 #endif
49 long g_nMaxContextIdPerThread = 5000;
50 long g_nNumThreads = 0;
51 long g_nMaxCallsPerSecond = 0;
52 long g_nMaxRetry = 50;
53 bool g_bWriteTuple = false;
54 bool g_bInsertInitial = false;
55 bool g_bVerifyInitial = false;
56 
57 Ndb_cluster_connection* theConnection = 0;
58 NdbMutex* g_pNdbMutexPrintf = 0;
59 NdbMutex* g_pNdbMutexIncrement = 0;
60 long g_nNumCallsProcessed = 0;
61 NDB_TICKS g_tStartTime = 0;
62 NDB_TICKS g_tEndTime = 0;
63 
64 long g_nNumberOfInitialInsert = 0;
65 long g_nNumberOfInitialVerify = 0;
66 
67 const long c_nMaxMillisecForAllCall = 5000;
68 long* g_plCountMillisecForCall = 0;
69 const long c_nMaxMillisecForAllTrans = 5000;
70 long* g_plCountMillisecForTrans = 0;
71 bool g_bReport = false;
72 bool g_bReportPlus = false;
73 
74 
75 // data for CALL_CONTEXT and GROUP_RESOURCE
76 static char STATUS_DATA[]=
77 "000102030405060708090A0B0C0D0E0F000102030405060708090A0B0C0D0E0F"
78 "101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
79 "202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
80 "303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
81 "404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
82 "505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
83 "606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
84 "707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
85 "808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
86 "909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
87 "10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
88 "10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
89 "11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
90 "12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
91 "12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
92 "13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
93 "14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
94 "14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
95 "15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
96 "16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
97 "16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
98 "17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
99 "18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
100 "19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
101 "19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
102 "20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
103 "21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
104 "21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
105 "22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
106 "23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
107 "23E23F240241242243244245246247248000102030405060708090A0B0C0D0EF"
108 "24924A24B24C24D24E24F250251252253000102030405060708090A0B0C0D0EF"
109 "101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
110 "202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
111 "303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
112 "404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
113 "505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
114 "606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
115 "707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
116 "808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
117 "909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
118 "10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
119 "10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
120 "11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
121 "12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
122 "12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
123 "13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
124 "14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
125 "14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
126 "15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
127 "16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
128 "16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
129 "17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
130 "18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
131 "19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
132 "19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
133 "20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
134 "21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
135 "21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
136 "22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
137 "23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
138 "2366890FE1438751097E7F6325DC0E6326F"
139 "25425525625725825925A25B25C25D25E25F000102030405060708090A0B0C0F";
140 
141 long g_nStatusDataSize = sizeof(STATUS_DATA);
142 
143 
144 // Thread function for Call Context Inserts
145 
146 
147 #ifdef NDB_WIN32
148 
149 BOOL WINAPI ConsoleCtrlHandler(DWORD dwCtrlType)
150 {
151  if(CTRL_C_EVENT == dwCtrlType)
152  {
153  SetEvent(hShutdownEvent);
154  return TRUE;
155  }
156  return FALSE;
157 }
158 
159 #else
160 
161 void CtrlCHandler(int)
162 {
163  bShutdownEvent = true;
164 }
165 
166 #endif
167 
168 
169 
170 void ReportNdbError(const char* szMsg, const NdbError& err)
171 {
172  NdbMutex_Lock(g_pNdbMutexPrintf);
173  printf("%s: %d: %s\n", szMsg, err.code, (err.message ? err.message : ""));
174  NdbMutex_Unlock(g_pNdbMutexPrintf);
175 }
176 
177 
178 void
179 ReportCallsPerSecond(long nNumCallsProcessed,
180  NDB_TICKS tStartTime,
181  NDB_TICKS tEndTime)
182 {
183  NDB_TICKS tElapsed = tEndTime - tStartTime;
184  long lCallsPerSec;
185  if(tElapsed>0)
186  lCallsPerSec = (long)((1000*nNumCallsProcessed)/tElapsed);
187  else
188  lCallsPerSec = 0;
189 
190  NdbMutex_Lock(g_pNdbMutexPrintf);
191  printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
192  nNumCallsProcessed, (long)tElapsed, lCallsPerSec);
193  NdbMutex_Unlock(g_pNdbMutexPrintf);
194 }
195 
196 
197 #ifndef NDB_WIN32
198 void InterlockedIncrement(long* lp) // expensive
199 {
200  NdbMutex_Lock(g_pNdbMutexIncrement);
201  (*lp)++;
202  NdbMutex_Unlock(g_pNdbMutexIncrement);
203 }
204 #endif
205 
206 
207 void InterlockedIncrementAndReport(void)
208 {
209  NdbMutex_Lock(g_pNdbMutexIncrement);
210  ++g_nNumCallsProcessed;
211  if((g_nNumCallsProcessed%1000)==0)
212  {
213  g_tEndTime = NdbTick_CurrentMillisecond();
214  if(g_tStartTime)
215  ReportCallsPerSecond(1000, g_tStartTime, g_tEndTime);
216 
217  g_tStartTime = g_tEndTime;
218  }
219  NdbMutex_Unlock(g_pNdbMutexIncrement);
220 }
221 
222 
223 void SleepOneCall(void)
224 {
225  int iMillisecToSleep;
226  if(g_nMaxCallsPerSecond>0)
227  iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
228  else
229  iMillisecToSleep = 50;
230 
231  if(iMillisecToSleep>0)
232  NdbSleep_MilliSleep(iMillisecToSleep);
233 
234 }
235 
236 
237 
238 int QueryTransaction(Ndb* pNdb,
239  long iContextId,
240  long* piVersion,
241  long* piLockFlag,
242  long* piLockTime,
243  long* piLockTimeUSec,
244  char* pchContextData,
245  NdbError& err)
246 {
247  int iRes = -1;
248  NdbConnection* pNdbConnection = pNdb->startTransaction();
249  //0, (const char*)&iContextId, 4);
250  if(pNdbConnection)
251  {
252  NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
253  if(pNdbOperation)
254  {
255  NdbRecAttr* pNdbRecAttrVersion;
256  NdbRecAttr* pNdbRecAttrLockFlag;
257  NdbRecAttr* pNdbRecAttrLockTime;
258  NdbRecAttr* pNdbRecAttrLockTimeUSec;
259  NdbRecAttr* pNdbRecAttrContextData;
260  if(!pNdbOperation->readTuple()
261  && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
262  && (pNdbRecAttrVersion=pNdbOperation->getValue(c_szVersion, (char*)piVersion))
263  && (pNdbRecAttrLockFlag=pNdbOperation->getValue(c_szLockFlag, (char*)piLockFlag))
264  && (pNdbRecAttrLockTime=pNdbOperation->getValue(c_szLockTime, (char*)piLockTime))
265  && (pNdbRecAttrLockTimeUSec=pNdbOperation->getValue(c_szLockTimeUSec, (char*)piLockTimeUSec))
266  && (pNdbRecAttrContextData=pNdbOperation->getValue(c_szContextData, pchContextData)))
267  {
268  if(!pNdbConnection->execute(Commit))
269  iRes = 0;
270  else
271  err = pNdbConnection->getNdbError();
272  }
273  else
274  err = pNdbOperation->getNdbError();
275  }
276  else
277  err = pNdbConnection->getNdbError();
278 
279  pNdb->closeTransaction(pNdbConnection);
280  }
281  else
282  err = pNdb->getNdbError();
283 
284  return iRes;
285 }
286 
287 
288 int RetryQueryTransaction(Ndb* pNdb,
289  long iContextId,
290  long* piVersion,
291  long* piLockFlag,
292  long* piLockTime,
293  long* piLockTimeUSec,
294  char* pchContextData,
295  NdbError& err,
296  int& nRetry)
297 {
298  int iRes = -1;
299  nRetry = 0;
300  bool bRetry = true;
301  while(bRetry && nRetry<g_nMaxRetry)
302  {
303  if(!QueryTransaction(pNdb, iContextId, piVersion, piLockFlag,
304  piLockTime, piLockTimeUSec, pchContextData, err))
305  {
306  iRes = 0;
307  bRetry = false;
308  }
309  else
310  {
311  switch(err.status)
312  {
315  SleepOneCall();
316  ++nRetry;
317  break;
318 
320  default:
321  bRetry = false;
322  break;
323  }
324  }
325  }
326  return iRes;
327 }
328 
329 
330 int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
331 {
332  int iRes = -1;
333  NdbConnection* pNdbConnection = pNdb->startTransaction();
334  //0, (const char*)&iContextId, 4);
335  if(pNdbConnection)
336  {
337  NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
338  if(pNdbOperation)
339  {
340  if(!pNdbOperation->deleteTuple()
341  && !pNdbOperation->equal(c_szContextId, (Int32)iContextId))
342  {
343  if(pNdbConnection->execute(Commit) == 0)
344  iRes = 0;
345  else
346  err = pNdbConnection->getNdbError();
347  }
348  else
349  err = pNdbOperation->getNdbError();
350  }
351  else
352  err = pNdbConnection->getNdbError();
353 
354  pNdb->closeTransaction(pNdbConnection);
355  }
356  else
357  err = pNdb->getNdbError();
358 
359  return iRes;
360 }
361 
362 
363 
364 int RetryDeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
365 {
366  int iRes = -1;
367  nRetry = 0;
368  bool bRetry = true;
369  bool bUnknown = false;
370  while(bRetry && nRetry<g_nMaxRetry)
371  {
372  if(!DeleteTransaction(pNdb, iContextId, err))
373  {
374  iRes = 0;
375  bRetry = false;
376  }
377  else
378  {
379  switch(err.status)
380  {
382  bUnknown = true;
383  ++nRetry;
384  break;
385 
387  bUnknown = false;
388  SleepOneCall();
389  ++nRetry;
390  break;
391 
393  if(err.code==626 && bUnknown)
394  iRes = 0;
395  bRetry = false;
396  break;
397 
398  default:
399  bRetry = false;
400  break;
401  }
402  }
403  }
404  return iRes;
405 }
406 
407 
408 
409 int InsertTransaction(Ndb* pNdb,
410  long iContextID,
411  long iVersion,
412  long iLockFlag,
413  long iLockTime,
414  long iLockTimeUSec,
415  const char* pchContextData,
416  NdbError& err)
417 {
418  int iRes = -1;
419  NdbConnection* pNdbConnection = pNdb->startTransaction();
420  //0, (const char*)&iContextID, 4);
421  if(pNdbConnection)
422  {
423  NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
424  if(pNdbOperation)
425  {
426  if(!(g_bWriteTuple ? pNdbOperation->writeTuple() : pNdbOperation->insertTuple())
427  && !pNdbOperation->equal(c_szContextId, (Int32)iContextID)
428  && !pNdbOperation->setValue(c_szVersion, (Int32)iVersion)
429  && !pNdbOperation->setValue(c_szLockFlag, (Int32)iLockFlag)
430  && !pNdbOperation->setValue(c_szLockTime, (Int32)iLockTime)
431  && !pNdbOperation->setValue(c_szLockTimeUSec, (Int32)iLockTimeUSec)
432  && !pNdbOperation->setValue(c_szContextData, pchContextData, g_nStatusDataSize))
433  {
434  if(!pNdbConnection->execute(Commit))
435  iRes = 0;
436  else
437  err = pNdbConnection->getNdbError();
438  }
439  else
440  err = pNdbOperation->getNdbError();
441  }
442  else
443  err = pNdbConnection->getNdbError();
444 
445  pNdb->closeTransaction(pNdbConnection);
446  }
447  else
448  err = pNdb->getNdbError();
449 
450  return iRes;
451 }
452 
453 
454 
455 int RetryInsertTransaction(Ndb* pNdb,
456  long iContextId,
457  long iVersion,
458  long iLockFlag,
459  long iLockTime,
460  long iLockTimeUSec,
461  const char* pchContextData,
462  NdbError& err, int& nRetry)
463 {
464  int iRes = -1;
465  nRetry = 0;
466  bool bRetry = true;
467  bool bUnknown = false;
468  while(bRetry && nRetry<g_nMaxRetry)
469  {
470  if(!InsertTransaction(pNdb, iContextId, iVersion, iLockFlag,
471  iLockTime, iLockTimeUSec, pchContextData, err))
472  {
473  iRes = 0;
474  bRetry = false;
475  }
476  else
477  {
478  switch(err.status)
479  {
481  bUnknown = true;
482  ++nRetry;
483  break;
484 
486  bUnknown = false;
487  SleepOneCall();
488  ++nRetry;
489  break;
490 
492  if(err.code==630 && bUnknown)
493  iRes = 0;
494  bRetry = false;
495  break;
496 
497  default:
498  bRetry = false;
499  break;
500  }
501  }
502  }
503  return iRes;
504 }
505 
506 
507 int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
508 {
509  int iRes = -1;
510  NdbConnection* pNdbConnection = pNdb->startTransaction();
511  //0, (const char*)&iContextId, 4);
512  if(pNdbConnection)
513  {
514  NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
515  if(pNdbOperation)
516  {
517  if(!pNdbOperation->updateTuple()
518  && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
519  && !pNdbOperation->setValue(c_szContextData, STATUS_DATA, g_nStatusDataSize))
520  {
521  if(!pNdbConnection->execute(Commit))
522  iRes = 0;
523  else
524  err = pNdbConnection->getNdbError();
525  }
526  else
527  err = pNdbOperation->getNdbError();
528  }
529  else
530  err = pNdbConnection->getNdbError();
531 
532  pNdb->closeTransaction(pNdbConnection);
533  }
534  else
535  err = pNdb->getNdbError();
536 
537  return iRes;
538 }
539 
540 
541 int RetryUpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
542 {
543  int iRes = -1;
544  nRetry = 0;
545  bool bRetry = true;
546  while(bRetry && nRetry<g_nMaxRetry)
547  {
548  if(!UpdateTransaction(pNdb, iContextId, err))
549  {
550  iRes = 0;
551  bRetry = false;
552  }
553  else
554  {
555  switch(err.status)
556  {
559  SleepOneCall();
560  ++nRetry;
561  break;
562 
564  default:
565  bRetry = false;
566  break;
567  }
568  }
569  }
570  return iRes;
571 }
572 
573 
574 
575 int InsertInitialRecords(Ndb* pNdb, long nInsert, long nSeed)
576 {
577  int iRes = -1;
578  char szMsg[100];
579  for(long i=0; i<nInsert; ++i)
580  {
581  int iContextID = i+nSeed;
582  int nRetry = 0;
583  NdbError err;
584  memset(&err, 0, sizeof(err));
585  NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
586  iRes = RetryInsertTransaction(pNdb, iContextID, nSeed, iContextID,
587  (long)(tStartTrans/1000), (long)((tStartTrans%1000)*1000),
588  STATUS_DATA, err, nRetry);
589  NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
590  long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
591  if(nRetry>0)
592  {
593  sprintf(szMsg, "insert retried %d times, time %ld msec.",
594  nRetry, lMillisecForThisTrans);
595  ReportNdbError(szMsg, err);
596  }
597  if(iRes)
598  {
599  ReportNdbError("Insert initial record failed", err);
600  return iRes;
601  }
602  InterlockedIncrement(&g_nNumberOfInitialInsert);
603  }
604  return iRes;
605 }
606 
607 
608 
609 int VerifyInitialRecords(Ndb* pNdb, long nVerify, long nSeed)
610 {
611  int iRes = -1;
612  char* pchContextData = new char[g_nStatusDataSize];
613  char szMsg[100];
614  long iPrevLockTime = -1;
615  long iPrevLockTimeUSec = -1;
616  for(long i=0; i<nVerify; ++i)
617  {
618  int iContextID = i+nSeed;
619  long iVersion = 0;
620  long iLockFlag = 0;
621  long iLockTime = 0;
622  long iLockTimeUSec = 0;
623  int nRetry = 0;
624  NdbError err;
625  memset(&err, 0, sizeof(err));
626  NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
627  iRes = RetryQueryTransaction(pNdb, iContextID, &iVersion, &iLockFlag,
628  &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
629  NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
630  long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
631  if(nRetry>0)
632  {
633  sprintf(szMsg, "verify retried %d times, time %ld msec.",
634  nRetry, lMillisecForThisTrans);
635  ReportNdbError(szMsg, err);
636  }
637  if(iRes)
638  {
639  ReportNdbError("Read initial record failed", err);
640  delete[] pchContextData;
641  return iRes;
642  }
643  if(memcmp(pchContextData, STATUS_DATA, g_nStatusDataSize))
644  {
645  sprintf(szMsg, "wrong context data in tuple %d", iContextID);
646  ReportNdbError(szMsg, err);
647  delete[] pchContextData;
648  return -1;
649  }
650  if(iVersion!=nSeed
651  || iLockFlag!=iContextID
652  || iLockTime<iPrevLockTime
653  || (iLockTime==iPrevLockTime && iLockTimeUSec<iPrevLockTimeUSec))
654  {
655  sprintf(szMsg, "wrong call data in tuple %d", iContextID);
656  ReportNdbError(szMsg, err);
657  delete[] pchContextData;
658  return -1;
659  }
660  iPrevLockTime = iLockTime;
661  iPrevLockTimeUSec = iLockTimeUSec;
662  InterlockedIncrement(&g_nNumberOfInitialVerify);
663  }
664  delete[] pchContextData;
665  return iRes;
666 }
667 
668 
669 
670 
671 
672 void* RuntimeCallContext(void* lpParam)
673 {
674  long nNumCallsProcessed = 0;
675  int nStartingRecordID = *(int*)lpParam;
676 
677  Ndb* pNdb;
678  char* pchContextData = new char[g_nStatusDataSize];
679  char szMsg[100];
680 
681  int iRes;
682  const char* szOp;
683  long iVersion;
684  long iLockFlag;
685  long iLockTime;
686  long iLockTimeUSec;
687 
688  pNdb = new Ndb(theConnection, "TEST_DB");
689  if(!pNdb)
690  {
691  NdbMutex_Lock(g_pNdbMutexPrintf);
692  printf("new Ndb failed\n");
693  NdbMutex_Unlock(g_pNdbMutexPrintf);
694  delete[] pchContextData;
695  return 0;
696  }
697 
698  if(pNdb->init(1) || pNdb->waitUntilReady())
699  {
700  ReportNdbError("init of Ndb failed", pNdb->getNdbError());
701  delete pNdb;
702  delete[] pchContextData;
703  return 0;
704  }
705 
706  if(g_bInsertInitial)
707  {
708  if(InsertInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
709  {
710  delete pNdb;
711  delete[] pchContextData;
712  return 0;
713  }
714  }
715 
716  if(g_bVerifyInitial)
717  {
718  NdbError err;
719  memset(&err, 0, sizeof(err));
720  if(VerifyInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
721  {
722  delete pNdb;
723  delete[] pchContextData;
724  return 0;
725  }
726  }
727  if(g_bInsertInitial || g_bVerifyInitial)
728  {
729  delete[] pchContextData;
730  return 0;
731  }
732 
733  long nContextID = nStartingRecordID;
734 #ifdef NDB_WIN32
735  while(WaitForSingleObject(hShutdownEvent,0) != WAIT_OBJECT_0)
736 #else
737  while(!bShutdownEvent)
738 #endif
739  {
740  ++nContextID;
741  nContextID %= g_nMaxContextIdPerThread;
742  nContextID += nStartingRecordID;
743 
744  bool bTimeLatency = (nContextID==100);
745 
746  NDB_TICKS tStartCall = NdbTick_CurrentMillisecond();
747  for (int i=0; i < 20; i++)
748  {
749  int nRetry = 0;
750  NdbError err;
751  memset(&err, 0, sizeof(err));
752  NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond();
753  switch(i)
754  {
755  case 3:
756  case 6:
757  case 9:
758  case 11:
759  case 12:
760  case 15:
761  case 18: // Query Record
762  szOp = "Read";
763  iRes = RetryQueryTransaction(pNdb, nContextID, &iVersion, &iLockFlag,
764  &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
765  break;
766 
767  case 19: // Delete Record
768  szOp = "Delete";
769  iRes = RetryDeleteTransaction(pNdb, nContextID, err, nRetry);
770  break;
771 
772  case 0: // Insert Record
773  szOp = "Insert";
774  iRes = RetryInsertTransaction(pNdb, nContextID, 1, 1, 1, 1, STATUS_DATA, err, nRetry);
775  break;
776 
777  default: // Update Record
778  szOp = "Update";
779  iRes = RetryUpdateTransaction(pNdb, nContextID, err, nRetry);
780  break;
781  }
782  NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond();
783  long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
784 
785  if(g_bReport)
786  {
787  assert(lMillisecForThisTrans>=0 && lMillisecForThisTrans<c_nMaxMillisecForAllTrans);
788  InterlockedIncrement(g_plCountMillisecForTrans+lMillisecForThisTrans);
789  }
790 
791  if(nRetry>0)
792  {
793  sprintf(szMsg, "%s retried %d times, time %ld msec.",
794  szOp, nRetry, lMillisecForThisTrans);
795  ReportNdbError(szMsg, err);
796  }
797  else if(bTimeLatency)
798  {
799  NdbMutex_Lock(g_pNdbMutexPrintf);
800  printf("%s = %ld msec.\n", szOp, lMillisecForThisTrans);
801  NdbMutex_Unlock(g_pNdbMutexPrintf);
802  }
803 
804  if(iRes)
805  {
806  sprintf(szMsg, "%s failed after %ld calls, terminating thread",
807  szOp, nNumCallsProcessed);
808  ReportNdbError(szMsg, err);
809  delete pNdb;
810  delete[] pchContextData;
811  return 0;
812  }
813  }
814  NDB_TICKS tEndCall = NdbTick_CurrentMillisecond();
815  long lMillisecForThisCall = (long)(tEndCall-tStartCall);
816 
817  if(g_bReport)
818  {
819  assert(lMillisecForThisCall>=0 && lMillisecForThisCall<c_nMaxMillisecForAllCall);
820  InterlockedIncrement(g_plCountMillisecForCall+lMillisecForThisCall);
821  }
822 
823  if(bTimeLatency)
824  {
825  NdbMutex_Lock(g_pNdbMutexPrintf);
826  printf("Total time for call is %ld msec.\n", (long)lMillisecForThisCall);
827  NdbMutex_Unlock(g_pNdbMutexPrintf);
828  }
829 
830  nNumCallsProcessed++;
831  InterlockedIncrementAndReport();
832  if(g_nMaxCallsPerSecond>0)
833  {
834  int iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
835  iMillisecToSleep -= lMillisecForThisCall;
836  if(iMillisecToSleep>0)
837  {
838  NdbSleep_MilliSleep(iMillisecToSleep);
839  }
840  }
841  }
842 
843  NdbMutex_Lock(g_pNdbMutexPrintf);
844  printf("Terminating thread after %ld calls\n", nNumCallsProcessed);
845  NdbMutex_Unlock(g_pNdbMutexPrintf);
846 
847  delete pNdb;
848  delete[] pchContextData;
849  return 0;
850 }
851 
852 
853 int CreateCallContextTable(Ndb* pNdb, const char* szTableName, bool bStored)
854 {
855  int iRes = -1;
856  NdbError err;
857  memset(&err, 0, sizeof(err));
858 
859  NdbSchemaCon* pNdbSchemaCon = NdbSchemaCon::startSchemaTrans(pNdb);
860  if(pNdbSchemaCon)
861  {
862  NdbSchemaOp* pNdbSchemaOp = pNdbSchemaCon->getNdbSchemaOp();
863  if(pNdbSchemaOp)
864  {
865  if(!pNdbSchemaOp->createTable(szTableName, 8, TupleKey, 2,
866  All, 6, 78, 80, 1, bStored)
867  && !pNdbSchemaOp->createAttribute(c_szContextId, TupleKey, 32, 1, Signed)
868  && !pNdbSchemaOp->createAttribute(c_szVersion, NoKey, 32, 1, Signed)
869  && !pNdbSchemaOp->createAttribute(c_szLockFlag, NoKey, 32, 1, Signed)
870  && !pNdbSchemaOp->createAttribute(c_szLockTime, NoKey, 32, 1, Signed)
871  && !pNdbSchemaOp->createAttribute(c_szLockTimeUSec, NoKey, 32, 1, Signed)
872  && !pNdbSchemaOp->createAttribute(c_szContextData, NoKey, 8, g_nStatusDataSize, String))
873  {
874  if(!pNdbSchemaCon->execute())
875  iRes = 0;
876  else
877  err = pNdbSchemaCon->getNdbError();
878  }
879  else
880  err = pNdbSchemaOp->getNdbError();
881  }
882  else
883  err = pNdbSchemaCon->getNdbError();
884 
885  NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
886  }
887  else
888  err = pNdb->getNdbError();
889 
890  if(iRes)
891  {
892  ReportNdbError("create call context table failed", err);
893  }
894  return iRes;
895 }
896 
897 
898 
899 void ReportResponseTimeStatistics(const char* szStat, long* plCount, const long lSize)
900 {
901  long lCount = 0;
902  Int64 llSum = 0;
903  Int64 llSum2 = 0;
904  long lMin = -1;
905  long lMax = -1;
906 
907  for(long l=0; l<lSize; ++l)
908  {
909  if(plCount[l]>0)
910  {
911  lCount += plCount[l];
912  llSum += (Int64)l*(Int64)plCount[l];
913  llSum2 += (Int64)l*(Int64)l*(Int64)plCount[l];
914  if(lMin==-1 || l<lMin)
915  {
916  lMin = l;
917  }
918  if(lMax==-1 || l>lMax)
919  {
920  lMax = l;
921  }
922  }
923  }
924 
925  long lAvg = long(llSum/lCount);
926  double dblVar = ((double)lCount*(double)llSum2 - (double)llSum*(double)llSum)/((double)lCount*(double)(lCount-1));
927  long lStd = long(sqrt(dblVar));
928 
929  long lMed = -1;
930  long l95 = -1;
931  long lSel = -1;
932  for(long l=lMin; l<=lMax; ++l)
933  {
934  if(plCount[l]>0)
935  {
936  lSel += plCount[l];
937  if(lMed==-1 && lSel>=(lCount/2))
938  {
939  lMed = l;
940  }
941  if(l95==-1 && lSel>=((lCount*95)/100))
942  {
943  l95 = l;
944  }
945  if(g_bReportPlus)
946  {
947  printf("%ld\t%ld\n", l, plCount[l]);
948  }
949  }
950  }
951 
952  printf("%s: Count=%ld, Min=%ld, Max=%ld, Avg=%ld, Std=%ld, Med=%ld, 95%%=%ld\n",
953  szStat, lCount, lMin, lMax, lAvg, lStd, lMed, l95);
954 }
955 
956 
957 
958 void ShowHelp(const char* szCmd)
959 {
960  printf("%s -t<threads> [-s<seed>] [-b<batch>] [-c<maxcps>] [-m<size>] [-d] [-i] [-v] [-f] [-w] [-r[+]]\n", szCmd);
961  printf("%s -?\n", szCmd);
962  puts("-d\t\tcreate the table");
963  puts("-i\t\tinsert initial records");
964  puts("-v\t\tverify initial records");
965  puts("-t<threads>\tnumber of threads making calls");
966  puts("-s<seed>\toffset for primary key");
967  puts("-b<batch>\tbatch size per thread");
968  puts("-c<maxcps>\tmax number of calls per second for this process");
969  puts("-m<size>\tsize of context data");
970  puts("-f\t\tno checkpointing and no logging");
971  puts("-w\t\tuse writeTuple instead of insertTuple");
972  puts("-r\t\treport response time statistics");
973  puts("-r+\t\treport response time distribution");
974  puts("-?\t\thelp");
975 }
976 
977 
978 int main(int argc, char* argv[])
979 {
980  ndb_init();
981  g_nNumThreads = 0;
982  g_nMaxCallsPerSecond = 0;
983  long nSeed = 0;
984  bool bStoredTable = true;
985  bool bCreateTable = false;
986  g_bWriteTuple = false;
987  g_bReport = false;
988  g_bReportPlus = false;
989 
990  for(int i=1; i<argc; ++i)
991  {
992  if(argv[i][0]=='-' || argv[i][0]=='/')
993  {
994  switch(argv[i][1])
995  {
996  case 't':
997  g_nNumThreads = atol(argv[i]+2);
998  break;
999  case 's':
1000  nSeed = atol(argv[i]+2);
1001  break;
1002  case 'b':
1003  g_nMaxContextIdPerThread = atol(argv[i]+2);
1004  break;
1005  case 'm':
1006  g_nStatusDataSize = atol(argv[i]+2);
1007  if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
1008  {
1009  g_nStatusDataSize = sizeof(STATUS_DATA);
1010  }
1011  break;
1012  case 'i':
1013  g_bInsertInitial = true;
1014  break;
1015  case 'v':
1016  g_bVerifyInitial = true;
1017  break;
1018  case 'd':
1019  bCreateTable = true;
1020  break;
1021  case 'f':
1022  bStoredTable = false;
1023  break;
1024  case 'w':
1025  g_bWriteTuple = true;
1026  break;
1027  case 'r':
1028  g_bReport = true;
1029  if(argv[i][2]=='+')
1030  {
1031  g_bReportPlus = true;
1032  }
1033  break;
1034  case 'c':
1035  g_nMaxCallsPerSecond = atol(argv[i]+2);
1036  break;
1037  case '?':
1038  default:
1039  ShowHelp(argv[0]);
1040  return -1;
1041  }
1042  }
1043  else
1044  {
1045  ShowHelp(argv[0]);
1046  return -1;
1047  }
1048  }
1049  if(bCreateTable)
1050  puts("-d\tcreate the table");
1051  if(g_bInsertInitial)
1052  printf("-i\tinsert initial records\n");
1053  if(g_bVerifyInitial)
1054  printf("-v\tverify initial records\n");
1055  if(g_nNumThreads>0)
1056  printf("-t%ld\tnumber of threads making calls\n", g_nNumThreads);
1057  if(g_nNumThreads>0)
1058  {
1059  printf("-s%ld\toffset for primary key\n", nSeed);
1060  printf("-b%ld\tbatch size per thread\n", g_nMaxContextIdPerThread);
1061  }
1062  if(g_nMaxCallsPerSecond>0)
1063  printf("-c%ld\tmax number of calls per second for this process\n", g_nMaxCallsPerSecond);
1064  if(!bStoredTable)
1065  puts("-f\tno checkpointing and no logging to disk");
1066  if(g_bWriteTuple)
1067  puts("-w\tuse writeTuple instead of insertTuple");
1068  if(g_bReport)
1069  puts("-r\treport response time statistics");
1070  if(g_bReportPlus)
1071  puts("-r+\treport response time distribution");
1072 
1073  if(!bCreateTable && g_nNumThreads<=0)
1074  {
1075  ShowHelp(argv[0]);
1076  return -1;
1077  }
1078  printf("-m%ld\tsize of context data\n", g_nStatusDataSize);
1079 
1080  g_szTableName = (bStoredTable ? c_szTableNameStored : c_szTableNameTemp);
1081 
1082 #ifdef NDB_WIN32
1083  SetConsoleCtrlHandler(ConsoleCtrlHandler, true);
1084 #else
1085  signal(SIGINT, CtrlCHandler);
1086 #endif
1087 
1088  if(g_bReport)
1089  {
1090  g_plCountMillisecForCall = new long[c_nMaxMillisecForAllCall];
1091  memset(g_plCountMillisecForCall, 0, c_nMaxMillisecForAllCall*sizeof(long));
1092  g_plCountMillisecForTrans = new long[c_nMaxMillisecForAllTrans];
1093  memset(g_plCountMillisecForTrans, 0, c_nMaxMillisecForAllTrans*sizeof(long));
1094  }
1095 
1096  g_pNdbMutexIncrement = NdbMutex_Create();
1097  g_pNdbMutexPrintf = NdbMutex_Create();
1098 #ifdef NDB_WIN32
1099  hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
1100 #endif
1101 
1102  theConnection= new Ndb_cluster_connection();
1103  if (theConnection->connect(12, 5, 1) != 0)
1104  {
1105  ndbout << "Unable to connect to managment server." << endl;
1106  return -1;
1107  }
1108  if (theConnection->wait_until_ready(30,0) < 0)
1109  {
1110  ndbout << "Cluster nodes not ready in 30 seconds." << endl;
1111  return -1;
1112  }
1113 
1114  Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
1115  if(!pNdb)
1116  {
1117  printf("could not construct ndb\n");
1118  return 1;
1119  }
1120 
1121  if(pNdb->init(1) || pNdb->waitUntilReady())
1122  {
1123  ReportNdbError("could not initialize ndb\n", pNdb->getNdbError());
1124  delete pNdb;
1125  return 2;
1126  }
1127 
1128  if(bCreateTable)
1129  {
1130  printf("Create CallContext table\n");
1131  if (bStoredTable)
1132  {
1133  if (CreateCallContextTable(pNdb, c_szTableNameStored, true))
1134  {
1135  printf("Create table failed\n");
1136  delete pNdb;
1137  return 3;
1138  }
1139  }
1140  else
1141  {
1142  if (CreateCallContextTable(pNdb, c_szTableNameTemp, false))
1143  {
1144  printf("Create table failed\n");
1145  delete pNdb;
1146  return 3;
1147  }
1148  }
1149  }
1150 
1151  if(g_nNumThreads>0)
1152  {
1153  printf("creating %d threads\n", (int)g_nNumThreads);
1154  if(g_bInsertInitial)
1155  {
1156  printf("each thread will insert %ld initial records, total %ld inserts\n",
1157  g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1158  }
1159  if(g_bVerifyInitial)
1160  {
1161  printf("each thread will verify %ld initial records, total %ld reads\n",
1162  g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1163  }
1164 
1165  g_nNumberOfInitialInsert = 0;
1166  g_nNumberOfInitialVerify = 0;
1167 
1168  NDB_TICKS tStartTime = NdbTick_CurrentMillisecond();
1169  NdbThread* pThreads[256];
1170  int pnStartingRecordNum[256];
1171  int ij;
1172  for(ij=0;ij<g_nNumThreads;ij++)
1173  {
1174  pnStartingRecordNum[ij] = (ij*g_nMaxContextIdPerThread) + nSeed;
1175  }
1176 
1177  for(ij=0;ij<g_nNumThreads;ij++)
1178  {
1179  pThreads[ij] = NdbThread_Create(RuntimeCallContext,
1180  (void**)(pnStartingRecordNum+ij),
1181  0, "RuntimeCallContext", NDB_THREAD_PRIO_LOW);
1182  }
1183 
1184  //Wait for the threads to finish
1185  for(ij=0;ij<g_nNumThreads;ij++)
1186  {
1187  void* status;
1188  NdbThread_WaitFor(pThreads[ij], &status);
1189  }
1190  NDB_TICKS tEndTime = NdbTick_CurrentMillisecond();
1191 
1192  //Print time taken
1193  printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
1194  g_nNumCallsProcessed,
1195  (long)(tEndTime-tStartTime),
1196  (long)((1000*g_nNumCallsProcessed)/(tEndTime-tStartTime)));
1197 
1198  if(g_bInsertInitial)
1199  printf("successfully inserted %ld tuples\n", g_nNumberOfInitialInsert);
1200  if(g_bVerifyInitial)
1201  printf("successfully verified %ld tuples\n", g_nNumberOfInitialVerify);
1202  }
1203 
1204  delete pNdb;
1205 
1206 #ifdef NDB_WIN32
1207  CloseHandle(hShutdownEvent);
1208 #endif
1209  NdbMutex_Destroy(g_pNdbMutexIncrement);
1210  NdbMutex_Destroy(g_pNdbMutexPrintf);
1211 
1212  if(g_bReport)
1213  {
1214  ReportResponseTimeStatistics("Calls", g_plCountMillisecForCall, c_nMaxMillisecForAllCall);
1215  ReportResponseTimeStatistics("Transactions", g_plCountMillisecForTrans, c_nMaxMillisecForAllTrans);
1216 
1217  delete[] g_plCountMillisecForCall;
1218  delete[] g_plCountMillisecForTrans;
1219  }
1220 
1221  return 0;
1222 }
1223