MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
basicTransporterTest.cpp
1 /*
2  Copyright (C) 2003-2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 
21 #include "TransporterRegistry.hpp"
22 #include "TransporterDefinitions.hpp"
23 #include "TransporterCallback.hpp"
24 #include <RefConvert.hpp>
25 
26 #include <NdbTick.h>
27 #include <NdbMain.h>
28 #include <NdbOut.hpp>
29 #include <NdbSleep.h>
30 
// Base TCP port for the test; createTCPTransporter() adds a small
// per-node-pair offset (0, 1 or 2) so each pair of nodes gets its own port.
int basePortTCP = 17000;
32 
// Template SCI transporter configuration. main() hands a pointer to this
// object to createSCITransporter(), which overwrites the SCI node ids and
// the local/remote node ids before creating each transporter.
SCI_TransporterConfiguration sciTemplate = {
  8000,     // Packet size
  2500000,  // Buffer size
  2,        // number of adapters
  1,        // remote node id SCI
  2,        // Remote node Id SCI
  0,        // local ndb node id (server)
  0,        // remote ndb node id (client)
  0,        // byteOrder;
  false,    // compression;
  true,     // checksum;
  true      // signalId;
};
47 
// Template TCP transporter configuration. main() hands a pointer to this
// object to createTCPTransporter(), which overwrites the port, both host
// names and both node ids before creating each transporter.
TCP_TransporterConfiguration tcpTemplate = {
  17000,  // port;
  "",     // remoteHostName;
  "",     // localhostname
  2,      // remoteNodeId;
  1,      // localNodeId;
  10000,  // sendBufferSize - Size of SendBuffer of priority B
  10000,  // maxReceiveSize - Maximum no of bytes to receive
  0,      // byteOrder;
  false,  // compression;
  true,   // checksum;
  true    // signalId;
};
61 
// Template shared-memory transporter configuration. main() hands a pointer
// to this object to createSHMTransporter(), which overwrites the local and
// remote node ids before creating each transporter.
SHM_TransporterConfiguration shmTemplate = {
  0,       //remoteNodeId
  0,       //localNodeId;
  false,   //compression
  true,    //checksum;
  true,    //signalId;
  0,       //byteOrder;
  123,     //shmKey;
  2500000  //shmSize;
};
72 
// Transporter registry shared by main() and the callback functions below.
// It is allocated and deleted in main(); it is null outside that window.
TransporterRegistry *tReg = 0;
74 
75 #include <signal.h>
76 
77 extern "C"
78 void
79 signalHandler(int signo){
80  ::signal(13, signalHandler);
81  char buf[255];
82  sprintf(buf,"Signal: %d\n", signo);
83  ndbout << buf << endl;
84 }
85 
86 void
87 usage(const char * progName){
88  ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
89  << " remoteHostName1 remoteHostName2" << endl;
90  ndbout << " type = shm tcp ose sci" << endl;
91  ndbout << " localNodeId - 1 to 3" << endl;
92 }
93 
// Common signature of the create*Transporter() helpers so that main() can
// select one at run time. `conf` points at the matching *Template object;
// the remaining arguments identify the two end points of the connection.
typedef void (* CreateTransporterFunc)(void * conf,
                                       NodeId localNodeId,
                                       NodeId remoteNodeId,
                                       const char * localHostName,
                                       const char * remoteHostName);

// Forward declarations; the definitions are at the end of the file.
void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);
103 
// Number of test signals received so far, indexed by the sender's node id.
// execute() increments the entry; main() loops until the total reaches two
// signals per connection.
int signalReceived[4];
105 
106 int
107 main(int argc, const char **argv){
108 
109  signalHandler(0);
110 
111  for(int i = 0; i<4; i++)
112  signalReceived[i] = 0;
113 
114  if(argc < 5){
115  usage(argv[0]);
116  return 0;
117  }
118 
119  Uint32 noOfConnections = 0;
120  const char * progName = argv[0];
121  const char * type = argv[1];
122  const NodeId localNodeId = atoi(argv[2]);
123  const char * localHostName = argv[3];
124  const char * remoteHost1 = argv[4];
125  const char * remoteHost2 = NULL;
126 
127  if(argc == 5)
128  noOfConnections = 1;
129  else {
130  noOfConnections = 2;
131  remoteHost2 = argv[5];
132  }
133 
134  if(localNodeId < 1 || localNodeId > 3){
135  ndbout << "localNodeId = " << localNodeId << endl << endl;
136  usage(progName);
137  return 0;
138  }
139 
140  ndbout << "-----------------" << endl;
141  ndbout << "localNodeId: " << localNodeId << endl;
142  ndbout << "localHostName: " << localHostName << endl;
143  ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
144  << remoteHost1 << endl;
145  if(noOfConnections == 2){
146  ndbout << "remoteHost2 (node " << (localNodeId == 3?2:3) << "): "
147  << remoteHost2 << endl;
148  }
149  ndbout << "-----------------" << endl;
150 
151  void * confTemplate = 0;
152  CreateTransporterFunc func = 0;
153 
154  if(strcasecmp(type, "tcp") == 0){
155  func = createTCPTransporter;
156  confTemplate = &tcpTemplate;
157  } else if(strcasecmp(type, "sci") == 0){
158  func = createSCITransporter;
159  confTemplate = &sciTemplate;
160  } else if(strcasecmp(type, "shm") == 0){
161  func = createSHMTransporter;
162  confTemplate = &shmTemplate;
163  } else {
164  ndbout << "Unsupported transporter type" << endl;
165  return 0;
166  }
167 
168  ndbout << "Creating transporter registry" << endl;
169  tReg = new TransporterRegistry;
170  tReg->init(localNodeId);
171 
172  switch(localNodeId){
173  case 1:
174  (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
175  if(noOfConnections == 2)
176  (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
177  break;
178  case 2:
179  (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
180  if(noOfConnections == 2)
181  (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
182  break;
183  case 3:
184  (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
185  if(noOfConnections == 2)
186  (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
187  break;
188  }
189 
190  ndbout << "Doing startSending/startReceiving" << endl;
191  tReg->startSending();
192  tReg->startReceiving();
193 
194  ndbout << "Connecting" << endl;
195  tReg->setPerformState(PerformConnect);
196  tReg->checkConnections();
197 
198  unsigned sum = 0;
199  do {
200  sum = 0;
201  for(int i = 0; i<4; i++)
202  sum += signalReceived[i];
203 
204  tReg->checkConnections();
205 
206  tReg->external_IO(500);
207  NdbSleep_MilliSleep(500);
208 
209  ndbout << "In main loop" << endl;
210  } while(sum != 2*noOfConnections);
211 
212  ndbout << "Doing setPerformState(Disconnect)" << endl;
213  tReg->setPerformState(PerformDisconnect);
214 
215  ndbout << "Doing checkConnections()" << endl;
216  tReg->checkConnections();
217 
218  ndbout << "Sleeping 3 secs" << endl;
219  NdbSleep_SecSleep(3);
220 
221  ndbout << "Deleting transporter registry" << endl;
222  delete tReg; tReg = 0;
223 
224  return 0;
225 }
226 
227 void
228 checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
229  LinearSectionPtr ptr[3]){
230  Uint32 expectedLength = 0;
231  if(prio == 0)
232  expectedLength = 17;
233  else
234  expectedLength = 19;
235 
236  if(header->theLength != expectedLength){
237  ndbout << "Unexpected signal length: " << header->theLength
238  << " expected: " << expectedLength << endl;
239  abort();
240  }
241 
242  if(header->theVerId_signalNumber != expectedLength + 1)
243  abort();
244 
245  if(header->theReceiversBlockNumber != expectedLength + 2)
246  abort();
247 
248  if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
249  abort();
250 
251  if(header->theSendersSignalId != expectedLength + 5)
252  abort();
253 
254  if(header->theTrace != expectedLength + 6)
255  abort();
256 
257  if(header->m_noOfSections != (prio == 0 ? 0 : 1))
258  abort();
259 
260  if(header->m_fragmentInfo != (prio + 1))
261  abort();
262 
263  Uint32 dataWordStart = header->theLength ;
264  for(unsigned i = 0; i<header->theLength; i++){
265  if(theData[i] != i){ //dataWordStart){
266  ndbout << "data corrupt!\n" << endl;
267  abort();
268  }
269  dataWordStart ^= (~i*i);
270  }
271 
272  if(prio != 0){
273  ndbout_c("Found section");
274  if(ptr[0].sz != header->theLength)
275  abort();
276 
277  if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
278  abort();
279  }
280 }
281 
282 void
283 sendSignalTo(NodeId nodeId, int prio){
284  SignalHeader sh;
285  sh.theLength = (prio == 0 ? 17 : 19);
286  sh.theVerId_signalNumber = sh.theLength + 1;
287  sh.theReceiversBlockNumber = sh.theLength + 2;
288  sh.theSendersBlockRef = sh.theLength + 3;
289  sh.theSendersSignalId = sh.theLength + 4;
290  sh.theSignalId = sh.theLength + 5;
291  sh.theTrace = sh.theLength + 6;
292  sh.m_noOfSections = (prio == 0 ? 0 : 1);
293  sh.m_fragmentInfo = prio + 1;
294 
295  Uint32 theData[25];
296 
297  Uint32 dataWordStart = sh.theLength;
298  for(unsigned i = 0; i<sh.theLength; i++){
299  theData[i] = i;
300  dataWordStart ^= (~i*i);
301  }
302  ndbout << "Sending prio " << (int)prio << " signal to node: "
303  << nodeId
304  << " gsn = " << sh.theVerId_signalNumber << endl;
305 
306  LinearSectionPtr ptr[3];
307  ptr[0].p = &theData[0];
308  ptr[0].sz = sh.theLength;
309 
310  SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
311  if(s != SEND_OK){
312  ndbout << "Send was not ok. Send was: " << s << endl;
313  }
314 }
315 
316 void
317 execute(void* callbackObj,
318  SignalHeader * const header, Uint8 prio, Uint32 * const theData,
319  LinearSectionPtr ptr[3]){
320  const NodeId nodeId = refToNode(header->theSendersBlockRef);
321 
322  ndbout << "Recieved prio " << (int)prio << " signal from node: "
323  << nodeId
324  << " gsn = " << header->theVerId_signalNumber << endl;
325  checkData(header, prio, theData, ptr);
326  ndbout << " Data is ok!\n" << endl;
327 
328  signalReceived[nodeId]++;
329 
330  if(prio == 0)
331  sendSignalTo(nodeId, 1);
332  else
333  tReg->setPerformState(nodeId, PerformDisconnect);
334 }
335 
/**
 * Stub for copying a segmented section; it unconditionally abort()s, so
 * the test terminates if this is ever called.
 * NOTE(review): presumably required only to satisfy the transporter
 * callback interface at link time - confirm.
 */
void
copy(Uint32 * & insertPtr,
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  abort();
}
341 
342 void
343 reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
344  char buf[255];
345  sprintf(buf, "reportError (%d, %x)", nodeId, errorCode);
346  ndbout << buf << endl;
347  if(errorCode & 0x8000){
348  tReg->setPerformState(nodeId, PerformDisconnect);
349  abort();
350  }
351 }
352 
356 void
357 reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
358  char buf[255];
359  sprintf(buf, "reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
360  ndbout << buf << endl;
361 }
362 
366 void
367 reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
368  char buf[255];
369  sprintf(buf, "reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
370  ndbout << buf << endl;
371 }
372 
376 void
377 reportConnect(void* callbackObj, NodeId nodeId){
378  char buf[255];
379  sprintf(buf, "reportConnect(%d)", nodeId);
380  ndbout << buf << endl;
381  tReg->setPerformState(nodeId, PerformIO);
382 
383  sendSignalTo(nodeId, 0);
384 }
385 
389 void
390 reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
391  char buf[255];
392  sprintf(buf, "reportDisconnect(%d)", nodeId);
393  ndbout << buf << endl;
394  if(signalReceived[nodeId] < 2)
395  tReg->setPerformState(nodeId, PerformConnect);
396 }
397 
/**
 * Always returns 0.
 * NOTE(review): presumably a transporter callback hook through which the
 * receiver can report a full job buffer, with 0 meaning "not full" -
 * confirm against TransporterCallback.hpp.
 */
int
checkJobBuffer() {
  return 0;
}
406 
407 void
408 createOSETransporter(void * _conf,
409  NodeId localNodeId,
410  NodeId remoteNodeId,
411  const char * localHostName,
412  const char * remoteHostName){
413  ndbout << "Creating OSE transporter from node "
414  << localNodeId << "(" << localHostName << ") to "
415  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
416 
417  OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
418 
419  conf->localNodeId = localNodeId;
420  conf->localHostName = localHostName;
421  conf->remoteNodeId = remoteNodeId;
422  conf->remoteHostName = remoteHostName;
423  bool res = tReg->createTransporter(conf);
424  if(res)
425  ndbout << "... -- Success " << endl;
426  else
427  ndbout << "... -- Failure " << endl;
428 }
429 
430 void
431 createTCPTransporter(void * _conf,
432  NodeId localNodeId,
433  NodeId remoteNodeId,
434  const char * localHostName,
435  const char * remoteHostName){
436  ndbout << "Creating TCP transporter from node "
437  << localNodeId << "(" << localHostName << ") to "
438  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
439 
440  TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
441 
442  int port;
443  if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
444  if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
445  if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
446  if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
447  if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
448  if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
449 
450  conf->localNodeId = localNodeId;
451  conf->localHostName = localHostName;
452  conf->remoteNodeId = remoteNodeId;
453  conf->remoteHostName = remoteHostName;
454  conf->port = port;
455  bool res = tReg->createTransporter(conf);
456  if(res)
457  ndbout << "... -- Success " << endl;
458  else
459  ndbout << "... -- Failure " << endl;
460 }
461 
462 void
463 createSCITransporter(void * _conf,
464  NodeId localNodeId,
465  NodeId remoteNodeId,
466  const char * localHostName,
467  const char * remoteHostName){
468 
469 
470  ndbout << "Creating SCI transporter from node "
471  << localNodeId << "(" << localHostName << ") to "
472  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
473 
474 
475  SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
476 
477  conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
478  conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
479 
480 
481  conf->localNodeId = localNodeId;
482  conf->remoteNodeId = remoteNodeId;
483 
484  bool res = tReg->createTransporter(conf);
485  if(res)
486  ndbout << "... -- Success " << endl;
487  else
488  ndbout << "... -- Failure " << endl;
489 }
490 
491 void
492 createSHMTransporter(void * _conf,
493  NodeId localNodeId,
494  NodeId remoteNodeId,
495  const char * localHostName,
496  const char * remoteHostName){
497 
498 
499  ndbout << "Creating SHM transporter from node "
500  << localNodeId << "(" << localHostName << ") to "
501  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
502 
503 
504  SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
505 
506  conf->localNodeId = localNodeId;
507  conf->remoteNodeId = remoteNodeId;
508 
509  bool res = tReg->createTransporter(conf);
510  if(res)
511  ndbout << "... -- Success " << endl;
512  else
513  ndbout << "... -- Failure " << endl;
514 }