MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
asyncGenerator.cpp
1 /*
2  Copyright (C) 2005, 2006 MySQL AB, 2008, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 /***************************************************************
20 * I N C L U D E D F I L E S *
21 ***************************************************************/
22 
23 #include <ndb_global.h>
24 
25 #include "dbGenerator.h"
26 #include <NdbApi.hpp>
27 #include <NdbOut.hpp>
28 #include <NdbSleep.h>
29 
30 /***************************************************************
31 * L O C A L C O N S T A N T S *
32 ***************************************************************/
33 
34 /***************************************************************
35 * L O C A L D A T A S T R U C T U R E S *
36 ***************************************************************/
37 
38 /***************************************************************
39 * L O C A L F U N C T I O N S *
40 ***************************************************************/
41 
42 static void getRandomSubscriberNumber(SubscriberNumber number);
43 static void getRandomServerId(ServerId *serverId);
44 static void getRandomChangedBy(ChangedBy changedBy);
45 //static void getRandomChangedTime(ChangedTime changedTime);
46 
47 static void clearTransaction(TransactionDefinition *trans);
48 static void initGeneratorStatistics(GeneratorStatistics *gen);
49 
50 static void doOneTransaction(ThreadData * td,
51  int parallellism,
52  int millisSendPoll,
53  int minEventSendPoll,
54  int forceSendPoll);
55 static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
56 static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
57 static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
58 static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
59 static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
60 
61 /***************************************************************
62 * L O C A L D A T A *
63 ***************************************************************/
64 
65 static SequenceValues transactionDefinition[] = {
66  {25, 1},
67  {25, 2},
68  {20, 3},
69  {15, 4},
70  {15, 5},
71  {0, 0}
72 };
73 
74 static SequenceValues rollbackDefinition[] = {
75  {98, 0},
76  {2 , 1},
77  {0, 0}
78 };
79 
80 static unsigned maxsize = 0;
81 
82 /***************************************************************
83 * P U B L I C D A T A *
84 ***************************************************************/
85 
86 /***************************************************************
87 ****************************************************************
88 * L O C A L F U N C T I O N S C O D E S E C T I O N *
89 ****************************************************************
90 ***************************************************************/
91 
92 static void getRandomSubscriberNumber(SubscriberNumber number)
93 {
94  uint32 tmp;
95  char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
96  tmp = myRandom48(NO_OF_SUBSCRIBERS);
97  sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
98  memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
99 }
100 
101 static void getRandomServerId(ServerId *serverId)
102 {
103  *serverId = myRandom48(NO_OF_SERVERS);
104 }
105 
106 static void getRandomChangedBy(ChangedBy changedBy)
107 {
108  memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
109  changedBy[CHANGED_BY_LENGTH] = 0;
110 }
111 
#if 0
/**
 * Disabled: would fill `changedTime` with CHANGED_TIME_LENGTH copies of a
 * single character, 'A' + myRandom48(26), followed by a NUL terminator.
 * doTransaction_T1() formats changed_time with snprintf instead.
 */
static void getRandomChangedTime(ChangedTime changedTime)
{
  const int letter = myRandom48(26) + 'A';

  memset(changedTime, letter, CHANGED_TIME_LENGTH);
  changedTime[CHANGED_TIME_LENGTH] = '\0';
}
#endif
119 
120 static void clearTransaction(TransactionDefinition *trans)
121 {
122  trans->count = 0;
123  trans->branchExecuted = 0;
124  trans->rollbackExecuted = 0;
125  trans->latencyCounter = myRandom48(127);
126  trans->latency.reset();
127 }
128 
129 static int listFull(SessionList *list)
130 {
131  return(list->numberInList == SESSION_LIST_LENGTH);
132 }
133 
134 static int listEmpty(SessionList *list)
135 {
136  return(list->numberInList == 0);
137 }
138 
139 static void insertSession(SessionList *list,
140  SubscriberNumber number,
141  ServerId serverId)
142 {
143  SessionElement *e;
144  if( listFull(list) ) return;
145 
146  e = &list->list[list->writeIndex];
147 
148  strcpy(e->subscriberNumber, number);
149  e->serverId = serverId;
150 
151  list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
152  list->numberInList++;
153 
154  if( list->numberInList > maxsize )
155  maxsize = list->numberInList;
156 }
157 
158 static SessionElement *getNextSession(SessionList *list)
159 {
160  if( listEmpty(list) ) return(0);
161 
162  return(&list->list[list->readIndex]);
163 }
164 
165 static void deleteSession(SessionList *list)
166 {
167  if( listEmpty(list) ) return;
168 
169  list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
170  list->numberInList--;
171 }
172 
173 static void initGeneratorStatistics(GeneratorStatistics *gen)
174 {
175  int i;
176 
177  if( initSequence(&gen->transactionSequence,
178  transactionDefinition) != 0 ) {
179  ndbout_c("could not set the transaction types");
180  exit(0);
181  }
182 
183  if( initSequence(&gen->rollbackSequenceT4,
184  rollbackDefinition) != 0 ) {
185  ndbout_c("could not set the rollback sequence");
186  exit(0);
187  }
188 
189  if( initSequence(&gen->rollbackSequenceT5,
190  rollbackDefinition) != 0 ) {
191  ndbout_c("could not set the rollback sequence");
192  exit(0);
193  }
194 
195  for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
196  clearTransaction(&gen->transactions[i]);
197 
198  gen->totalTransactions = 0;
199 
200  gen->activeSessions.numberInList = 0;
201  gen->activeSessions.readIndex = 0;
202  gen->activeSessions.writeIndex = 0;
203 }
204 
205 
206 static
207 void
208 doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
209 {
210  int i;
211  unsigned int transactionType;
212  int async = 1;
213  if (p == 1) {
214  async = 0;
215  }//if
216  for(i = 0; i<p; i++){
217  if(td[i].runState == Runnable){
218  transactionType = getNextRandom(&td[i].generator.transactionSequence);
219 
220  switch(transactionType) {
221  case 1:
222  doTransaction_T1(td[i].pNDB, &td[i], async);
223  break;
224  case 2:
225  doTransaction_T2(td[i].pNDB, &td[i], async);
226  break;
227  case 3:
228  doTransaction_T3(td[i].pNDB, &td[i], async);
229  break;
230  case 4:
231  doTransaction_T4(td[i].pNDB, &td[i], async);
232  break;
233  case 5:
234  doTransaction_T5(td[i].pNDB, &td[i], async);
235  break;
236  default:
237  ndbout_c("Unknown transaction type: %d", transactionType);
238  }
239  }
240  }
241  if (async == 1) {
242  td[0].pNDB->sendPollNdb(millis, minEvents, force);
243  }//if
244 }
245 
246 static
247 void
248 doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
249 {
250  /*----------------*/
251  /* Init arguments */
252  /*----------------*/
253  getRandomSubscriberNumber(td->transactionData.number);
254  getRandomChangedBy(td->transactionData.changed_by);
255  BaseString::snprintf(td->transactionData.changed_time,
256  sizeof(td->transactionData.changed_time),
257  "%ld - %u", td->changedTime++, (unsigned)myRandom48(65536*1024));
258  //getRandomChangedTime(td->transactionData.changed_time);
259  td->transactionData.location = td->transactionData.changed_by[0];
260 
261  /*-----------------*/
262  /* Run transaction */
263  /*-----------------*/
264  td->runState = Running;
265  td->generator.transactions[0].startLatency();
266 
267  start_T1(pNDB, td, async);
268 }
269 
270 static
271 void
272 doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
273 {
274  /*----------------*/
275  /* Init arguments */
276  /*----------------*/
277  getRandomSubscriberNumber(td->transactionData.number);
278 
279  /*-----------------*/
280  /* Run transaction */
281  /*-----------------*/
282  td->runState = Running;
283  td->generator.transactions[1].startLatency();
284 
285  start_T2(pNDB, td, async);
286 }
287 
288 static
289 void
290 doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
291 {
292  SessionElement *se;
293 
294  /*----------------*/
295  /* Init arguments */
296  /*----------------*/
297  se = getNextSession(&td->generator.activeSessions);
298  if( se ) {
299  strcpy(td->transactionData.number, se->subscriberNumber);
300  td->transactionData.server_id = se->serverId;
301  td->transactionData.sessionElement = 1;
302  } else {
303  getRandomSubscriberNumber(td->transactionData.number);
304  getRandomServerId(&td->transactionData.server_id);
305  td->transactionData.sessionElement = 0;
306  }
307 
308  td->transactionData.server_bit = (1 << td->transactionData.server_id);
309 
310  /*-----------------*/
311  /* Run transaction */
312  /*-----------------*/
313  td->runState = Running;
314  td->generator.transactions[2].startLatency();
315  start_T3(pNDB, td, async);
316 }
317 
318 static
319 void
320 doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
321 {
322  /*----------------*/
323  /* Init arguments */
324  /*----------------*/
325  getRandomSubscriberNumber(td->transactionData.number);
326  getRandomServerId(&td->transactionData.server_id);
327 
328  td->transactionData.server_bit = (1 << td->transactionData.server_id);
329  td->transactionData.do_rollback =
330  getNextRandom(&td->generator.rollbackSequenceT4);
331 
332  memset(td->transactionData.session_details+2,
333  myRandom48(26)+'A', SESSION_DETAILS_LENGTH-3);
334  td->transactionData.session_details[SESSION_DETAILS_LENGTH-1] = 0;
335  int2store(td->transactionData.session_details,SESSION_DETAILS_LENGTH-2);
336 
337  /*-----------------*/
338  /* Run transaction */
339  /*-----------------*/
340  td->runState = Running;
341  td->generator.transactions[3].startLatency();
342  start_T4(pNDB, td, async);
343 }
344 
345 static
346 void
347 doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
348 {
349  SessionElement * se;
350  se = getNextSession(&td->generator.activeSessions);
351  if( se ) {
352  strcpy(td->transactionData.number, se->subscriberNumber);
353  td->transactionData.server_id = se->serverId;
354  td->transactionData.sessionElement = 1;
355  }
356  else {
357  getRandomSubscriberNumber(td->transactionData.number);
358  getRandomServerId(&td->transactionData.server_id);
359  td->transactionData.sessionElement = 0;
360  }
361 
362  td->transactionData.server_bit = (1 << td->transactionData.server_id);
363  td->transactionData.do_rollback
364  = getNextRandom(&td->generator.rollbackSequenceT5);
365 
366  /*-----------------*/
367  /* Run transaction */
368  /*-----------------*/
369  td->runState = Running;
370  td->generator.transactions[4].startLatency();
371  start_T5(pNDB, td, async);
372 }
373 
374 void
375 complete_T1(ThreadData * data){
376  data->generator.transactions[0].stopLatency();
377  data->generator.transactions[0].count++;
378 
379  data->runState = Runnable;
380  data->generator.totalTransactions++;
381 }
382 
383 void
384 complete_T2(ThreadData * data){
385  data->generator.transactions[1].stopLatency();
386  data->generator.transactions[1].count++;
387 
388  data->runState = Runnable;
389  data->generator.totalTransactions++;
390 }
391 
392 void
393 complete_T3(ThreadData * data){
394 
395  data->generator.transactions[2].stopLatency();
396  data->generator.transactions[2].count++;
397 
398  if(data->transactionData.branchExecuted)
399  data->generator.transactions[2].branchExecuted++;
400 
401  data->runState = Runnable;
402  data->generator.totalTransactions++;
403 }
404 
405 void
406 complete_T4(ThreadData * data){
407 
408  data->generator.transactions[3].stopLatency();
409  data->generator.transactions[3].count++;
410 
411  if(data->transactionData.branchExecuted)
412  data->generator.transactions[3].branchExecuted++;
413  if(data->transactionData.do_rollback)
414  data->generator.transactions[3].rollbackExecuted++;
415 
416  if(data->transactionData.branchExecuted &&
417  !data->transactionData.do_rollback){
418  insertSession(&data->generator.activeSessions,
419  data->transactionData.number,
420  data->transactionData.server_id);
421  }
422 
423  data->runState = Runnable;
424  data->generator.totalTransactions++;
425 
426 }
427 void
428 complete_T5(ThreadData * data){
429 
430  data->generator.transactions[4].stopLatency();
431  data->generator.transactions[4].count++;
432 
433  if(data->transactionData.branchExecuted)
434  data->generator.transactions[4].branchExecuted++;
435  if(data->transactionData.do_rollback)
436  data->generator.transactions[4].rollbackExecuted++;
437 
438  if(data->transactionData.sessionElement &&
439  !data->transactionData.do_rollback){
440  deleteSession(&data->generator.activeSessions);
441  }
442 
443  data->runState = Runnable;
444  data->generator.totalTransactions++;
445 }
446 
447 /***************************************************************
448 ****************************************************************
449 * P U B L I C F U N C T I O N S C O D E S E C T I O N *
450 ****************************************************************
451 ***************************************************************/
452 void
453 asyncGenerator(ThreadData *data,
454  int parallellism,
455  int millisSendPoll,
456  int minEventSendPoll,
457  int forceSendPoll)
458 {
459  ThreadData * startUp;
460 
462  double periodStop;
463  double benchTimeStart;
464  double benchTimeEnd;
465  int i, j, done;
466 
467  myRandom48Init(data->randomSeed);
468 
469  for(i = 0; i<parallellism; i++){
470  initGeneratorStatistics(&data[i].generator);
471  }
472 
473  startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
474  memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
475 
476  /*----------------*/
477  /* warm up period */
478  /*----------------*/
479  periodStop = userGetTime() + (double)data[0].warmUpSeconds;
480 
481  while(userGetTime() < periodStop){
482  doOneTransaction(startUp, parallellism,
483  millisSendPoll, minEventSendPoll, forceSendPoll);
484  }
485 
486  ndbout_c("Waiting for startup to finish");
487 
491  done = 0;
492  while(!done){
493  done = 1;
494  for(i = 0; i<parallellism; i++){
495  if(startUp[i].runState != Runnable){
496  done = 0;
497  break;
498  }
499  }
500  if(!done){
501  startUp[0].pNDB->sendPollNdb();
502  }
503  }
504  ndbout_c("Benchmark period starts");
505 
506  /*-------------------------*/
507  /* normal benchmark period */
508  /*-------------------------*/
509  benchTimeStart = userGetTime();
510 
511  periodStop = benchTimeStart + (double)data[0].testSeconds;
512  while(userGetTime() < periodStop)
513  doOneTransaction(data, parallellism,
514  millisSendPoll, minEventSendPoll, forceSendPoll);
515 
516  benchTimeEnd = userGetTime();
517 
518  ndbout_c("Benchmark period done");
519 
523  done = 0;
524  while(!done){
525  done = 1;
526  for(i = 0; i<parallellism; i++){
527  if(data[i].runState != Runnable){
528  done = 0;
529  break;
530  }
531  }
532  if(!done){
533  data[0].pNDB->sendPollNdb();
534  }
535  }
536 
537  /*------------------*/
538  /* cool down period */
539  /*------------------*/
540  periodStop = userGetTime() + (double)data[0].coolDownSeconds;
541  while(userGetTime() < periodStop){
542  doOneTransaction(startUp, parallellism,
543  millisSendPoll, minEventSendPoll, forceSendPoll);
544  }
545 
546  done = 0;
547  while(!done){
548  done = 1;
549  for(i = 0; i<parallellism; i++){
550  if(startUp[i].runState != Runnable){
551  done = 0;
552  break;
553  }
554  }
555  if(!done){
556  startUp[0].pNDB->sendPollNdb();
557  }
558  }
559 
560 
561  /*---------------------------------------------------------*/
562  /* add the times for all transaction for inner loop timing */
563  /*---------------------------------------------------------*/
564  for(j = 0; j<parallellism; j++){
565  st = &data[j].generator;
566 
567  st->outerLoopTime = benchTimeEnd - benchTimeStart;
568  st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
569  }
570  /* ndbout_c("maxsize = %d\n",maxsize); */
571 
572  free(startUp);
573 }
574