MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TransporterFacade.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 #include <ndb_limits.h>
20 #include "TransporterFacade.hpp"
21 #include "trp_client.hpp"
22 #include "ClusterMgr.hpp"
23 #include <IPCConfig.hpp>
24 #include <TransporterCallback.hpp>
25 #include <TransporterRegistry.hpp>
26 #include "NdbApiSignal.hpp"
27 #include "NdbWaiter.hpp"
28 #include <NdbOut.hpp>
29 #include <NdbEnv.h>
30 #include <NdbSleep.h>
31 
32 #include <kernel/GlobalSignalNumbers.h>
33 #include <mgmapi_config_parameters.h>
34 #include <mgmapi_configuration.hpp>
35 #include <NdbConfig.h>
36 #include <ndb_version.h>
37 #include <SignalLoggerManager.hpp>
38 #include <kernel/ndb_limits.h>
39 #include <signaldata/AlterTable.hpp>
40 #include <signaldata/SumaImpl.hpp>
41 #include <signaldata/AllocNodeId.hpp>
42 
43 //#define REPORT_TRANSPORTER
44 //#define API_TRACE
45 
46 static int numberToIndex(int number)
47 {
48  return number - MIN_API_BLOCK_NO;
49 }
50 
51 static int indexToNumber(int index)
52 {
53  return index + MIN_API_BLOCK_NO;
54 }
55 
/*
 * TRP_DEBUG(t): debug trace helper.
 * When DEBUG_TRANSPORTER is defined it prints "<file>:<line>:<t>" to
 * ndbout; otherwise it expands to nothing.
 * Note that the enabled expansion already ends with a ';'.
 */
56 #if defined DEBUG_TRANSPORTER
57 #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
58 #else
59 #define TRP_DEBUG(t)
60 #endif
61 
62 /*****************************************************************************
63  * Call back functions
64  *****************************************************************************/
65 void
67  TransporterError errorCode, const char *info)
68 {
69 #ifdef REPORT_TRANSPORTER
70  ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
71  (int)nodeId, (int)errorCode, info ? info : "");
72 #endif
73  if(errorCode & TE_DO_DISCONNECT) {
74  ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
75  info ? info : "");
76  doDisconnect(nodeId);
77  }
78 }
79 
83 void
84 TransporterFacade::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
85 {
86 #ifdef REPORT_TRANSPORTER
87  ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
88  (int)nodeId, (Uint32)(bytes/count));
89 #endif
90  (void)nodeId;
91  (void)count;
92  (void)bytes;
93 }
94 
98 void
99 TransporterFacade::reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes)
100 {
101 #ifdef REPORT_TRANSPORTER
102  ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
103  (int)nodeId, (Uint32)(bytes/count));
104 #endif
105  (void)nodeId;
106  (void)count;
107  (void)bytes;
108 }
109 
113 void
115 {
116 #ifdef REPORT_TRANSPORTER
117  ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
118 #endif
119  reportConnected(nodeId);
120 }
121 
125 void
126 TransporterFacade::reportDisconnect(NodeId nodeId, Uint32 error){
127 #ifdef REPORT_TRANSPORTER
128  ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
129 #endif
130  reportDisconnected(nodeId);
131 }
132 
133 void
135 {
136  hb_received(nodeId);
137 }
138 
139 /****************************************************************************
140  *
141  *****************************************************************************/
142 
146 int
148 {
149  return 0;
150 }
151 
152 #ifdef API_TRACE
/* Name of the environment variable that setSignalLog() reads. */
153 static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
/* Value of that variable as last applied by setSignalLog(); 0 = none. */
154 static const char * apiSignalLog = 0;
/* Logger used for the API signal trace output. */
155 static SignalLoggerManager signalLogger;
156 static
157 inline
158 bool
159 setSignalLog(){
160  signalLogger.flushSignalLog();
161 
162  const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
163  if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
164  return true;
165  } else if(tmp == 0 && apiSignalLog == 0){
166  return false;
167  } else if(tmp == 0 && apiSignalLog != 0){
168  signalLogger.setOutputStream(0);
169  apiSignalLog = tmp;
170  return false;
171  } else if(tmp !=0){
172  if (strcmp(tmp, "-") == 0)
173  signalLogger.setOutputStream(stdout);
174 #ifndef DBUG_OFF
175  else if (strcmp(tmp, "+") == 0)
176  signalLogger.setOutputStream(DBUG_FILE);
177 #endif
178  else
179  signalLogger.setOutputStream(fopen(tmp, "w"));
180  apiSignalLog = tmp;
181  return true;
182  }
183  return false;
184 }
185 inline
186 bool
187 TRACE_GSN(Uint32 gsn)
188 {
189  switch(gsn){
190 #ifndef TRACE_APIREGREQ
191  case GSN_API_REGREQ:
192  case GSN_API_REGCONF:
193  return false;
194 #endif
195 #if 1
196  case GSN_SUB_GCP_COMPLETE_REP:
197  case GSN_SUB_GCP_COMPLETE_ACK:
198  return false;
199 #endif
200  default:
201  return true;
202  }
203 }
204 #endif
205 
209 void
211  Uint8 prio, Uint32 * const theData,
212  LinearSectionPtr ptr[3])
213 {
214  Uint32 tRecBlockNo = header->theReceiversBlockNumber;
215 
216 #ifdef API_TRACE
217  if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
218  signalLogger.executeSignal(* header,
219  prio,
220  theData,
221  ownId(),
222  ptr, header->m_noOfSections);
223  signalLogger.flushSignalLog();
224  }
225 #endif
226 
227  if (tRecBlockNo >= MIN_API_BLOCK_NO)
228  {
229  trp_client * clnt = m_threads.get(tRecBlockNo);
230  if (clnt != 0)
231  {
243  NdbApiSignal tmpSignal(*header);
244  NdbApiSignal * tSignal = &tmpSignal;
245  tSignal->setDataPtr(theData);
246  clnt->trp_deliver_signal(tSignal, ptr);
247  }//if
248  }
249  else if (tRecBlockNo == API_PACKED)
250  {
257  Uint32 Tlength = header->theLength;
258  Uint32 Tsent = 0;
263  while (Tsent < Tlength) {
264  Uint32 Theader = theData[Tsent];
265  Tsent++;
266  Uint32 TpacketLen = (Theader & 0x1F) + 3;
267  tRecBlockNo = Theader >> 16;
268  if (TpacketLen <= 25)
269  {
270  if ((TpacketLen + Tsent) <= Tlength)
271  {
276  header->theLength = TpacketLen;
277  header->theReceiversBlockNumber = tRecBlockNo;
278  Uint32* tDataPtr = &theData[Tsent];
279  Tsent += TpacketLen;
280  if (tRecBlockNo >= MIN_API_BLOCK_NO)
281  {
282  trp_client * clnt = m_threads.get(tRecBlockNo);
283  if(clnt != 0)
284  {
285  NdbApiSignal tmpSignal(*header);
286  NdbApiSignal * tSignal = &tmpSignal;
287  tSignal->setDataPtr(tDataPtr);
288  clnt->trp_deliver_signal(tSignal, 0);
289  }
290  }
291  }
292  }
293  }
294  return;
295  }
296  else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
297  tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
298  {
299  Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
300  trp_client * clnt = m_threads.get(dynamic);
301  if (clnt != 0)
302  {
303  NdbApiSignal tmpSignal(*header);
304  NdbApiSignal * tSignal = &tmpSignal;
305  tSignal->setDataPtr(theData);
306  clnt->trp_deliver_signal(tSignal, ptr);
307  }//if
308  }
309  else
310  {
311  // Ignore all other block numbers.
312  if(header->theVerId_signalNumber != GSN_API_REGREQ)
313  {
314  TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
315  ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
316  << header->theVerId_signalNumber << endl;
317  abort();
318  }
319  }
320 }
321 
322 // These symbols are needed, but not used in the API
323 void
325  const SegmentedSectionPtr ptr[3],
326  unsigned i){
327  abort();
328 }
329 
330 void
331 copy(Uint32 * & insertPtr,
332  class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
333  abort();
334 }
335 
343 int
345  const ndb_mgm_configuration* conf)
346 {
347  assert(theOwnId == 0);
348  theOwnId = nodeId;
349 
350 #if defined SIGPIPE && !defined _WIN32
351  (void)signal(SIGPIPE, SIG_IGN);
352 #endif
353 
354  theTransporterRegistry = new TransporterRegistry(this);
355  if (theTransporterRegistry == NULL)
356  return -1;
357 
358  if (!theTransporterRegistry->init(nodeId))
359  return -1;
360 
361  if (theClusterMgr == NULL)
362  theClusterMgr = new ClusterMgr(*this);
363 
364  if (theClusterMgr == NULL)
365  return -1;
366 
367  if (!configure(nodeId, conf))
368  return -1;
369 
370  if (!theTransporterRegistry->start_service(m_socket_server))
371  return -1;
372 
373  theReceiveThread = NdbThread_Create(runReceiveResponse_C,
374  (void**)this,
375  0, // Use default stack size
376  "ndb_receive",
377  NDB_THREAD_PRIO_LOW);
378 
379  theSendThread = NdbThread_Create(runSendRequest_C,
380  (void**)this,
381  0, // Use default stack size
382  "ndb_send",
383  NDB_THREAD_PRIO_LOW);
384 
385  theClusterMgr->startThread();
386 
387  return 0;
388 }
389 
396 void
398  DBUG_ENTER("TransporterFacade::stop_instance");
399  doStop();
400  DBUG_VOID_RETURN;
401 }
402 
403 void
404 TransporterFacade::doStop(){
405  DBUG_ENTER("TransporterFacade::doStop");
410  if (theClusterMgr != NULL) theClusterMgr->doStop();
411 
415  void *status;
416  theStopReceive = 1;
417  if (theReceiveThread) {
418  NdbThread_WaitFor(theReceiveThread, &status);
419  NdbThread_Destroy(&theReceiveThread);
420  }
421  if (theSendThread) {
422  NdbThread_WaitFor(theSendThread, &status);
423  NdbThread_Destroy(&theSendThread);
424  }
425  DBUG_VOID_RETURN;
426 }
427 
428 extern "C"
429 void*
430 runSendRequest_C(void * me)
431 {
432  ((TransporterFacade*) me)->threadMainSend();
433  return 0;
434 }
435 
436 void TransporterFacade::threadMainSend(void)
437 {
438  theTransporterRegistry->startSending();
439  if (theTransporterRegistry->start_clients() == 0){
440  ndbout_c("Unable to start theTransporterRegistry->start_clients");
441  exit(0);
442  }
443 
444  m_socket_server.startServer();
445 
446  while(!theStopReceive) {
447  NdbSleep_MilliSleep(10);
448  NdbMutex_Lock(theMutexPtr);
449  if (sendPerformedLastInterval == 0) {
450  theTransporterRegistry->performSend();
451  }
452  sendPerformedLastInterval = 0;
453  NdbMutex_Unlock(theMutexPtr);
454  }
455  theTransporterRegistry->stopSending();
456 
457  m_socket_server.stopServer();
458  m_socket_server.stopSessions(true);
459 
460  theTransporterRegistry->stop_clients();
461 }
462 
463 extern "C"
464 void*
465 runReceiveResponse_C(void * me)
466 {
467  ((TransporterFacade*) me)->threadMainReceive();
468  return 0;
469 }
470 
471 /*
472  The receiver thread is changed to only wake up once every 10 milliseconds
473  to poll. It will first check that nobody owns the poll "right" before
474  polling. This means that methods using the receiveResponse and
475  sendRecSignal will have a slightly longer response time if they are
476  executed without any parallel key lookups. Currently also scans are
477  affected but this is to be fixed.
478 */
479 void TransporterFacade::threadMainReceive(void)
480 {
481  theTransporterRegistry->startReceiving();
482 #ifdef NDB_SHM_TRANSPORTER
483  NdbThread_set_shm_sigmask(TRUE);
484 #endif
485  while(!theStopReceive)
486  {
487  theClusterMgr->lock();
488  theTransporterRegistry->update_connections();
489  theClusterMgr->unlock();
490  NdbSleep_MilliSleep(100);
491  }//while
492  theTransporterRegistry->stopReceiving();
493 }
494 /*
495  This method is called by worker thread that owns the poll "rights".
496  It waits for events and if something arrives it takes care of it
497  and returns to caller. It will quickly come back here if not all
498  data was received for the worker thread.
499 */
500 void TransporterFacade::external_poll(Uint32 wait_time)
501 {
502  NdbMutex_Unlock(theMutexPtr);
503 
504 #ifdef NDB_SHM_TRANSPORTER
505  /*
506  If shared memory transporters are used we need to set our sigmask
507  such that we wake up also on interrupts on the shared memory
508  interrupt signal.
509  */
510  NdbThread_set_shm_sigmask(FALSE);
511 #endif
512 
513  const int res = theTransporterRegistry->pollReceive(wait_time);
514 
515 #ifdef NDB_SHM_TRANSPORTER
516  NdbThread_set_shm_sigmask(TRUE);
517 #endif
518 
519  NdbMutex_Lock(theMutexPtr);
520  if (res > 0)
521  {
522  theTransporterRegistry->performReceive();
523  }
524 }
525 
526 TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
527  m_poll_owner(NULL),
528  m_poll_queue_head(NULL),
529  m_poll_queue_tail(NULL),
530  theTransporterRegistry(0),
531  theOwnId(0),
532  theStartNodeId(1),
533  theClusterMgr(NULL),
534  checkCounter(4),
535  currentSendLimit(1),
536  theStopReceive(0),
537  theSendThread(NULL),
538  theReceiveThread(NULL),
539  m_fragmented_signal_id(0),
540  m_globalDictCache(cache)
541 {
542  DBUG_ENTER("TransporterFacade::TransporterFacade");
543  theMutexPtr = NdbMutex_CreateWithName("TTFM");
544  sendPerformedLastInterval = 0;
545 
546  for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
547  m_fixed2dynamic[i]= RNIL;
548 
549 #ifdef API_TRACE
550  apiSignalLog = 0;
551 #endif
552 
553  theClusterMgr = new ClusterMgr(*this);
554 
555  DBUG_VOID_RETURN;
556 }
557 
558 
559 /* Return true if node with "nodeId" is a MGM node */
560 static bool is_mgmd(Uint32 nodeId,
561  const ndb_mgm_configuration * conf)
562 {
563  ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_NODE);
564  if (iter.find(CFG_NODE_ID, nodeId))
565  abort();
566  Uint32 type;
567  if(iter.get(CFG_TYPE_OF_SECTION, &type))
568  abort();
569 
570  return (type == NODE_TYPE_MGM);
571 }
572 
573 
574 bool
575 TransporterFacade::do_connect_mgm(NodeId nodeId,
576  const ndb_mgm_configuration* conf)
577 {
578  // Allow other MGM nodes to connect
579  DBUG_ENTER("TransporterFacade::do_connect_mgm");
580  ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_CONNECTION);
581  for(iter.first(); iter.valid(); iter.next())
582  {
583  Uint32 nodeId1, nodeId2;
584  if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
585  iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
586  DBUG_RETURN(false);
587 
588  // Skip connections where this node is not involved
589  if (nodeId1 != nodeId && nodeId2 != nodeId)
590  continue;
591 
592  // If both sides are MGM, open connection
593  if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
594  {
595  Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
596  DBUG_PRINT("info", ("opening connection to node %d", remoteNodeId));
597  doConnect(remoteNodeId);
598  }
599  }
600 
601  DBUG_RETURN(true);
602 }
603 
604 bool
606  const ndb_mgm_configuration* conf)
607 {
608  DBUG_ENTER("TransporterFacade::configure");
609 
610  assert(theOwnId == nodeId);
611  assert(theTransporterRegistry);
612  assert(theClusterMgr);
613 
614  // Configure transporters
616  * conf,
617  * theTransporterRegistry,
618  true))
619  DBUG_RETURN(false);
620 
621  // Configure cluster manager
622  theClusterMgr->configure(nodeId, conf);
623 
624  ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
625  if(iter.find(CFG_NODE_ID, nodeId))
626  DBUG_RETURN(false);
627 
628  // Configure send buffers
629  Uint32 total_send_buffer = 0;
630  iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
631  theTransporterRegistry->allocate_send_buffers(total_send_buffer);
632 
633  Uint32 auto_reconnect=1;
634  iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
635 
636  const char * priospec = 0;
637  if (iter.get(CFG_HB_THREAD_PRIO, &priospec) == 0)
638  {
639  NdbThread_SetHighPrioProperties(priospec);
640  }
641 
645  if (theClusterMgr->m_auto_reconnect == -1)
646  {
647  theClusterMgr->m_auto_reconnect = auto_reconnect;
648  }
649 
650 #ifdef API_TRACE
651  signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
652 #endif
653 
654  // Open connection between MGM servers
655  if (!do_connect_mgm(nodeId, conf))
656  DBUG_RETURN(false);
657 
661  doConnect(nodeId);
662 
663  DBUG_RETURN(true);
664 }
665 
666 void
668  const NdbApiSignal* aSignal,
669  const LinearSectionPtr ptr[3])
670 {
671  Uint32 sz = m_threads.m_statusNext.size();
672  for (Uint32 i = 0; i < sz ; i ++)
673  {
674  trp_client * clnt = m_threads.m_objectExecute[i];
675  if (clnt != 0 && clnt != sender)
676  {
677  clnt->trp_deliver_signal(aSignal, ptr);
678  }
679  }
680 }
681 
682 void
683 TransporterFacade::connected()
684 {
685  DBUG_ENTER("TransporterFacade::connected");
686  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theOwnId));
687  signal.theVerId_signalNumber = GSN_ALLOC_NODEID_CONF;
688  signal.theReceiversBlockNumber = 0;
689  signal.theTrace = 0;
690  signal.theLength = AllocNodeIdConf::SignalLength;
691 
692  AllocNodeIdConf * rep = CAST_PTR(AllocNodeIdConf, signal.getDataPtrSend());
693  rep->senderRef = 0;
694  rep->senderData = 0;
695  rep->nodeId = theOwnId;
696  rep->secret_lo = 0;
697  rep->secret_hi = 0;
698 
699  Uint32 sz = m_threads.m_statusNext.size();
700  for (Uint32 i = 0; i < sz ; i ++)
701  {
702  trp_client * clnt = m_threads.m_objectExecute[i];
703  if (clnt != 0)
704  {
705  clnt->trp_deliver_signal(&signal, 0);
706  }
707  }
708  DBUG_VOID_RETURN;
709 }
710 
711 int
712 TransporterFacade::close_clnt(trp_client* clnt)
713 {
714  int ret = -1;
715  if (clnt)
716  {
717  NdbMutex_Lock(theMutexPtr);
718  if (m_threads.get(clnt->m_blockNo) == clnt)
719  {
720  m_threads.close(clnt->m_blockNo);
721  ret = 0;
722  }
723  else
724  {
725  assert(0);
726  }
727  NdbMutex_Unlock(theMutexPtr);
728  }
729  return ret;
730 }
731 
732 Uint32
734 {
735  DBUG_ENTER("TransporterFacade::open");
736  Guard g(theMutexPtr);
737  int r= m_threads.open(clnt);
738  if (r < 0)
739  {
740  DBUG_RETURN(0);
741  }
742 
743  if (unlikely(blockNo != -1))
744  {
745  // Using fixed block number, add fixed->dymamic mapping
746  Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
747 
748  assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
749  fixed_index <= NO_API_FIXED_BLOCKS);
750 
751  m_fixed2dynamic[fixed_index]= r;
752  }
753 
754  if (theOwnId > 0)
755  {
756  r = numberToRef(r, theOwnId);
757  }
758  else
759  {
760  r = numberToRef(r, 0);
761  }
762  DBUG_RETURN(r);
763 }
764 
/*
 * Destructor: deletes the cluster manager first, then the transporter
 * registry while holding theMutexPtr, and finally destroys the mutex
 * itself. With API_TRACE the signal logger's output stream is cleared.
 * NOTE(review): the reason for holding the mutex while deleting the
 * registry is not visible in this file section - confirm before
 * reordering anything here.
 */
765 TransporterFacade::~TransporterFacade()
766 {
767  DBUG_ENTER("TransporterFacade::~TransporterFacade");
768 
769  delete theClusterMgr;
770  NdbMutex_Lock(theMutexPtr);
771  delete theTransporterRegistry;
772  NdbMutex_Unlock(theMutexPtr);
773  NdbMutex_Destroy(theMutexPtr);
774 #ifdef API_TRACE
775  signalLogger.setOutputStream(0);
776 #endif
777  DBUG_VOID_RETURN;
778 }
779 
780 void
781 TransporterFacade::calculateSendLimit()
782 {
783  Uint32 Ti;
784  Uint32 TthreadCount = 0;
785 
786  Uint32 sz = m_threads.m_statusNext.size();
787  for (Ti = 0; Ti < sz; Ti++) {
788  if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
789  TthreadCount++;
790  m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
791  }
792  }
793  currentSendLimit = TthreadCount;
794  if (currentSendLimit == 0) {
795  currentSendLimit = 1;
796  }
797  checkCounter = currentSendLimit << 2;
798 }
799 
800 
801 //-------------------------------------------------
802 // Force sending but still report the sending to the
803 // adaptive algorithm.
804 //-------------------------------------------------
805 void TransporterFacade::forceSend(Uint32 block_number) {
806  checkCounter--;
807  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
808  sendPerformedLastInterval = 1;
809  if (checkCounter < 0) {
810  calculateSendLimit();
811  }
812  theTransporterRegistry->forceSendCheck(0);
813 }
814 
815 //-------------------------------------------------
816 // Improving API performance
817 //-------------------------------------------------
818 void
819 TransporterFacade::checkForceSend(Uint32 block_number) {
820  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
821  //-------------------------------------------------
822  // This code is an adaptive algorithm to discover when
823  // the API should actually send its buffers. The reason
824  // is that the performance is highly dependent on the
825  // size of the writes over the communication network.
826  // Thus we try to ensure that the send size is as big
827  // as possible. At the same time we don't want response
828  // time to increase so therefore we have to keep track of
829  // how the users are performing adaptively.
830  //-------------------------------------------------
831 
832  if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
833  sendPerformedLastInterval = 1;
834  }
835  checkCounter--;
836  if (checkCounter < 0) {
837  calculateSendLimit();
838  }
839 }
840 
841 
842 /******************************************************************************
843  * SEND SIGNAL METHODS
844  *****************************************************************************/
845 int
846 TransporterFacade::sendSignal(const NdbApiSignal * aSignal, NodeId aNode)
847 {
848  const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
849  Uint32 Tlen = aSignal->theLength;
850  Uint32 TBno = aSignal->theReceiversBlockNumber;
851 #ifdef API_TRACE
852  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
853  SignalHeader tmp = * aSignal;
854  tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
855  LinearSectionPtr ptr[3];
856  signalLogger.sendSignal(tmp,
857  1,
858  tDataPtr,
859  aNode, ptr, 0);
860  signalLogger.flushSignalLog();
861  }
862 #endif
863  if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
864  SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
865  1, // JBB
866  tDataPtr,
867  aNode,
868  (LinearSectionPtr*)0);
869  //if (ss != SEND_OK) ndbout << ss << endl;
870  if (ss == SEND_OK)
871  {
872  assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
873  aSignal->readSignalNumber() == GSN_API_REGREQ ||
874  (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
875  aNode == ownId()));
876  }
877  return (ss == SEND_OK ? 0 : -1);
878  }
879  else
880  {
881  ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
882  ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
883  assert(0);
884  }//if
885  return -1; // Node Dead
886 }
887 
900 {
901 private :
902  GenericSectionIterator* realIterator; /* Real underlying iterator */
903  Uint32 realIterWords; /* Total size of underlying */
904  Uint32 realCurrPos; /* Current pos in underlying */
905  Uint32 rangeStart; /* Sub range start in underlying */
906  Uint32 rangeLen; /* Sub range len in underlying */
907  Uint32 rangeRemain; /* Remaining words in underlying */
908  const Uint32* lastReadPtr; /* Ptr to last chunk obtained from
909  * underlying */
910  Uint32 lastReadPtrLen; /* Remaining words in last chunk
911  * obtained from underlying */
912 public:
913  /* Constructor
914  * The instance is constructed with the sub-range set to be the
915  * full range of the underlying iterator
916  */
918  {
919  realIterator= ptr.sectionIter;
920  realIterWords= ptr.sz;
921  realCurrPos= 0;
922  rangeStart= 0;
923  rangeLen= rangeRemain= realIterWords;
924  lastReadPtr= NULL;
925  lastReadPtrLen= 0;
926  moveToPos(0);
927 
928  assert(checkInvariants());
929  }
930 
931 private:
/* Consistency check of the iterator state.
 * All checks are asserts; the function itself always returns true so
 * that callers can wrap the whole call in assert() and have it
 * compiled away together with the checks.
 */
937  bool checkInvariants()
938  {
939  assert( (realIterator != NULL) || (realIterWords == 0) );
940  assert( realCurrPos <= realIterWords );
941  assert( rangeStart <= realIterWords );
942  assert( (rangeStart+rangeLen) <= realIterWords);
943  assert( rangeRemain <= rangeLen );
944 
945  /* Can only have a null readptr if nothing is left */
946  assert( (lastReadPtr != NULL) || (rangeRemain == 0));
947 
948  /* If we have a non-null readptr and some remaining
949  * words the readptr must have some words
950  */
951  assert( (lastReadPtr == NULL) ||
952  ((rangeRemain == 0) || (lastReadPtrLen != 0)));
953  return true;
954  }
955 
/* Position the underlying iterator at word offset pos (counted from
 * the start of the underlying section, pos <= realIterWords).
 * Moving forward consumes chunks from the underlying iterator; moving
 * backwards first resets the underlying iterator and then advances
 * from the start. On return lastReadPtr/lastReadPtrLen describe the
 * rest of the chunk containing pos and realCurrPos == pos.
 */
964  void moveToPos(Uint32 pos)
965  {
966  assert(pos <= realIterWords);
967 
968  if (pos < realCurrPos)
969  {
970  /* Need to reset, and advance from the start */
971  realIterator->reset();
972  realCurrPos= 0;
973  lastReadPtr= NULL;
974  lastReadPtrLen= 0;
975  }
976 
/* No chunk cached yet: fetch the first one, unless the section is
 * empty or the target is the very end of the section.
 */
977  if ((lastReadPtr == NULL) &&
978  (realIterWords != 0) &&
979  (pos != realIterWords))
980  lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
981 
982  if (pos == realCurrPos)
983  return;
984 
985  /* Advance until we get a chunk which contains the pos */
986  while (pos >= realCurrPos + lastReadPtrLen)
987  {
988  realCurrPos+= lastReadPtrLen;
989  lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
990  assert(lastReadPtr != NULL);
991  }
992 
/* Step inside the chunk to the exact word */
993  const Uint32 chunkOffset= pos - realCurrPos;
994  lastReadPtr+= chunkOffset;
995  lastReadPtrLen-= chunkOffset;
996  realCurrPos= pos;
997  }
998 
999 public:
1007  bool setRange(Uint32 start, Uint32 len)
1008  {
1009  assert(checkInvariants());
1010  if (start+len > realIterWords)
1011  return false;
1012  moveToPos(start);
1013 
1014  rangeStart= start;
1015  rangeLen= rangeRemain= len;
1016 
1017  assert(checkInvariants());
1018  return true;
1019  }
1020 
1027  void reset()
1028  {
1029  /* Reset iterator to last specified range */
1030  assert(checkInvariants());
1031  moveToPos(rangeStart);
1032  rangeRemain= rangeLen;
1033  assert(checkInvariants());
1034  }
1035 
1041  const Uint32* getNextWords(Uint32& sz)
1042  {
1043  assert(checkInvariants());
1044  const Uint32* currPtr= NULL;
1045 
1046  if (rangeRemain)
1047  {
1048  assert(lastReadPtr != NULL);
1049  assert(lastReadPtrLen != 0);
1050  currPtr= lastReadPtr;
1051 
1052  sz= MIN(rangeRemain, lastReadPtrLen);
1053 
1054  if (sz == lastReadPtrLen)
1055  /* Will return everything in this chunk, move iterator to
1056  * next
1057  */
1058  lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
1059  else
1060  {
1061  /* Not returning all of this chunk, just advance within it */
1062  lastReadPtr+= sz;
1063  lastReadPtrLen-= sz;
1064  }
1065  realCurrPos+= sz;
1066  rangeRemain-= sz;
1067  }
1068  else
1069  {
1070  sz= 0;
1071  }
1072 
1073  assert(checkInvariants());
1074  return currPtr;
1075  }
1076 };
1077 
1078 /* Max fragmented signal chunk size (words) is max round number
1079  * of NDB_SECTION_SEGMENT_SZ words with some slack left for 'main'
1080  * part of signal etc.
1081  */
1082 #define CHUNK_SZ ((((MAX_SEND_MESSAGE_BYTESIZE >> 2) / NDB_SECTION_SEGMENT_SZ) - 2 ) \
1083  * NDB_SECTION_SEGMENT_SZ)
1084 
/*
 * Send a signal with up to three sections to aNode, splitting it into
 * several fragment signals when the sections together exceed CHUNK_SZ
 * words. If no fragmentation is needed the signal is passed to
 * sendSignal() unchanged.
 *
 * The function works on a private copy of inputSignal. Each
 * intermediate fragment is sent with a temporary signal whose data
 * holds the numbers of the sections it carries followed by a unique
 * fragment id; m_fragmentInfo is 1 for the first fragment, 2 for
 * middle fragments and 3 for the last one, which is sent using the
 * copy of the original signal with the section numbers and the id
 * appended to its data.
 *
 * Returns 0 when the final send was accepted (SEND_OK), -1 if any
 * send failed or a sub-range could not be set.
 */
1111 int
1112 TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal,
1113  NodeId aNode,
1114  const GenericSectionPtr ptr[3],
1115  Uint32 secs)
1116 {
/* Work on a copy so that the caller's signal is never modified */
1117  NdbApiSignal copySignal(* inputSignal);
1118  NdbApiSignal* aSignal = &copySignal;
1119 
1120  unsigned i;
1121  Uint32 totalSectionLength= 0;
1122  for (i= 0; i < secs; i++)
1123  totalSectionLength+= ptr[i].sz;
1124 
1125  /* If there's no need to fragment, send normally */
1126  if (totalSectionLength <= CHUNK_SZ)
1127  return sendSignal(aSignal, aNode, ptr, secs);
1128 
1129  // TODO : Consider tracing fragment signals?
1130 #ifdef API_TRACE
1131  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1132  SignalHeader tmp = * aSignal;
1133  tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1134  signalLogger.sendSignal(tmp,
1135  1,
1136  aSignal->getConstDataPtrSend(),
1137  aNode, ptr, 0);
1138  signalLogger.flushSignalLog();
1139  for (Uint32 i = 0; i<secs; i++)
1140  ptr[i].sectionIter->reset();
1141  }
1142 #endif
1143 
/* tmp_signal is the carrier used for every fragment except the last */
1144  NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
1145  GenericSectionPtr tmp_ptr[3];
1146  GenericSectionPtr empty= {0, NULL};
1147  Uint32 unique_id= m_fragmented_signal_id++; // next unique id
1148 
1149  /* Init tmp_ptr array from ptr[] array, make sure we have
1150  * 0 length for missing sections
1151  */
1152  for (i= 0; i < 3; i++)
1153  tmp_ptr[i]= (i < secs)? ptr[i] : empty;
1154 
1155  /* Create our section iterator adapters */
1156  FragmentedSectionIterator sec0(tmp_ptr[0]);
1157  FragmentedSectionIterator sec1(tmp_ptr[1]);
1158  FragmentedSectionIterator sec2(tmp_ptr[2]);
1159 
1160  /* Replace caller's iterators with ours */
1161  tmp_ptr[0].sectionIter= &sec0;
1162  tmp_ptr[1].sectionIter= &sec1;
1163  tmp_ptr[2].sectionIter= &sec2;
1164 
/* start_i       : first section carried by the fragment being built
 * this_chunk_sz : words already assigned to the fragment being built
 * fragment_info : 0 = nothing sent yet, 1 = first sent, 2 = middle sent
 */
1165  unsigned start_i= 0;
1166  unsigned this_chunk_sz= 0;
1167  unsigned fragment_info= 0;
1168  Uint32 *tmp_signal_data= tmp_signal.getDataPtrSend();
1169  for (i= 0; i < secs;) {
1170  unsigned remaining_sec_sz= tmp_ptr[i].sz;
/* Record the section number in the fragment's signal data */
1171  tmp_signal_data[i-start_i]= i;
1172  if (this_chunk_sz + remaining_sec_sz <= CHUNK_SZ)
1173  {
1174  /* This section fits whole, move onto next */
1175  this_chunk_sz+= remaining_sec_sz;
1176  i++;
1177  }
1178  else
1179  {
1180  /* This section doesn't fit, truncate it */
1181  unsigned send_sz= CHUNK_SZ - this_chunk_sz;
1182  if (i != start_i)
1183  {
1184  /* We ensure that the first piece of a new section which is
1185  * being truncated is a multiple of NDB_SECTION_SEGMENT_SZ
1186  * (to simplify reassembly). Subsequent non-truncated pieces
1187  * will be CHUNK_SZ which is a multiple of NDB_SECTION_SEGMENT_SZ
1188  * The final piece does not need to be a multiple of
1189  * NDB_SECTION_SEGMENT_SZ
1190  *
1191  * Note that this can push this_chunk_sz above CHUNK_SZ
1192  * Should probably round-down, but need to be careful of
1193  * 'can't fit any' cases. Instead, CHUNK_SZ is defined
1194  * with some slack below MAX_SENT_MESSAGE_BYTESIZE
1195  */
1196  send_sz=
1197  NDB_SECTION_SEGMENT_SZ
1198  *((send_sz+NDB_SECTION_SEGMENT_SZ-1)
1199  /NDB_SECTION_SEGMENT_SZ);
1200  if (send_sz > remaining_sec_sz)
1201  send_sz= remaining_sec_sz;
1202  }
1203 
1204  /* Modify tmp generic section ptr to describe truncated
1205  * section
1206  */
1207  tmp_ptr[i].sz= send_sz;
1208  FragmentedSectionIterator* fragIter=
1209  (FragmentedSectionIterator*) tmp_ptr[i].sectionIter;
1210  const Uint32 total_sec_sz= ptr[i].sz;
1211  const Uint32 start= (total_sec_sz - remaining_sec_sz);
1212  bool ok= fragIter->setRange(start, send_sz);
1213  assert(ok);
1214  if (!ok)
1215  return -1;
1216 
1217  if (fragment_info < 2) // 1 = first fragment signal
1218  // 2 = middle fragments
1219  fragment_info++;
1220 
1221  // send tmp_signal
/* Signal data layout: section numbers, then the unique fragment id */
1222  tmp_signal_data[i-start_i+1]= unique_id;
1223  tmp_signal.setLength(i-start_i+2);
1224  tmp_signal.m_fragmentInfo= fragment_info;
1225  tmp_signal.m_noOfSections= i-start_i+1;
1226  // do prepare send
1227  {
1228  SendStatus ss = theTransporterRegistry->prepareSend
1229  (&tmp_signal,
1230  1, /*JBB*/
1231  tmp_signal_data,
1232  aNode,
1233  &tmp_ptr[start_i]);
1234  assert(ss != SEND_MESSAGE_TOO_BIG);
1235  if (ss != SEND_OK) return -1;
1236  if (ss == SEND_OK)
1237  {
1238  assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1239  tmp_signal.readSignalNumber() == GSN_API_REGREQ);
1240  }
1241  }
1242  // setup variables for next signal
1243  start_i= i;
1244  this_chunk_sz= 0;
1245  assert(remaining_sec_sz >= send_sz);
1246  Uint32 remaining= remaining_sec_sz - send_sz;
1247  tmp_ptr[i].sz= remaining;
1248  /* Set sub-range iterator to cover remaining words */
1249  ok= fragIter->setRange(start+send_sz, remaining);
1250  assert(ok);
1251  if (!ok)
1252  return -1;
1253 
1254  if (remaining == 0)
1255  /* This section's done, move onto the next */
1256  i++;
1257  }
1258  }
1259 
/* Remember the original length so it can be restored after the send */
1260  unsigned a_sz= aSignal->getLength();
1261 
1262  if (fragment_info > 0) {
1263  // update the original signal to include section info
1264  Uint32 *a_data= aSignal->getDataPtrSend();
1265  unsigned tmp_sz= i-start_i;
1266  memcpy(a_data+a_sz,
1267  tmp_signal_data,
1268  tmp_sz*sizeof(Uint32));
1269  a_data[a_sz+tmp_sz]= unique_id;
1270  aSignal->setLength(a_sz+tmp_sz+1);
1271 
1272  // send last fragment
1273  aSignal->m_fragmentInfo= 3; // 3 = last fragment
1274  aSignal->m_noOfSections= i-start_i;
1275  } else {
1276  aSignal->m_noOfSections= secs;
1277  }
1278 
1279  // send aSignal
1280  int ret;
1281  {
1282  SendStatus ss = theTransporterRegistry->prepareSend
1283  (aSignal,
1284  1/*JBB*/,
1285  aSignal->getConstDataPtrSend(),
1286  aNode,
1287  &tmp_ptr[start_i]);
1288  assert(ss != SEND_MESSAGE_TOO_BIG);
1289  if (ss == SEND_OK)
1290  {
1291  assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1292  aSignal->readSignalNumber() == GSN_API_REGREQ);
1293  }
1294  ret = (ss == SEND_OK ? 0 : -1);
1295  }
/* Reset the fragment-related fields of the local signal copy */
1296  aSignal->m_noOfSections = 0;
1297  aSignal->m_fragmentInfo = 0;
1298  aSignal->setLength(a_sz);
1299  return ret;
1300 }
1301 
1302 int
1303 TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal,
1304  NodeId aNode,
1305  const LinearSectionPtr ptr[3],
1306  Uint32 secs)
1307 {
1308  /* Use the GenericSection variant of sendFragmentedSignal */
1309  GenericSectionPtr tmpPtr[3];
1310  LinearSectionPtr linCopy[3];
1311  const LinearSectionPtr empty= {0, NULL};
1312 
1313  /* Make sure all of linCopy is initialised */
1314  for (Uint32 j=0; j<3; j++)
1315  linCopy[j]= (j < secs)? ptr[j] : empty;
1316 
1317  LinearSectionIterator zero (linCopy[0].p, linCopy[0].sz);
1318  LinearSectionIterator one (linCopy[1].p, linCopy[1].sz);
1319  LinearSectionIterator two (linCopy[2].p, linCopy[2].sz);
1320 
1321  /* Build GenericSectionPtr array using iterators */
1322  tmpPtr[0].sz= linCopy[0].sz;
1323  tmpPtr[0].sectionIter= &zero;
1324  tmpPtr[1].sz= linCopy[1].sz;
1325  tmpPtr[1].sectionIter= &one;
1326  tmpPtr[2].sz= linCopy[2].sz;
1327  tmpPtr[2].sectionIter= &two;
1328 
1329  return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
1330 }
1331 
1332 
1333 int
1334 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1335  const LinearSectionPtr ptr[3], Uint32 secs)
1336 {
1337  Uint32 save = aSignal->m_noOfSections;
1338  const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1339 #ifdef API_TRACE
1340  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1341  SignalHeader tmp = * aSignal;
1342  tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1343  signalLogger.sendSignal(tmp,
1344  1,
1345  aSignal->getConstDataPtrSend(),
1346  aNode, ptr, secs);
1347  signalLogger.flushSignalLog();
1348  }
1349 #endif
1350  SendStatus ss = theTransporterRegistry->prepareSend
1351  (aSignal,
1352  1, // JBB
1353  aSignal->getConstDataPtrSend(),
1354  aNode,
1355  ptr);
1356  assert(ss != SEND_MESSAGE_TOO_BIG);
1357  const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1358  if (ss == SEND_OK)
1359  {
1360  assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1361  aSignal->readSignalNumber() == GSN_API_REGREQ);
1362  }
1363  return (ss == SEND_OK ? 0 : -1);
1364 }
1365 
1366 int
1367 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1368  const GenericSectionPtr ptr[3], Uint32 secs)
1369 {
1370  Uint32 save = aSignal->m_noOfSections;
1371  const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1372 #ifdef API_TRACE
1373  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1374  SignalHeader tmp = * aSignal;
1375  tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1376  signalLogger.sendSignal(tmp,
1377  1,
1378  aSignal->getConstDataPtrSend(),
1379  aNode, ptr, secs);
1380  signalLogger.flushSignalLog();
1381  for (Uint32 i = 0; i<secs; i++)
1382  ptr[i].sectionIter->reset();
1383  }
1384 #endif
1385  SendStatus ss = theTransporterRegistry->prepareSend
1386  (aSignal,
1387  1, // JBB
1388  aSignal->getConstDataPtrSend(),
1389  aNode,
1390  ptr);
1391  assert(ss != SEND_MESSAGE_TOO_BIG);
1392  const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1393  if (ss == SEND_OK)
1394  {
1395  assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1396  aSignal->readSignalNumber() == GSN_API_REGREQ);
1397  }
1398  return (ss == SEND_OK ? 0 : -1);
1399 }
1400 
1401 /******************************************************************************
1402  * CONNECTION METHODS Etc
1403  ******************************************************************************/
1404 void
1405 TransporterFacade::doConnect(int aNodeId){
1406  theTransporterRegistry->setIOState(aNodeId, NoHalt);
1407  theTransporterRegistry->do_connect(aNodeId);
1408 }
1409 
1410 void
1411 TransporterFacade::doDisconnect(int aNodeId)
1412 {
1413  theTransporterRegistry->do_disconnect(aNodeId);
1414 }
1415 
1416 void
1417 TransporterFacade::reportConnected(int aNodeId)
1418 {
1419  theClusterMgr->reportConnected(aNodeId);
1420  return;
1421 }
1422 
1423 void
1424 TransporterFacade::reportDisconnected(int aNodeId)
1425 {
1426  theClusterMgr->reportDisconnected(aNodeId);
1427  return;
1428 }
1429 
1430 NodeId
1431 TransporterFacade::ownId() const
1432 {
1433  return theOwnId;
1434 }
1435 
1436 bool
1437 TransporterFacade::isConnected(NodeId aNodeId){
1438  return theTransporterRegistry->is_connected(aNodeId);
1439 }
1440 
1441 NodeId
1442 TransporterFacade::get_an_alive_node()
1443 {
1444  DBUG_ENTER("TransporterFacade::get_an_alive_node");
1445  DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
1446 #ifdef VM_TRACE
1447  const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
1448  if (p != 0 && *p != 0)
1449  return atoi(p);
1450 #endif
1451  NodeId i;
1452  for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
1453  if (get_node_alive(i)){
1454  DBUG_PRINT("info", ("Node %d is alive", i));
1455  theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1456  DBUG_RETURN(i);
1457  }
1458  }
1459  for (i = 1; i < theStartNodeId; i++) {
1460  if (get_node_alive(i)){
1461  DBUG_PRINT("info", ("Node %d is alive", i));
1462  theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1463  DBUG_RETURN(i);
1464  }
1465  }
1466  DBUG_RETURN((NodeId)0);
1467 }
1468 
1469 TransporterFacade::ThreadData::ThreadData(Uint32 size){
1470  m_use_cnt = 0;
1471  m_firstFree = END_OF_LIST;
1472  expand(size);
1473 }
1474 
1475 void
1476 TransporterFacade::ThreadData::expand(Uint32 size){
1477  trp_client * oe = 0;
1478 
1479  const Uint32 sz = m_statusNext.size();
1480  m_objectExecute.fill(sz + size, oe);
1481  for(Uint32 i = 0; i<size; i++){
1482  m_statusNext.push_back(sz + i + 1);
1483  }
1484 
1485  m_statusNext.back() = m_firstFree;
1486  m_firstFree = m_statusNext.size() - size;
1487 }
1488 
1489 
1490 int
1491 TransporterFacade::ThreadData::open(trp_client * clnt)
1492 {
1493  Uint32 nextFree = m_firstFree;
1494 
1495  if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
1496  return -1;
1497  }
1498 
1499  if(nextFree == END_OF_LIST){
1500  expand(10);
1501  nextFree = m_firstFree;
1502  }
1503 
1504  m_use_cnt++;
1505  m_firstFree = m_statusNext[nextFree];
1506 
1507  m_statusNext[nextFree] = INACTIVE;
1508  m_objectExecute[nextFree] = clnt;
1509 
1510  return indexToNumber(nextFree);
1511 }
1512 
1513 int
1514 TransporterFacade::ThreadData::close(int number){
1515  number= numberToIndex(number);
1516  assert(m_objectExecute[number] != 0);
1517  m_statusNext[number] = m_firstFree;
1518  assert(m_use_cnt);
1519  m_use_cnt--;
1520  m_firstFree = number;
1521  m_objectExecute[number] = 0;
1522  return 0;
1523 }
1524 
1525 Uint32
1526 TransporterFacade::get_active_ndb_objects() const
1527 {
1528  return m_threads.m_use_cnt;
1529 }
1530 
1531 
1532 void
1533 TransporterFacade::start_poll(trp_client* clnt)
1534 {
1535  lock_mutex();
1536  clnt->m_poll.m_locked = true;
1537 }
1538 
1539 void
1540 TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time)
1541 {
1542  clnt->m_poll.m_waiting = true;
1543  assert(clnt->m_poll.m_locked == true);
1544  trp_client* owner = m_poll_owner;
1545  if (owner != NULL && owner != clnt)
1546  {
1547  /*
1548  We didn't get hold of the poll "right". We will sleep on a
1549  conditional mutex until the thread owning the poll "right"
1550  will wake us up after all data is received. If no data arrives
1551  we will wake up eventually due to the timeout.
1552  After receiving all data we take the object out of the cond wait
1553  queue if it hasn't happened already. It is usually already out of the
1554  queue but at time-out it could be that the object is still there.
1555  */
1556  assert(clnt->m_poll.m_poll_owner == false);
1557  add_to_poll_queue(clnt);
1558  NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
1559  wait_time);
1560  if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
1561  {
1562  remove_from_poll_queue(clnt);
1563  }
1564  }
1565  else
1566  {
1567  /*
1568  We got the poll "right" and we poll until data is received. After
1569  receiving data we will check if all data is received, if not we
1570  poll again.
1571  */
1572  assert(owner == clnt || clnt->m_poll.m_poll_owner == false);
1573  m_poll_owner = clnt;
1574  clnt->m_poll.m_poll_owner = true;
1575  external_poll(wait_time);
1576  }
1577 }
1578 
1579 void
1580 TransporterFacade::wakeup(trp_client* clnt)
1581 {
1582  if (clnt->m_poll.m_waiting)
1583  {
1584  clnt->m_poll.m_waiting = false;
1585  if (m_poll_owner != clnt)
1586  {
1587  remove_from_poll_queue(clnt);
1588  NdbCondition_Signal(clnt->m_poll.m_condition);
1589  }
1590  }
1591 }
1592 
1593 void
1594 TransporterFacade::complete_poll(trp_client* clnt)
1595 {
1596  clnt->m_poll.m_waiting = false;
1597  if (!clnt->m_poll.m_locked)
1598  {
1599  assert(clnt->m_poll.m_poll_owner == false);
1600  return;
1601  }
1602 
1603  /*
1604  When completing the poll for this thread we must return the poll
1605  ownership if we own it. We will give it to the last thread that
1606  came here (the most recent) which is likely to be the one also
1607  last to complete. We will remove that thread from the conditional
1608  wait queue and set him as the new owner of the poll "right".
1609  We will wait however with the signal until we have unlocked the
1610  mutex for performance reasons.
1611  See Stevens book on Unix NetworkProgramming: The Sockets Networking
1612  API Volume 1 Third Edition on page 703-704 for a discussion on this
1613  subject.
1614  */
1615  trp_client* new_owner = 0;
1616  if (m_poll_owner == clnt)
1617  {
1618  assert(clnt->m_poll.m_poll_owner == true);
1619  m_poll_owner = new_owner = remove_last_from_poll_queue();
1620  }
1621  if (new_owner)
1622  {
1623  assert(new_owner->m_poll.m_poll_owner == false);
1624  assert(new_owner->m_poll.m_locked == true);
1625  assert(new_owner->m_poll.m_waiting == true);
1626  NdbCondition_Signal(new_owner->m_poll.m_condition);
1627  new_owner->m_poll.m_poll_owner = true;
1628  }
1629  clnt->m_poll.m_locked = false;
1630  clnt->m_poll.m_poll_owner = false;
1631  unlock_mutex();
1632 }
1633 
1634 void
1635 TransporterFacade::add_to_poll_queue(trp_client* clnt)
1636 {
1637  assert(clnt != 0);
1638  assert(clnt->m_poll.m_prev == 0);
1639  assert(clnt->m_poll.m_next == 0);
1640  assert(clnt->m_poll.m_locked == true);
1641  assert(clnt->m_poll.m_poll_owner == false);
1642 
1643  if (m_poll_queue_head == 0)
1644  {
1645  assert(m_poll_queue_tail == 0);
1646  m_poll_queue_head = clnt;
1647  m_poll_queue_tail = clnt;
1648  }
1649  else
1650  {
1651  assert(m_poll_queue_tail->m_poll.m_next == 0);
1652  m_poll_queue_tail->m_poll.m_next = clnt;
1653  clnt->m_poll.m_prev = m_poll_queue_tail;
1654  m_poll_queue_tail = clnt;
1655  }
1656 }
1657 
1658 void
1659 TransporterFacade::remove_from_poll_queue(trp_client* clnt)
1660 {
1661  assert(clnt != 0);
1662  assert(clnt->m_poll.m_locked == true);
1663  assert(clnt->m_poll.m_poll_owner == false);
1664 
1665  if (clnt->m_poll.m_prev != 0)
1666  {
1667  clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
1668  }
1669  else
1670  {
1671  assert(m_poll_queue_head == clnt);
1672  m_poll_queue_head = clnt->m_poll.m_next;
1673  }
1674 
1675  if (clnt->m_poll.m_next != 0)
1676  {
1677  clnt->m_poll.m_next->m_poll.m_prev = clnt->m_poll.m_prev;
1678  }
1679  else
1680  {
1681  assert(m_poll_queue_tail == clnt);
1682  m_poll_queue_tail = clnt->m_poll.m_prev;
1683  }
1684 
1685  if (m_poll_queue_head == 0)
1686  assert(m_poll_queue_tail == 0);
1687  else if (m_poll_queue_tail == 0)
1688  assert(m_poll_queue_head == 0);
1689 
1690  clnt->m_poll.m_prev = 0;
1691  clnt->m_poll.m_next = 0;
1692 }
1693 
1694 trp_client*
1695 TransporterFacade::remove_last_from_poll_queue()
1696 {
1697  trp_client * clnt = m_poll_queue_tail;
1698  if (clnt == 0)
1699  return 0;
1700 
1701  remove_from_poll_queue(clnt);
1702  return clnt;
1703 }
1704 
/* Explicit instantiation of the Vector template for trp_client pointers */
1705 template class Vector<trp_client*>;
1706 
1707 #include "SignalSender.hpp"
1708 
1709 const Uint32*
1710 SignalSectionIterator::getNextWords(Uint32& sz)
1711 {
1712  if (likely(currentSignal != NULL))
1713  {
1714  NdbApiSignal* signal= currentSignal;
1715  currentSignal= currentSignal->next();
1716  sz= signal->getLength();
1717  return signal->getDataPtrSend();
1718  }
1719  sz= 0;
1720  return NULL;
1721 }
1722 
1723 #ifdef UNIT_TEST
1724 
1725 // Unit test code starts
1726 #include <random.h>
1727 
/*
 * VERIFY(x): unit test check.  When x evaluates to zero, print the line
 * number and the text of the failing expression and return -1 from the
 * enclosing function.
 *
 * The body is wrapped in do { } while (0) so that "VERIFY(x);" is exactly
 * one statement and can be used safely in an unbraced if/else or loop.
 */
#define VERIFY(x) \
  do { \
    if ((x) == 0) { \
      printf("VERIFY failed at Line %u : %s\n", (unsigned) __LINE__, #x); \
      return -1; \
    } \
  } while (0)
1729 
1730 /* Verify that word[n] == bias + n */
1731 int
1732 verifyIteratorContents(GenericSectionIterator& gsi, int dataWords, int bias)
1733 {
1734  int pos= 0;
1735 
1736  while (pos < dataWords)
1737  {
1738  const Uint32* readPtr=NULL;
1739  Uint32 len= 0;
1740 
1741  readPtr= gsi.getNextWords(len);
1742 
1743  VERIFY(readPtr != NULL);
1744  VERIFY(len != 0);
1745  VERIFY(len <= (Uint32) (dataWords - pos));
1746 
1747  for (int j=0; j < (int) len; j++)
1748  VERIFY(readPtr[j] == (Uint32) (bias ++));
1749 
1750  pos += len;
1751  }
1752 
1753  return 0;
1754 }
1755 
1756 int
1757 checkGenericSectionIterator(GenericSectionIterator& iter, int size, int bias)
1758 {
1759  /* Verify contents */
1760  VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1761 
1762  Uint32 sz;
1763 
1764  /* Check that iterator is empty now */
1765  VERIFY(iter.getNextWords(sz) == NULL);
1766  VERIFY(sz == 0);
1767 
1768  VERIFY(iter.getNextWords(sz) == NULL);
1769  VERIFY(sz == 0);
1770 
1771  iter.reset();
1772 
1773  /* Verify reset put us back to the start */
1774  VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1775 
1776  /* Verify no more words available */
1777  VERIFY(iter.getNextWords(sz) == NULL);
1778  VERIFY(sz == 0);
1779 
1780  return 0;
1781 }
1782 
1783 int
1784 checkIterator(GenericSectionIterator& iter, int size, int bias)
1785 {
1786  /* Test iterator itself, and then FragmentedSectionIterator
1787  * adaptation
1788  */
1789  VERIFY(checkGenericSectionIterator(iter, size, bias) == 0);
1790 
1791  /* Now we'll test the FragmentedSectionIterator on the iterator
1792  * we were passed
1793  */
1794  const int subranges= 20;
1795 
1796  iter.reset();
1797  GenericSectionPtr ptr;
1798  ptr.sz= size;
1799  ptr.sectionIter= &iter;
1800  FragmentedSectionIterator fsi(ptr);
1801 
1802  for (int s=0; s< subranges; s++)
1803  {
1804  Uint32 start= 0;
1805  Uint32 len= 0;
1806  if (size > 0)
1807  {
1808  start= (Uint32) myRandom48(size);
1809  if (0 != (size-start))
1810  len= (Uint32) myRandom48(size-start);
1811  }
1812 
1813  /*
1814  printf("Range (0-%u) = (%u + %u)\n",
1815  size, start, len);
1816  */
1817  fsi.setRange(start, len);
1818  VERIFY(checkGenericSectionIterator(fsi, len, bias + start) == 0);
1819  }
1820 
1821  return 0;
1822 }
1823 
1824 
1825 
1826 int
1827 testLinearSectionIterator()
1828 {
1829  /* Test Linear section iterator of various
1830  * lengths with section[n] == bias + n
1831  */
1832  const int totalSize= 200000;
1833  const int bias= 13;
1834 
1835  Uint32 data[totalSize];
1836  for (int i=0; i<totalSize; i++)
1837  data[i]= bias + i;
1838 
1839  for (int len= 0; len < 50000; len++)
1840  {
1841  LinearSectionIterator something(data, len);
1842 
1843  VERIFY(checkIterator(something, len, bias) == 0);
1844  }
1845 
1846  return 0;
1847 }
1848 
1849 NdbApiSignal*
1850 createSignalChain(NdbApiSignal*& poolHead, int length, int bias)
1851 {
1852  /* Create signal chain, with word[n] == bias+n */
1853  NdbApiSignal* chainHead= NULL;
1854  NdbApiSignal* chainTail= NULL;
1855  int pos= 0;
1856  int signals= 0;
1857 
1858  while (pos < length)
1859  {
1860  int offset= pos % NdbApiSignal::MaxSignalWords;
1861 
1862  if (offset == 0)
1863  {
1864  if (poolHead == NULL)
1865  return 0;
1866 
1867  NdbApiSignal* newSig= poolHead;
1868  poolHead= poolHead->next();
1869  signals++;
1870 
1871  newSig->next(NULL);
1872 
1873  if (chainHead == NULL)
1874  {
1875  chainHead= chainTail= newSig;
1876  }
1877  else
1878  {
1879  chainTail->next(newSig);
1880  chainTail= newSig;
1881  }
1882  }
1883 
1884  chainTail->getDataPtrSend()[offset]= (bias + pos);
1885  chainTail->setLength(offset + 1);
1886  pos ++;
1887  }
1888 
1889  return chainHead;
1890 }
1891 
1892 int
1893 testSignalSectionIterator()
1894 {
1895  /* Create a pool of signals, build
1896  * signal chains from it, test
1897  * the iterator against the signal chains
1898  */
1899  const int totalNumSignals= 1000;
1900  NdbApiSignal* poolHead= NULL;
1901 
1902  /* Allocate some signals */
1903  for (int i=0; i < totalNumSignals; i++)
1904  {
1905  NdbApiSignal* sig= new NdbApiSignal((BlockReference) 0);
1906 
1907  if (poolHead == NULL)
1908  {
1909  poolHead= sig;
1910  sig->next(NULL);
1911  }
1912  else
1913  {
1914  sig->next(poolHead);
1915  poolHead= sig;
1916  }
1917  }
1918 
1919  const int bias= 7;
1920  for (int dataWords= 1;
1921  dataWords <= (int)(totalNumSignals *
1922  NdbApiSignal::MaxSignalWords);
1923  dataWords ++)
1924  {
1925  NdbApiSignal* signalChain= NULL;
1926 
1927  VERIFY((signalChain= createSignalChain(poolHead, dataWords, bias)) != NULL );
1928 
1929  SignalSectionIterator ssi(signalChain);
1930 
1931  VERIFY(checkIterator(ssi, dataWords, bias) == 0);
1932 
1933  /* Now return the signals to the pool */
1934  while (signalChain != NULL)
1935  {
1936  NdbApiSignal* sig= signalChain;
1937  signalChain= signalChain->next();
1938 
1939  sig->next(poolHead);
1940  poolHead= sig;
1941  }
1942  }
1943 
1944  /* Free signals from pool */
1945  while (poolHead != NULL)
1946  {
1947  NdbApiSignal* sig= poolHead;
1948  poolHead= sig->next();
1949  delete(sig);
1950  }
1951 
1952  return 0;
1953 }
1954 
1955 int main(int arg, char** argv)
1956 {
1957  /* Test Section Iterators
1958  * ----------------------
1959  * To run this code :
1960  * cd storage/ndb/src/ndbapi
1961  * make testSectionIterators
1962  * ./testSectionIterators
1963  *
1964  * Will print "OK" in success case
1965  */
1966 
1967 
1968  VERIFY(testLinearSectionIterator() == 0);
1969  VERIFY(testSignalSectionIterator() == 0);
1970 
1971  printf("OK\n");
1972 
1973  return 0;
1974 }
1975 #endif
1976 
1977 void
1978 TransporterFacade::set_auto_reconnect(int val)
1979 {
1980  theClusterMgr->m_auto_reconnect = val;
1981 }
1982 
1983 int
1984 TransporterFacade::get_auto_reconnect() const
1985 {
1986  return theClusterMgr->m_auto_reconnect;
1987 }
1988 
1989 void
1991 {
1992  theClusterMgr->set_max_api_reg_req_interval(interval);
1993 }
1994 
1995 void
1996 TransporterFacade::ext_update_connections()
1997 {
1998  theClusterMgr->lock();
1999  theTransporterRegistry->update_connections();
2000  theClusterMgr->unlock();
2001 }
2002 
2003 struct in_addr
2004 TransporterFacade::ext_get_connect_address(Uint32 nodeId)
2005 {
2006  return theTransporterRegistry->get_connect_address(nodeId);
2007 }
2008 
2009 void
2010 TransporterFacade::ext_forceHB()
2011 {
2012  theClusterMgr->forceHB();
2013 }
2014 
2015 bool
2016 TransporterFacade::ext_isConnected(NodeId aNodeId)
2017 {
2018  bool val;
2019  theClusterMgr->lock();
2020  val = theClusterMgr->theNodes[aNodeId].is_connected();
2021  theClusterMgr->unlock();
2022  return val;
2023 }
2024 
2025 void
2026 TransporterFacade::ext_doConnect(int aNodeId)
2027 {
2028  theClusterMgr->lock();
2029  assert(theClusterMgr->theNodes[aNodeId].is_connected() == false);
2030  doConnect(aNodeId);
2031  theClusterMgr->unlock();
2032 }
2033