MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mainAsyncGenerator.cpp
1 /*
2  Copyright (C) 2003-2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 
21 #include <NdbHost.h>
22 #include <NdbSleep.h>
23 #include <NdbThread.h>
24 #include <NdbMain.h>
25 #include <NdbOut.hpp>
26 #include <NdbEnv.h>
27 #include <NdbTest.hpp>
28 
29 #include "userInterface.h"
30 #include "dbGenerator.h"
31 
32 static int numProcesses;
33 static int numSeconds;
34 static int numWarmSeconds;
35 static int parallellism;
36 static int millisSendPoll;
37 static int minEventSendPoll;
38 static int forceSendPoll;
39 
40 static ThreadData *data;
41 
42 static void usage(const char *prog)
43 {
44  const char *progname;
45 
46  /*--------------------------------------------*/
47  /* Get the name of the program (without path) */
48  /*--------------------------------------------*/
49  progname = strrchr(prog, '/');
50 
51  if (progname == 0)
52  progname = prog;
53  else
54  ++progname;
55 
56  ndbout_c(
57  "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
58  "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
59  " -proc <num> Specifies that <num> is the number of\n"
60  " threads. The default is 1.\n"
61  " -time <num> Specifies that the test will run for <num> sec.\n"
62  " The default is 10 sec\n"
63  " -warm <num> Specifies the warm-up/cooldown period of <num> "
64  "sec.\n"
65  " The default is 10 sec\n"
66  " -p <num> The no of parallell transactions started by "
67  "one thread\n"
68  " -e <num> Minimum no of events before wake up in call to "
69  "sendPoll\n"
70  " Default is 1\n"
71  " -f <num> force parameter to sendPoll\n"
72  " Default is 0\n",
73  progname);
74 }
75 
76 static
77 int
78 parse_args(int argc, const char **argv)
79 {
80  int i;
81 
82  numProcesses = 1;
83  numSeconds = 10;
84  numWarmSeconds = 10;
85  parallellism = 1;
86  millisSendPoll = 10000;
87  minEventSendPoll = 1;
88  forceSendPoll = 0;
89 
90 
91  i = 1;
92  while (i < argc){
93  if (strcmp("-proc",argv[i]) == 0) {
94  if (i + 1 >= argc) {
95  return 1;
96  }
97  if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
98  numProcesses <= 0 || numProcesses > 127) {
99  ndbout_c("-proc flag requires a positive integer argument [1..127]");
100  return 1;
101  }
102  i += 2;
103  } else if (strcmp("-p", argv[i]) == 0){
104  if(i + 1 >= argc){
105  usage(argv[0]);
106  return 1;
107  }
108  if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
109  parallellism <= 0){
110  ndbout_c("-p flag requires a positive integer argument");
111  return 1;
112  }
113  i += 2;
114  }
115  else if (strcmp("-time",argv[i]) == 0) {
116  if (i + 1 >= argc) {
117  return 1;
118  }
119  if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
120  numSeconds < 0) {
121  ndbout_c("-time flag requires a positive integer argument");
122  return 1;
123  }
124  i += 2;
125  }
126  else if (strcmp("-warm",argv[i]) == 0) {
127  if (i + 1 >= argc) {
128  return 1;
129  }
130  if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
131  numWarmSeconds < 0) {
132  ndbout_c("-warm flag requires a positive integer argument");
133  return 1;
134  }
135  i += 2;
136  }
137  else if (strcmp("-e",argv[i]) == 0) {
138  if (i + 1 >= argc) {
139  return 1;
140  }
141  if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
142  minEventSendPoll < 0) {
143  ndbout_c("-e flag requires a positive integer argument");
144  return 1;
145  }
146  i += 2;
147  }
148  else if (strcmp("-f",argv[i]) == 0) {
149  if (i + 1 >= argc) {
150  usage(argv[0]);
151  return 1;
152  }
153  if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
154  forceSendPoll < 0) {
155  ndbout_c("-f flag requires a positive integer argument");
156  return 1;
157  }
158  i += 2;
159  }
160  else {
161  return 1;
162  }
163  }
164 
165  if(minEventSendPoll > parallellism){
166  ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
167  minEventSendPoll, parallellism);
168  ndbout_c("not very good...");
169  ndbout_c("very bad...");
170  ndbout_c("exiting...");
171  return 1;
172  }
173  return 0;
174 }
175 
176 static
177 void
178 print_transaction(const char *header,
179  unsigned long totalCount,
180  TransactionDefinition *trans,
181  unsigned int printBranch,
182  unsigned int printRollback)
183 {
184  double f;
185 
186  ndbout_c(" %s: %d (%.2f%%) "
187  "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
188  header,
189  trans->count,
190  (double)trans->count / (double)totalCount * 100.0,
191  (int)trans->latency.getMean(),
192  (int)trans->latency.getMin(),
193  (int)trans->latency.getMax(),
194  (int)trans->latency.getStddev(),
195  (int)trans->latency.getCount()
196  );
197 
198  if( printBranch ){
199  if( trans->count == 0 )
200  f = 0.0;
201  else
202  f = (double)trans->branchExecuted / (double)trans->count * 100.0;
203  ndbout_c(" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
204  }
205 
206  if( printRollback ){
207  if( trans->count == 0 )
208  f = 0.0;
209  else
210  f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
211  ndbout_c(" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
212  }
213 }
214 
215 void
216 print_stats(const char *title,
217  unsigned int length,
218  unsigned int transactionFlag,
219  GeneratorStatistics *gen,
220  int numProc, int parallellism)
221 {
222  int i;
223  char buf[10];
224  char name[MAXHOSTNAMELEN];
225 
226  name[0] = 0;
227  NdbHost_GetHostName(name);
228 
229  ndbout_c("\n------ %s ------",title);
230  ndbout_c("Length : %d %s",
231  length,
232  transactionFlag ? "Transactions" : "sec");
233  ndbout_c("Processor : %s", name);
234  ndbout_c("Number of Proc: %d",numProc);
235  ndbout_c("Parallellism : %d", parallellism);
236  ndbout_c("\n");
237 
238  if( gen->totalTransactions == 0 ) {
239  ndbout_c(" No Transactions for this test");
240  }
241  else {
242  for(i = 0; i < 5; i++) {
243  sprintf(buf, "T%d",i+1);
244  print_transaction(buf,
245  gen->totalTransactions,
246  &gen->transactions[i],
247  i >= 2,
248  i >= 3 );
249  }
250 
251  ndbout_c("\n");
252  ndbout_c(" Overall Statistics:");
253  ndbout_c(" Transactions: %d", gen->totalTransactions);
254  ndbout_c(" Outer : %.0f TPS",gen->outerTps);
255  ndbout_c("\n");
256  }
257 }
258 
259 static
260 void *
261 threadRoutine(void *arg)
262 {
263  int i;
264  ThreadData *data = (ThreadData *)arg;
265  Ndb * pNDB;
266 
267  pNDB = asyncDbConnect(parallellism);
268  /* NdbSleep_MilliSleep(rand() % 10); */
269 
270  for(i = 0; i<parallellism; i++){
271  data[i].pNDB = pNDB;
272  }
273  millisSendPoll = 30000;
274  asyncGenerator(data, parallellism,
275  millisSendPoll, minEventSendPoll, forceSendPoll);
276 
277  asyncDbDisconnect(pNDB);
278 
279  return NULL;
280 }
281 
282 NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
283  "DbAsyncGenerator", "DbAsyncGenerator", 65535)
284 {
285  ndb_init();
286  int i;
287  int j;
288  int k;
289  struct NdbThread* pThread = NULL;
292  char threadName[32];
293  int rc = NDBT_OK;
294  void* tmp = NULL;
295  if(parse_args(argc,argv) != 0){
296  usage(argv[0]);
297  return NDBT_ProgramExit(NDBT_WRONGARGS);
298  }
299 
300 
301  ndbout_c("\nStarting Test with %d process(es) for %d %s parallellism %d",
302  numProcesses,
303  numSeconds,
304  "sec",
305  parallellism);
306 
307  ndbout_c(" WarmUp/coolDown = %d sec", numWarmSeconds);
308 
309  data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
310 
311  for(i = 0; i < numProcesses; i++) {
312  for(j = 0; j<parallellism; j++){
313  data[i*parallellism+j].warmUpSeconds = numWarmSeconds;
314  data[i*parallellism+j].testSeconds = numSeconds;
315  data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
316  data[i*parallellism+j].randomSeed =
317  NdbTick_CurrentMillisecond()+i+j;
318  data[i*parallellism+j].changedTime = 0;
319  data[i*parallellism+j].runState = Runnable;
320  }
321  sprintf(threadName, "AsyncThread[%d]", i);
322  pThread = NdbThread_Create(threadRoutine,
323  (void**)&data[i*parallellism],
324  65535,
325  threadName,
326  NDB_THREAD_PRIO_LOW);
327  if(pThread != 0 && pThread != NULL){
328  (&data[i*parallellism])->pThread = pThread;
329  } else {
330  perror("Failed to create thread");
331  rc = NDBT_FAILED;
332  }
333  }
334 
335  showTime();
336 
337  /*--------------------------------*/
338  /* Wait for all processes to exit */
339  /*--------------------------------*/
340  for(i = 0; i < numProcesses; i++) {
341  NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
342  NdbThread_Destroy(&data[i*parallellism].pThread);
343  }
344 
345  ndbout_c("All threads have finished");
346 
347  /*-------------------------------------------*/
348  /* Clear all structures for total statistics */
349  /*-------------------------------------------*/
350  stats.totalTransactions = 0;
351  stats.outerTps = 0.0;
352 
353  for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
354  stats.transactions[i].count = 0;
355  stats.transactions[i].branchExecuted = 0;
356  stats.transactions[i].rollbackExecuted = 0;
357  stats.transactions[i].latency.reset();
358  }
359 
360  /*--------------------------------*/
361  /* Add the values for all Threads */
362  /*--------------------------------*/
363  for(i = 0; i < numProcesses; i++) {
364  for(k = 0; k<parallellism; k++){
365  p = &data[i*parallellism+k].generator;
366 
367  stats.totalTransactions += p->totalTransactions;
368  stats.outerTps += p->outerTps;
369 
370  for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
371  stats.transactions[j].count +=
372  p->transactions[j].count;
373  stats.transactions[j].branchExecuted +=
374  p->transactions[j].branchExecuted;
375  stats.transactions[j].rollbackExecuted +=
376  p->transactions[j].rollbackExecuted;
377  stats.transactions[j].latency +=
378  p->transactions[j].latency;
379  }
380  }
381  }
382 
383  print_stats("Test Results",
384  numSeconds,
385  0,
386  &stats,
387  numProcesses,
388  parallellism);
389 
390  free(data);
391 
392  NDBT_ProgramExit(rc);
393 }