MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
prioTransporterTest.cpp
1 /*
2  Copyright (C) 2003-2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 
21 #include "TransporterRegistry.hpp"
22 #include "TransporterDefinitions.hpp"
23 #include "TransporterCallback.hpp"
24 #include <RefConvert.hpp>
25 
26 #include "prioTransporterTest.hpp"
27 
28 #include <NdbTick.h>
29 #include <NdbMain.h>
30 #include <NdbOut.hpp>
31 #include <NdbSleep.h>
32 
33 int basePortTCP = 17000;
34 
35 SCI_TransporterConfiguration sciTemplate = {
36  2000,
37  // Packet size
38  2000000, // Buffer size
39  2, // number of adapters
40  1, // remote node id SCI
41  2, // Remote node Id SCI
42  0, // local ndb node id (server)
43  0, // remote ndb node id (client)
44  0, // byteOrder;
45  false, // compression;
46  true, // checksum;
47  true // signalId;
48 };
49 
50 
51 SHM_TransporterConfiguration shmTemplate = {
52  100000, // shmSize
53  0, // shmKey
54  1, // local ndb node id (server)
55  2, // remote ndb node id (client)
56  0, // byteOrder;
57  false, // compression;
58  true, // checksum;
59  true // signalId;
60 };
61 
62 TCP_TransporterConfiguration tcpTemplate = {
63  17000, // port;
64  "", // remoteHostName;
65  "", // localhostname
66  2, // remoteNodeId;
67  1, // localNodeId;
68  2000000, // sendBufferSize - Size of SendBuffer of priority B
69  2000, // maxReceiveSize - Maximum no of bytes to receive
70  0, // byteOrder;
71  false, // compression;
72  true, // checksum;
73  true // signalId;
74 };
75 
76 TransporterRegistry *tReg = 0;
77 
78 #include <signal.h>
79 
80 extern "C"
81 void
82 signalHandler(int signo){
83  ::signal(13, signalHandler);
84  char buf[255];
85  sprintf(buf,"Signal: %d\n", signo);
86  ndbout << buf << endl;
87 }
88 
89 void
90 usage(const char * progName){
91  ndbout << "Usage: " << progName << " localNodeId localHostName"
92  << " remoteHostName"
93  << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
94  ndbout << " localNodeId - {1,2}" << endl;
95 }
96 
97 typedef void (* CreateTransporterFunc)(void * conf,
98  NodeId localNodeId,
99  NodeId remoteNodeId,
100  const char * localHostName,
101  const char * remoteHostName,
102  int sendBuf,
103  int recvBuf);
104 
105 void
106 createSCITransporter(void * _conf,
107  NodeId localNodeId,
108  NodeId remoteNodeId,
109  const char * localHostName,
110  const char * remoteHostName,
111  int sendbuf,
112  int recvbuf) {
113 
114 
115  ndbout << "Creating SCI transporter from node "
116  << localNodeId << "(" << localHostName << ") to "
117  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
118 
119 
120  SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
121 
122  conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
123  conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
124 
125 
126  conf->localNodeId = localNodeId;
127  conf->remoteNodeId = remoteNodeId;
128 
129  bool res = tReg->createTransporter(conf);
130  if(res)
131  ndbout << "... -- Success " << endl;
132  else
133  ndbout << "... -- Failure " << endl;
134 }
135 
136 void
137 createSHMTransporter(void * _conf,
138  NodeId localNodeId,
139  NodeId remoteNodeId,
140  const char * localHostName,
141  const char * remoteHostName,
142  int sendbuf,
143  int recvbuf) {
144 
145 
146  ndbout << "Creating SHM transporter from node "
147  << localNodeId << "(" << localHostName << ") to "
148  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
149 
150 
151  SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
152 
153 
154  conf->localNodeId = localNodeId;
155  conf->remoteNodeId = remoteNodeId;
156 
157  bool res = tReg->createTransporter(conf);
158  if(res)
159  ndbout << "... -- Success " << endl;
160  else
161  ndbout << "... -- Failure " << endl;
162 }
163 
164 
165 void
166 createTCPTransporter(void * _conf,
167  NodeId localNodeId,
168  NodeId remoteNodeId,
169  const char * localHostName,
170  const char * remoteHostName,
171  int sendBuf,
172  int recvBuf){
173  ndbout << "Creating TCP transporter from node "
174  << localNodeId << "(" << localHostName << ") to "
175  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
176 
177  TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
178 
179  int port;
180  if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
181  if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
182  if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
183  if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
184  if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
185  if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
186 
187  if(sendBuf != -1){
188  conf->sendBufferSize = sendBuf;
189  }
190  if(recvBuf != -1){
191  conf->maxReceiveSize = recvBuf;
192  }
193 
194  ndbout << "\tSendBufferSize: " << conf->sendBufferSize << endl;
195  ndbout << "\tReceiveBufferSize: " << conf->maxReceiveSize << endl;
196 
197  conf->localNodeId = localNodeId;
198  conf->localHostName = localHostName;
199  conf->remoteNodeId = remoteNodeId;
200  conf->remoteHostName = remoteHostName;
201  conf->port = port;
202  bool res = tReg->createTransporter(conf);
203  if(res)
204  ndbout << "... -- Success " << endl;
205  else
206  ndbout << "... -- Failure " << endl;
207 }
208 
209 struct TestPhase {
210  int signalSize;
211  int noOfSignals;
212  int noOfSignalSent;
213  int noOfSignalReceived;
214  NDB_TICKS startTime;
215  NDB_TICKS stopTime;
216 
217  NDB_TICKS startTimePrioA;
218  NDB_TICKS stopTimePrioA;
219  NDB_TICKS totTimePrioA;
220  int bytesSentBeforePrioA;
221  NDB_TICKS accTime;
222  int loopCount;
223  Uint64 sendLenBytes, sendCount;
224  Uint64 recvLenBytes, recvCount;
225 };
226 
227 TestPhase testSpec[] = {
228  { 1, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
229  ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
230  ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
231  ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
232 
233  ,{ 8, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
234  ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
235  ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
236  ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
237 
238  ,{ 16, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
239  ,{ 16, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
240  ,{ 16, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
241  ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
242 
243  ,{ 24, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
244  ,{ 24, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
245  ,{ 24, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
246  ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
247 
248  ,{ 0, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of random size
249  ,{ 0, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of random size
250  ,{ 0, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of random size
251  ,{ 0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size
252 };
253 
254 const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);
255 
256 SendStatus
257 sendSignalTo(NodeId nodeId, int signalSize, int prio){
258  if(signalSize == 0)
259  signalSize = (rand() % 25) + 1;
260 
261  SignalHeader sh;
262  sh.theLength = signalSize;
263  sh.theVerId_signalNumber = rand();
264  sh.theReceiversBlockNumber = rand();
265  sh.theSendersBlockRef = rand();
266  sh.theSendersSignalId = rand();
267  sh.theSignalId = rand();
268  sh.theTrace = rand();
269 
270  Uint32 theData[25];
271  for(int i = 0; i<signalSize; i++)
272  theData[i] = (i+1) * (Uint32)(&theData[i]);
273 
274  return tReg->prepareSend(&sh, prio, theData, nodeId);
275 }
276 
277 void
278 reportHeader(){
279  ndbout << "#Sigs\tSz\tPayload\tTime\tSig/sec\tBps\t"
280  << "s len\tr len\tprioAtime\tbytesb4pA" << endl;
281 }
282 
283 void
284 printReport(TestPhase & p){
285  if(p.accTime > 0) {
286  Uint32 secs = (p.accTime/p.loopCount)/1000;
287  Uint32 mill = (p.accTime/p.loopCount)%1000;
288  char st[255];
289  if(secs > 0){
290  sprintf(st, "%d.%.2ds", secs, (mill/10));
291  } else {
292  sprintf(st, "%dms", mill);
293  }
294 
295  Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
296  Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
297  if(p.signalSize == 0)
298  ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
299 
300  char ssps[255];
301  if(sps > 1000000){
302  sps /= 1000000;
303  sprintf(ssps, "%dM", (int)sps);
304  } else if(sps > 1000){
305  sps /= 1000;
306  sprintf(ssps, "%dk", (int)sps);
307  } else {
308  sprintf(ssps, "%d", (int)sps);
309  }
310 
311  char sbps[255];
312  if(bps > 1000000){
313  bps /= 1000000;
314  sprintf(sbps, "%dM", bps);
315  } else if(bps>1000){
316  bps /= 1000;
317  sprintf(sbps, "%dk", bps);
318  } else {
319  sprintf(sbps, "%d", bps);
320  }
321 
322  char buf[255];
323  if(p.signalSize != 0){
324  BaseString::snprintf(buf, 255,
325  "%d\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
326  p.noOfSignals,
327  p.signalSize,
328  (4*p.signalSize),
329  st,
330  ssps,
331  sbps,
332  (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
333  (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
334  (int)(p.totTimePrioA / p.loopCount),
335  (int)(p.bytesSentBeforePrioA));
336  } else {
337  BaseString::snprintf(buf, 255,
338  "%d\trand\t4*rand\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
339  p.noOfSignals,
340  st,
341  ssps,
342  sbps,
343  (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
344  (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
345  (int)(p.totTimePrioA / p.loopCount),
346  (int)(p.bytesSentBeforePrioA));
347 
348  }
349  ndbout << buf << endl;
350  }
351 }
352 
353 int loopCount = 1;
354 int sendBufSz = -1;
355 int recvBufSz = -1;
356 
357 NDB_TICKS startSec=0;
358 NDB_TICKS stopSec=0;
359 Uint32 startMicro=0;
360 Uint32 stopMicro=0;
361 int timerStarted;
362 int timerStopped;
363 
364 bool isClient = false;
365 bool isConnected = false;
366 bool isStarted = false;
367 int currentPhase = 0;
368 TestPhase allPhases[noOfTests];
369 Uint32 signalToEcho;
370 NDB_TICKS startTime, stopTime;
371 
372 void
373 client(NodeId remoteNodeId){
374  isClient = true;
375 
376  currentPhase = 0;
377  memcpy(allPhases, testSpec, sizeof(testSpec));
378 
379  int counter = 0;
380 
381  while(true){
382  TestPhase * current = &allPhases[currentPhase];
383  if(current->noOfSignals == current->noOfSignalSent &&
384  current->noOfSignals == current->noOfSignalReceived){
385 
389  current->stopTime = NdbTick_CurrentMillisecond();
390  current->accTime += (current->stopTime - current->startTime);
391 
392  NdbSleep_MilliSleep(500 / loopCount);
393 
394  current->startTime = NdbTick_CurrentMillisecond();
395 
396  current->noOfSignalSent = 0;
397  current->noOfSignalReceived = 0;
398 
399  current->loopCount ++;
400  if(current->loopCount == loopCount){
401 
402  printReport(allPhases[currentPhase]);
403 
404  currentPhase ++;
405  if(currentPhase == noOfTests){
409  break;
410  }
411  NdbSleep_MilliSleep(500);
412  current = &allPhases[currentPhase];
413  current->startTime = NdbTick_CurrentMillisecond();
414  }
415  }
416  int signalsLeft = current->noOfSignals - current->noOfSignalSent;
417  if(signalsLeft > 0){
418  for(; signalsLeft > 1; signalsLeft--){
419  if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
420  current->noOfSignalSent++;
421  // ndbout << "sent prio b" << endl;
422  current->bytesSentBeforePrioA += (current->signalSize << 2);
423  }
424  else {
425  tReg->external_IO(10);
426  break;
427  }
428  }
429  //prio A
430  if(signalsLeft==1) {
431  NDB_TICKS sec = 0;
432  Uint32 micro=0;
433  int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
434  if(ret==0)
435  current->startTimePrioA = micro + sec*1000000;
436  if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
437  current->noOfSignalSent++;
438  signalsLeft--;
439  }
440  else {
441  tReg->external_IO(10);
442  break;
443  }
444  }
445  }
446 
447  if(counter % 10 == 0)
448  tReg->checkConnections();
449  tReg->external_IO(0);
450  counter++;
451  }
452 }
453 
454 void
455 server(){
456  isClient = false;
457 
458  signalToEcho = 0;
459  for(int i = 0; i<noOfTests; i++)
460  signalToEcho += testSpec[i].noOfSignals;
461 
462  signalToEcho *= loopCount;
463 
464  while(signalToEcho > 0){
465  tReg->checkConnections();
466  for(int i = 0; i<10; i++)
467  tReg->external_IO(10);
468  }
469 }
470 
471 int
472 prioTransporterTest(TestType tt, const char * progName,
473  int argc, const char **argv){
474 
475  loopCount = 100;
476  sendBufSz = -1;
477  recvBufSz = -1;
478 
479  isClient = false;
480  isConnected = false;
481  isStarted = false;
482  currentPhase = 0;
483 
484  signalHandler(0);
485 
486  if(argc < 4){
487  usage(progName);
488  return 0;
489  }
490 
491  const NodeId localNodeId = atoi(argv[1]);
492  const char * localHostName = argv[2];
493  const char * remoteHost1 = argv[3];
494 
495  if(argc >= 5)
496  loopCount = atoi(argv[4]);
497  if(argc >= 6)
498  sendBufSz = atoi(argv[5]);
499  if(argc >= 7)
500  recvBufSz = atoi(argv[6]);
501 
502  if(localNodeId < 1 || localNodeId > 2){
503  ndbout << "localNodeId = " << localNodeId << endl << endl;
504  usage(progName);
505  return 0;
506  }
507 
508  if(localNodeId == 1)
509  ndbout << "-- ECHO CLIENT --" << endl;
510  else
511  ndbout << "-- ECHO SERVER --" << endl;
512 
513  ndbout << "localNodeId: " << localNodeId << endl;
514  ndbout << "localHostName: " << localHostName << endl;
515  ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
516  << remoteHost1 << endl;
517  ndbout << "Loop count: " << loopCount << endl;
518  ndbout << "-----------------" << endl;
519 
520  void * confTemplate = 0;
521  CreateTransporterFunc func = 0;
522  switch(tt){
523  case TestTCP:
524  func = createTCPTransporter;
525  confTemplate = &tcpTemplate;
526  break;
527  case TestSCI:
528  func = createSCITransporter;
529  confTemplate = &sciTemplate;
530  break;
531  case TestSHM:
532  func = createSHMTransporter;
533  confTemplate = &shmTemplate;
534  break;
535  default:
536  ndbout << "Unsupported transporter type" << endl;
537  return 0;
538  }
539 
540  ndbout << "Creating transporter registry" << endl;
541  tReg = new TransporterRegistry;
542  tReg->init(localNodeId);
543 
544  switch(localNodeId){
545  case 1:
546  (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
547  sendBufSz, recvBufSz);
548  break;
549  case 2:
550  (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
551  sendBufSz, recvBufSz);
552  break;
553  }
554 
555  ndbout << "Doing startSending/startReceiving" << endl;
556  tReg->startSending();
557  tReg->startReceiving();
558 
559  ndbout << "Connecting" << endl;
560  tReg->setPerformState(PerformConnect);
561  tReg->checkConnections();
562 
563  if(localNodeId == 1)
564  client(2);
565  else
566  server();
567 
568  isStarted = false;
569 
570  ndbout << "Sleep 3 secs" << endl;
571  NdbSleep_SecSleep(3);
572 
573  ndbout << "Doing setPerformState(Disconnect)" << endl;
574  tReg->setPerformState(PerformDisconnect);
575 
576  ndbout << "Doing checkConnections()" << endl;
577  tReg->checkConnections();
578 
579  ndbout << "Deleting transporter registry" << endl;
580  delete tReg; tReg = 0;
581 
582  return 0;
583 }
584 
585 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
586  out << "-- Signal Header --" << endl;
587  out << "theLength: " << sh.theLength << endl;
588  out << "gsn: " << sh.theVerId_signalNumber << endl;
589  out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
590  out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
591  out << "sendersSig: " << sh.theSendersSignalId << endl;
592  out << "theSignalId: " << sh.theSignalId << endl;
593  out << "trace: " << (int)sh.theTrace << endl;
594  return out;
595 }
596 
597 void
598 execute(SignalHeader * const header, Uint8 prio, Uint32 * const theData){
599  const NodeId nodeId = refToNode(header->theSendersBlockRef);
600  NDB_TICKS sec = 0;
601  Uint32 micro=0;
602  int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
603  if(prio == 0 && isClient && ret == 0) {
604  allPhases[currentPhase].stopTimePrioA = micro + sec*1000000;
605  allPhases[currentPhase].totTimePrioA +=
606  allPhases[currentPhase].stopTimePrioA -
607  allPhases[currentPhase].startTimePrioA;
608  }
609  if(ret!=0)
610  allPhases[currentPhase].totTimePrioA = -1;
611 
612  if(isClient){
613  allPhases[currentPhase].noOfSignalReceived++;
614  } else {
615  int sleepTime = 10;
616  while(tReg->prepareSend(header, prio, theData, nodeId) != SEND_OK){
617  ndbout << "Failed to echo" << sleepTime << endl;
618  NdbSleep_MilliSleep(sleepTime);
619  // sleepTime += 10;
620  }
621 
622  signalToEcho--;
623  }
624 }
625 
626 void
627 reportError(NodeId nodeId, TransporterError errorCode){
628  char buf[255];
629  sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
630  ndbout << buf << endl;
631  if(errorCode & 0x8000){
632  tReg->setPerformState(nodeId, PerformDisconnect);
633  }
634 }
635 
639 void
640 reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
641  allPhases[currentPhase].sendCount += count;
642  allPhases[currentPhase].sendLenBytes += bytes;
643 
644  if(!isClient){
645  ndbout << "reportSendLen(" << nodeId << ", "
646  << (bytes/count) << ")" << endl;
647  }
648 }
649 
653 void
654 reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
655  allPhases[currentPhase].recvCount += count;
656  allPhases[currentPhase].recvLenBytes += bytes;
657 
658  if(!isClient){
659  ndbout << "reportReceiveLen(" << nodeId << ", "
660  << (bytes/count) << ")" << endl;
661  }
662 }
663 
667 void
668 reportConnect(NodeId nodeId){
669  char buf[255];
670  sprintf(buf, "reportConnect(%d)", nodeId);
671  ndbout << buf << endl;
672  tReg->setPerformState(nodeId, PerformIO);
673 
674  if(!isStarted){
675  isStarted = true;
676  startTime = NdbTick_CurrentMillisecond();
677  if(isClient){
678  reportHeader();
679  allPhases[0].startTime = startTime;
680  }
681  }
682  else{
683  // Resend signals that were lost when connection failed
684  TestPhase * current = &allPhases[currentPhase];
685  current->noOfSignalSent = current->noOfSignalReceived;
686  }
687 }
688 
692 void
693 reportDisconnect(NodeId nodeId, Uint32 errNo){
694  char buf[255];
695  sprintf(buf, "reportDisconnect(%d)", nodeId);
696  ndbout << buf << endl;
697 
698  if(isStarted)
699  tReg->setPerformState(nodeId, PerformConnect);
700 }
701 
702 
703 int
704 checkJobBuffer() {
709  return 0;
710 }