18 #include <ndb_global.h> 
   19 #include <ndb_limits.h> 
   20 #include "TransporterFacade.hpp" 
   21 #include "trp_client.hpp" 
   22 #include "ClusterMgr.hpp" 
   23 #include <IPCConfig.hpp> 
   24 #include <TransporterCallback.hpp> 
   25 #include <TransporterRegistry.hpp> 
   26 #include "NdbApiSignal.hpp" 
   27 #include "NdbWaiter.hpp" 
   32 #include <kernel/GlobalSignalNumbers.h> 
   33 #include <mgmapi_config_parameters.h> 
   34 #include <mgmapi_configuration.hpp> 
   35 #include <NdbConfig.h> 
   36 #include <ndb_version.h> 
   37 #include <SignalLoggerManager.hpp> 
   38 #include <kernel/ndb_limits.h> 
   39 #include <signaldata/AlterTable.hpp> 
   40 #include <signaldata/SumaImpl.hpp> 
   41 #include <signaldata/AllocNodeId.hpp> 
   46 static int numberToIndex(
int number)
 
   48   return number - MIN_API_BLOCK_NO;
 
   51 static int indexToNumber(
int index)
 
   53   return index + MIN_API_BLOCK_NO;
 
   56 #if defined DEBUG_TRANSPORTER 
   57 #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; 
   67                                TransporterError errorCode, 
const char *info)
 
   69 #ifdef REPORT_TRANSPORTER 
   70   ndbout_c(
"REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s", 
 
   71            (
int)nodeId, (
int)errorCode, info ? info : 
"");
 
   73   if(errorCode & TE_DO_DISCONNECT) {
 
   74     ndbout_c(
"reportError (%d, %d) %s", (
int)nodeId, (
int)errorCode,
 
   86 #ifdef REPORT_TRANSPORTER 
   87   ndbout_c(
"REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)", 
 
   88            (
int)nodeId, (Uint32)(bytes/count));
 
  101 #ifdef REPORT_TRANSPORTER 
  102   ndbout_c(
"REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)", 
 
  103            (
int)nodeId, (Uint32)(bytes/count));
 
  116 #ifdef REPORT_TRANSPORTER 
  117   ndbout_c(
"REPORT_TRANSP: API reportConnect (nodeId=%d)", (
int)nodeId);
 
  119   reportConnected(nodeId);
 
  127 #ifdef REPORT_TRANSPORTER 
  128   ndbout_c(
"REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (
int)nodeId);
 
  130   reportDisconnected(nodeId);
 
  153 static const char * API_SIGNAL_LOG = 
"API_SIGNAL_LOG";
 
  154 static const char * apiSignalLog   = 0;
 
  160   signalLogger.flushSignalLog();
 
  162   const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (
char *)0, 0);
 
  163   if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
 
  165   } 
else if(tmp == 0 && apiSignalLog == 0){
 
  167   } 
else if(tmp == 0 && apiSignalLog != 0){
 
  172     if (strcmp(tmp, 
"-") == 0)
 
  175     else if (strcmp(tmp, 
"+") == 0)
 
  187 TRACE_GSN(Uint32 gsn)
 
  190 #ifndef TRACE_APIREGREQ 
  192   case GSN_API_REGCONF:
 
  196   case GSN_SUB_GCP_COMPLETE_REP:
 
  197   case GSN_SUB_GCP_COMPLETE_ACK:
 
  211                                   Uint8 prio, Uint32 * 
const theData,
 
  214   Uint32 tRecBlockNo = header->theReceiversBlockNumber;
 
  217   if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
 
  222                                ptr, header->m_noOfSections);
 
  223     signalLogger.flushSignalLog();
 
  227   if (tRecBlockNo >= MIN_API_BLOCK_NO)
 
  229     trp_client * clnt = m_threads.get(tRecBlockNo);
 
  245       tSignal->setDataPtr(theData);
 
  246       clnt->trp_deliver_signal(tSignal, ptr);
 
  249   else if (tRecBlockNo == API_PACKED)
 
  257     Uint32 Tlength = header->theLength;
 
  263     while (Tsent < Tlength) {
 
  264       Uint32 Theader = theData[Tsent];
 
  266       Uint32 TpacketLen = (Theader & 0x1F) + 3;
 
  267       tRecBlockNo = Theader >> 16;
 
  268       if (TpacketLen <= 25)
 
  270         if ((TpacketLen + Tsent) <= Tlength)
 
  276           header->theLength = TpacketLen;
 
  277           header->theReceiversBlockNumber = tRecBlockNo;
 
  278           Uint32* tDataPtr = &theData[Tsent];
 
  280           if (tRecBlockNo >= MIN_API_BLOCK_NO)
 
  282             trp_client * clnt = m_threads.get(tRecBlockNo);
 
  287               tSignal->setDataPtr(tDataPtr);
 
  288               clnt->trp_deliver_signal(tSignal, 0);
 
  296   else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
 
  297            tRecBlockNo <= MAX_API_FIXED_BLOCK_NO) 
 
  299     Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
 
  305       tSignal->setDataPtr(theData);
 
  306       clnt->trp_deliver_signal(tSignal, ptr);
 
  312     if(header->theVerId_signalNumber != GSN_API_REGREQ)
 
  314       TRP_DEBUG( 
"TransporterFacade received signal to unknown block no." );
 
  315       ndbout << 
"BLOCK NO: "  << tRecBlockNo << 
" sig "  
  316              << header->theVerId_signalNumber  << endl;
 
  331 copy(Uint32 * & insertPtr, 
 
  347   assert(theOwnId == 0);
 
  350 #if defined SIGPIPE && !defined _WIN32 
  351   (void)signal(SIGPIPE, SIG_IGN);
 
  355   if (theTransporterRegistry == NULL)
 
  358   if (!theTransporterRegistry->init(nodeId))
 
  361   if (theClusterMgr == NULL)
 
  364   if (theClusterMgr == NULL)
 
  370   if (!theTransporterRegistry->start_service(m_socket_server))
 
  373   theReceiveThread = NdbThread_Create(runReceiveResponse_C,
 
  377                                       NDB_THREAD_PRIO_LOW);
 
  379   theSendThread = NdbThread_Create(runSendRequest_C,
 
  383                                    NDB_THREAD_PRIO_LOW);
 
  385   theClusterMgr->startThread();
 
  398   DBUG_ENTER(
"TransporterFacade::stop_instance");
 
  404 TransporterFacade::doStop(){
 
  405   DBUG_ENTER(
"TransporterFacade::doStop");
 
  410   if (theClusterMgr != NULL) theClusterMgr->doStop();
 
  417   if (theReceiveThread) {
 
  418     NdbThread_WaitFor(theReceiveThread, &status);
 
  419     NdbThread_Destroy(&theReceiveThread);
 
  422     NdbThread_WaitFor(theSendThread, &status);
 
  423     NdbThread_Destroy(&theSendThread);
 
  430 runSendRequest_C(
void * me)
 
  436 void TransporterFacade::threadMainSend(
void)
 
  439   if (theTransporterRegistry->start_clients() == 0){
 
  440     ndbout_c(
"Unable to start theTransporterRegistry->start_clients");
 
  446   while(!theStopReceive) {
 
  447     NdbSleep_MilliSleep(10);
 
  448     NdbMutex_Lock(theMutexPtr);
 
  449     if (sendPerformedLastInterval == 0) {
 
  452     sendPerformedLastInterval = 0;
 
  453     NdbMutex_Unlock(theMutexPtr);
 
  455   theTransporterRegistry->stopSending();
 
  457   m_socket_server.stopServer();
 
  460   theTransporterRegistry->stop_clients();
 
  465 runReceiveResponse_C(
void * me)
 
  479 void TransporterFacade::threadMainReceive(
void)
 
  482 #ifdef NDB_SHM_TRANSPORTER 
  483   NdbThread_set_shm_sigmask(TRUE);
 
  485   while(!theStopReceive)
 
  487     theClusterMgr->lock();
 
  489     theClusterMgr->unlock();
 
  490     NdbSleep_MilliSleep(100);
 
  // Poll the transporter registry for incoming data, passing wait_time
  // straight through to pollReceive().
  // The visible statements release theMutexPtr before the poll and take it
  // again afterwards, so the caller presumably holds it on entry -- TODO
  // confirm against callers.  What is done with `res` is not visible in
  // this excerpt.
  500 void TransporterFacade::external_poll(Uint32 wait_time)
 
  // Drop the facade mutex so it is not held while blocked in the poll.
  502   NdbMutex_Unlock(theMutexPtr);
 
  504 #ifdef NDB_SHM_TRANSPORTER 
  // SHM transporter builds only: shm sigmask is set to FALSE for the poll
  // and back to TRUE once pollReceive() has returned.
  510   NdbThread_set_shm_sigmask(FALSE);
 
  513   const int res = theTransporterRegistry->pollReceive(wait_time);
 
  515 #ifdef NDB_SHM_TRANSPORTER 
  516   NdbThread_set_shm_sigmask(TRUE);
 
  // Re-acquire the facade mutex before continuing.
  519   NdbMutex_Lock(theMutexPtr);
 
  528   m_poll_queue_head(NULL),
 
  529   m_poll_queue_tail(NULL),
 
  530   theTransporterRegistry(0),
 
  538   theReceiveThread(NULL),
 
  539   m_fragmented_signal_id(0),
 
  540   m_globalDictCache(cache)
 
  542   DBUG_ENTER(
"TransporterFacade::TransporterFacade");
 
  543   theMutexPtr = NdbMutex_CreateWithName(
"TTFM");
 
  544   sendPerformedLastInterval = 0;
 
  546   for (
int i = 0; 
i < NO_API_FIXED_BLOCKS; 
i++)
 
  547     m_fixed2dynamic[
i]= RNIL;
 
  560 static bool is_mgmd(Uint32 nodeId,
 
  564   if (iter.find(CFG_NODE_ID, nodeId))
 
  567   if(iter.get(CFG_TYPE_OF_SECTION, &type))
 
  570   return (type == NODE_TYPE_MGM);
 
  575 TransporterFacade::do_connect_mgm(NodeId nodeId,
 
  579   DBUG_ENTER(
"TransporterFacade::do_connect_mgm");
 
  581   for(iter.first(); iter.valid(); iter.next())
 
  583     Uint32 nodeId1, nodeId2;
 
  584     if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
 
  585         iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
 
  589     if (nodeId1 != nodeId && nodeId2 != nodeId)
 
  593     if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
 
  595       Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
 
  596       DBUG_PRINT(
"info", (
"opening connection to node %d", remoteNodeId));
 
  597       doConnect(remoteNodeId);
 
  608   DBUG_ENTER(
"TransporterFacade::configure");
 
  610   assert(theOwnId == nodeId);
 
  611   assert(theTransporterRegistry);
 
  612   assert(theClusterMgr);
 
  617                                         * theTransporterRegistry,
 
  622   theClusterMgr->configure(nodeId, conf);
 
  625   if(iter.find(CFG_NODE_ID, nodeId))
 
  629   Uint32 total_send_buffer = 0;
 
  630   iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
 
  633   Uint32 auto_reconnect=1;
 
  634   iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
 
  636   const char * priospec = 0;
 
  637   if (iter.get(CFG_HB_THREAD_PRIO, &priospec) == 0)
 
  639     NdbThread_SetHighPrioProperties(priospec);
 
  645   if (theClusterMgr->m_auto_reconnect == -1)
 
  647     theClusterMgr->m_auto_reconnect = auto_reconnect;
 
  651   signalLogger.logOn(
true, 0, SignalLoggerManager::LogInOut);
 
  655   if (!do_connect_mgm(nodeId, conf))
 
  671   Uint32 sz = m_threads.m_statusNext.size();
 
  672   for (Uint32 
i = 0; 
i < sz ; 
i ++) 
 
  675     if (clnt != 0 && clnt != sender)
 
  677       clnt->trp_deliver_signal(aSignal, ptr);
 
  683 TransporterFacade::connected()
 
  685   DBUG_ENTER(
"TransporterFacade::connected");
 
  686   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theOwnId));
 
  687   signal.theVerId_signalNumber = GSN_ALLOC_NODEID_CONF;
 
  688   signal.theReceiversBlockNumber = 0;
 
  690   signal.theLength = AllocNodeIdConf::SignalLength;
 
  695   rep->nodeId = theOwnId;
 
  699   Uint32 sz = m_threads.m_statusNext.size();
 
  700   for (Uint32 
i = 0; 
i < sz ; 
i ++)
 
  705       clnt->trp_deliver_signal(&signal, 0);
 
  712 TransporterFacade::close_clnt(
trp_client* clnt)
 
  717     NdbMutex_Lock(theMutexPtr);
 
  718     if (m_threads.get(clnt->m_blockNo) == clnt)
 
  720       m_threads.close(clnt->m_blockNo);
 
  727     NdbMutex_Unlock(theMutexPtr);
 
  735   DBUG_ENTER(
"TransporterFacade::open");
 
  736   Guard g(theMutexPtr);
 
  737   int r= m_threads.open(clnt);
 
  743   if (unlikely(blockNo != -1))
 
  746     Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
 
  748     assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
 
  749            fixed_index <= NO_API_FIXED_BLOCKS);
 
  751     m_fixed2dynamic[fixed_index]= r;
 
  756     r = numberToRef(r, theOwnId);
 
  760     r = numberToRef(r, 0);
 
  765 TransporterFacade::~TransporterFacade()
 
  767   DBUG_ENTER(
"TransporterFacade::~TransporterFacade");
 
  769   delete theClusterMgr;  
 
  770   NdbMutex_Lock(theMutexPtr);
 
  771   delete theTransporterRegistry;
 
  772   NdbMutex_Unlock(theMutexPtr);
 
  773   NdbMutex_Destroy(theMutexPtr);
 
  // Recompute currentSendLimit and checkCounter from the client slots.
  // Every slot of m_threads.m_statusNext that is ACTIVE is reset to
  // INACTIVE; currentSendLimit is then taken from TthreadCount (never less
  // than 1) and checkCounter is set to four times that value.
  // NOTE(review): the statement that increments TthreadCount is not visible
  // in this excerpt; presumably it counts the ACTIVE slots -- confirm.
  781 TransporterFacade::calculateSendLimit()
 
  784   Uint32 TthreadCount = 0;
 
  786   Uint32 sz = m_threads.m_statusNext.size();
 
  787   for (Ti = 0; Ti < sz; Ti++) {
 
  788     if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
 
  // Clear the activity mark so the next recalculation starts afresh.
  790       m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
 
  793   currentSendLimit = TthreadCount;
 
  // Keep the limit at 1 or more even when TthreadCount is zero.
  794   if (currentSendLimit == 0) {
 
  795     currentSendLimit = 1;
 
  // checkCounter = 4 * currentSendLimit.
  797   checkCounter = currentSendLimit << 2;
 
  // Mark the calling block's slot ACTIVE, flag that a send took place in
  // the current interval, and recalculate the send limit once checkCounter
  // has gone negative.
  // NOTE(review): the statements between the visible lines (the send itself
  // and any change to checkCounter) are not shown here; `checkCounter < 0`
  // also requires checkCounter to be a signed type -- confirm declaration.
  805 void TransporterFacade::forceSend(Uint32 block_number) {
 
  807   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
 
  // Read and cleared again by threadMainSend().
  808   sendPerformedLastInterval = 1;
 
  809   if (checkCounter < 0) {
 
  810     calculateSendLimit();
 
  // Mark the calling block's slot ACTIVE and ask the transporter registry,
  // via forceSendCheck(currentSendLimit), whether to send now.  A result of
  // 1 is recorded in sendPerformedLastInterval.  The send limit is
  // recalculated once checkCounter has gone negative.
  // NOTE(review): original lines 821-831 and 834-835 are missing from this
  // excerpt, so any change to checkCounter is not visible.
  819 TransporterFacade::checkForceSend(Uint32 block_number) {  
 
  820   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
 
  832   if (theTransporterRegistry->
forceSendCheck(currentSendLimit) == 1) {
 
  833     sendPerformedLastInterval = 1;
 
  836   if (checkCounter < 0) {
 
  837     calculateSendLimit();
 
  846 TransporterFacade::sendSignal(
const NdbApiSignal * aSignal, NodeId aNode)
 
  848   const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
 
  849   Uint32 Tlen = aSignal->theLength;
 
  850   Uint32 TBno = aSignal->theReceiversBlockNumber;
 
  852   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
  854     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
 
  860     signalLogger.flushSignalLog();
 
  863   if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
 
  864     SendStatus ss = theTransporterRegistry->
prepareSend(aSignal,
 
  872       assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
 
  873              aSignal->readSignalNumber() == GSN_API_REGREQ ||
 
  874              (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
 
  877     return (ss == SEND_OK ? 0 : -1);
 
  881     ndbout << 
"ERR: SigLen = " << Tlen << 
" BlockRec = " << TBno;
 
  882     ndbout << 
" SignalNo = " << aSignal->theVerId_signalNumber << endl;
 
  903   Uint32 realIterWords;                 
 
  908   const Uint32* lastReadPtr;            
 
  910   Uint32 lastReadPtrLen;                
 
  919     realIterator= ptr.sectionIter;
 
  920     realIterWords= ptr.sz;
 
  923     rangeLen= rangeRemain= realIterWords;
 
  928     assert(checkInvariants());
 
  937   bool checkInvariants()
 
  939     assert( (realIterator != NULL) || (realIterWords == 0) );
 
  940     assert( realCurrPos <= realIterWords );
 
  941     assert( rangeStart <= realIterWords );
 
  942     assert( (rangeStart+rangeLen) <= realIterWords);
 
  943     assert( rangeRemain <= rangeLen );
 
  946     assert( (lastReadPtr != NULL) || (rangeRemain == 0));
 
  951     assert( (lastReadPtr == NULL) || 
 
  952             ((rangeRemain == 0) || (lastReadPtrLen != 0)));
 
  // Position the underlying iterator so that lastReadPtr/lastReadPtrLen
  // describe the data starting at word offset `pos` (pos <= realIterWords).
  // Statements missing from this excerpt (e.g. updates to realCurrPos after
  // a reset and at the end) are not documented here.
  964   void moveToPos(Uint32 pos)
 
  966     assert(pos <= realIterWords);
 
  // Moving backwards: the underlying iterator is rewound.
  968     if (pos < realCurrPos)
 
  971       realIterator->reset();
 
  // No chunk cached yet, and there is data to read before the end:
  // fetch the first chunk.
  977     if ((lastReadPtr == NULL) && 
 
  978         (realIterWords != 0) &&
 
  979         (pos != realIterWords))
 
  980       lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
 
  982     if (pos == realCurrPos)
 
  // Skip whole chunks until the cached chunk is the one containing pos.
  986     while (pos >= realCurrPos + lastReadPtrLen)
 
  988       realCurrPos+= lastReadPtrLen;
 
  989       lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
 
  990       assert(lastReadPtr != NULL);
 
  // Advance inside the cached chunk to the exact word.
  993     const Uint32 chunkOffset= pos - realCurrPos;
 
  994     lastReadPtr+= chunkOffset;
 
  995     lastReadPtrLen-= chunkOffset;
 
 1009     assert(checkInvariants());
 
 1010     if (start+len > realIterWords)
 
 1015     rangeLen= rangeRemain= len;
 
 1017     assert(checkInvariants());
 
 1030     assert(checkInvariants());
 
 1031     moveToPos(rangeStart);
 
 1032     rangeRemain= rangeLen;
 
 1033     assert(checkInvariants());
 
 1043     assert(checkInvariants());
 
 1044     const Uint32* currPtr= NULL;
 
 1048       assert(lastReadPtr != NULL);
 
 1049       assert(lastReadPtrLen != 0);
 
 1050       currPtr= lastReadPtr;
 
 1052       sz= MIN(rangeRemain, lastReadPtrLen);
 
 1054       if (sz == lastReadPtrLen)
 
 1058         lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
 
 1063         lastReadPtrLen-= sz;
 
 1073     assert(checkInvariants());
 
 1082 #define CHUNK_SZ ((((MAX_SEND_MESSAGE_BYTESIZE >> 2) / NDB_SECTION_SEGMENT_SZ) - 2 ) \ 
 1083                   * NDB_SECTION_SEGMENT_SZ) 
 1112 TransporterFacade::sendFragmentedSignal(
const NdbApiSignal* inputSignal,
 
 1121   Uint32 totalSectionLength= 0;
 
 1122   for (i= 0; i < secs; i++)
 
 1123     totalSectionLength+= ptr[i].sz;
 
 1126   if (totalSectionLength <= CHUNK_SZ)
 
 1127     return sendSignal(aSignal, aNode, ptr, secs);
 
 1131   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
 1133     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
 
 1136                             aSignal->getConstDataPtrSend(),
 
 1138     signalLogger.flushSignalLog();
 
 1139     for (Uint32 i = 0; i<secs; i++)
 
 1140       ptr[i].sectionIter->reset();
 
 1147   Uint32 unique_id= m_fragmented_signal_id++; 
 
 1152   for (i= 0; i < 3; i++)
 
 1153     tmp_ptr[i]= (i < secs)? ptr[
i] : empty;
 
 1161   tmp_ptr[0].sectionIter= &sec0;
 
 1162   tmp_ptr[1].sectionIter= &sec1;
 
 1163   tmp_ptr[2].sectionIter= &sec2;
 
 1165   unsigned start_i= 0;
 
 1166   unsigned this_chunk_sz= 0;
 
 1167   unsigned fragment_info= 0;
 
 1168   Uint32 *tmp_signal_data= tmp_signal.getDataPtrSend();
 
 1169   for (i= 0; i < secs;) {
 
 1170     unsigned remaining_sec_sz= tmp_ptr[
i].sz;
 
 1171     tmp_signal_data[i-start_i]= 
i;
 
 1172     if (this_chunk_sz + remaining_sec_sz <= CHUNK_SZ)
 
 1175       this_chunk_sz+= remaining_sec_sz;
 
 1181       unsigned send_sz= CHUNK_SZ - this_chunk_sz;
 
 1197           NDB_SECTION_SEGMENT_SZ
 
 1198           *((send_sz+NDB_SECTION_SEGMENT_SZ-1)
 
 1199             /NDB_SECTION_SEGMENT_SZ);
 
 1200         if (send_sz > remaining_sec_sz)
 
 1201           send_sz= remaining_sec_sz;
 
 1207       tmp_ptr[
i].sz= send_sz;
 
 1210       const Uint32 total_sec_sz= ptr[
i].sz;
 
 1211       const Uint32 start= (total_sec_sz - remaining_sec_sz);
 
 1217       if (fragment_info < 2) 
 
 1222       tmp_signal_data[i-start_i+1]= unique_id;
 
 1223       tmp_signal.setLength(i-start_i+2);
 
 1224       tmp_signal.m_fragmentInfo= fragment_info;
 
 1225       tmp_signal.m_noOfSections= i-start_i+1;
 
 1228         SendStatus ss = theTransporterRegistry->
prepareSend 
 1234         assert(ss != SEND_MESSAGE_TOO_BIG);
 
 1235         if (ss != SEND_OK) 
return -1;
 
 1238           assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
 
 1239                  tmp_signal.readSignalNumber() == GSN_API_REGREQ);
 
 1245       assert(remaining_sec_sz >= send_sz);
 
 1246       Uint32 remaining= remaining_sec_sz - send_sz;
 
 1247       tmp_ptr[
i].sz= remaining;
 
 1249       ok= fragIter->
setRange(start+send_sz, remaining);
 
 1260   unsigned a_sz= aSignal->getLength();
 
 1262   if (fragment_info > 0) {
 
 1264     Uint32 *a_data= aSignal->getDataPtrSend();
 
 1265     unsigned tmp_sz= i-start_i;
 
 1268            tmp_sz*
sizeof(Uint32));
 
 1269     a_data[a_sz+tmp_sz]= unique_id;
 
 1270     aSignal->setLength(a_sz+tmp_sz+1);
 
 1273     aSignal->m_fragmentInfo= 3; 
 
 1274     aSignal->m_noOfSections= i-start_i;
 
 1276     aSignal->m_noOfSections= secs;
 
 1282     SendStatus ss = theTransporterRegistry->
prepareSend 
 1285        aSignal->getConstDataPtrSend(),
 
 1288     assert(ss != SEND_MESSAGE_TOO_BIG);
 
 1291       assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
 
 1292              aSignal->readSignalNumber() == GSN_API_REGREQ);
 
 1294     ret = (ss == SEND_OK ? 0 : -1);
 
 1296   aSignal->m_noOfSections = 0;
 
 1297   aSignal->m_fragmentInfo = 0;
 
 1298   aSignal->setLength(a_sz);
 
 1303 TransporterFacade::sendFragmentedSignal(
const NdbApiSignal* aSignal,
 
 1314   for (Uint32 j=0; j<3; j++)
 
 1315     linCopy[j]= (j < secs)? ptr[j] : empty;
 
 1322   tmpPtr[0].sz= linCopy[0].sz;
 
 1323   tmpPtr[0].sectionIter= &zero;
 
 1324   tmpPtr[1].sz= linCopy[1].sz;
 
 1325   tmpPtr[1].sectionIter= &one;
 
 1326   tmpPtr[2].sz= linCopy[2].sz;
 
 1327   tmpPtr[2].sectionIter= &two;
 
 1329   return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
 
 1334 TransporterFacade::sendSignal(
const NdbApiSignal* aSignal, NodeId aNode,
 
 1337   Uint32 save = aSignal->m_noOfSections;
 
 1338   const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = secs;
 
 1340   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
 1342     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
 
 1345                             aSignal->getConstDataPtrSend(),
 
 1347     signalLogger.flushSignalLog();
 
 1350   SendStatus ss = theTransporterRegistry->
prepareSend 
 1353      aSignal->getConstDataPtrSend(),
 
 1356   assert(ss != SEND_MESSAGE_TOO_BIG);
 
 1357   const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = save;
 
 1360     assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
 
 1361            aSignal->readSignalNumber() == GSN_API_REGREQ);
 
 1363   return (ss == SEND_OK ? 0 : -1);
 
 1367 TransporterFacade::sendSignal(
const NdbApiSignal* aSignal, NodeId aNode,
 
 1370   Uint32 save = aSignal->m_noOfSections;
 
 1371   const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = secs;
 
 1373   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
 1375     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
 
 1378                             aSignal->getConstDataPtrSend(),
 
 1380     signalLogger.flushSignalLog();
 
 1381     for (Uint32 i = 0; i<secs; i++)
 
 1382       ptr[i].sectionIter->reset();
 
 1385   SendStatus ss = theTransporterRegistry->
prepareSend 
 1388      aSignal->getConstDataPtrSend(),
 
 1391   assert(ss != SEND_MESSAGE_TOO_BIG);
 
 1392   const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = save;
 
 1395     assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
 
 1396            aSignal->readSignalNumber() == GSN_API_REGREQ);
 
 1398   return (ss == SEND_OK ? 0 : -1);
 
 1405 TransporterFacade::doConnect(
int aNodeId){
 
 1406   theTransporterRegistry->setIOState(aNodeId, NoHalt);
 
 1411 TransporterFacade::doDisconnect(
int aNodeId)
 
 1417 TransporterFacade::reportConnected(
int aNodeId)
 
 1424 TransporterFacade::reportDisconnected(
int aNodeId)
 
 1431 TransporterFacade::ownId()
 const 
 1437 TransporterFacade::isConnected(NodeId aNodeId){
 
 1438   return theTransporterRegistry->is_connected(aNodeId);
 
 // Search for a node for which get_node_alive() is true.
 // The scan starts at theStartNodeId and runs up to MAX_NDB_NODES, then
 // wraps and covers 1 .. theStartNodeId-1.  On a hit theStartNodeId is
 // advanced to the following id (modulo MAX_NDB_NODES) so the next call
 // starts after it.  If no node is found, (NodeId)0 is returned.
 // NOTE(review): the return statements on a hit and the handling of the
 // NDB_ALIVE_NODE_ID environment variable are not visible in this excerpt.
 1442 TransporterFacade::get_an_alive_node()
 
 1444   DBUG_ENTER(
"TransporterFacade::get_an_alive_node");
 
 1445   DBUG_PRINT(
"enter", (
"theStartNodeId: %d", theStartNodeId));
 
 // Read the NDB_ALIVE_NODE_ID environment variable.
 1447   const char* p = NdbEnv_GetEnv(
"NDB_ALIVE_NODE_ID", (
char*)0, 0);
 
 // Non-null, non-empty value only.
 1448   if (p != 0 && *p != 0)
 
 // First pass: from the saved start position upwards.
 1452   for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
 
 1453     if (get_node_alive(i)){
 
 1454       DBUG_PRINT(
"info", (
"Node %d is alive", i));
 
 1455       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
 
 // Second pass: wrap around and cover the ids below the start position.
 1459   for (i = 1; i < theStartNodeId; i++) {
 
 1460     if (get_node_alive(i)){
 
 1461       DBUG_PRINT(
"info", (
"Node %d is alive", i));
 
 1462       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
 
 // Nothing alive was found.
 1466   DBUG_RETURN((NodeId)0);
 
 1469 TransporterFacade::ThreadData::ThreadData(Uint32 
size){
 
 1471   m_firstFree = END_OF_LIST;
 
 // Grow the slot tables by `size` entries and put the new entries on the
 // free list.
 // Each new m_statusNext entry initially holds the index of the entry after
 // it, so the new entries form a chain; the last new entry is then linked to
 // the previous free-list head, and m_firstFree is set to the first new entry.
 // NOTE(review): `oe`, the fill value for m_objectExecute, is declared on a
 // line not visible in this excerpt.
 1476 TransporterFacade::ThreadData::expand(Uint32 
size){
 
 1479   const Uint32 sz = m_statusNext.size();
 
 1480   m_objectExecute.fill(sz + size, oe);
 
 1481   for(Uint32 i = 0; i<
size; i++){
 
 // Entry (sz + i) points at entry (sz + i + 1).
 1482     m_statusNext.push_back(sz + i + 1);
 
 // Link the tail of the new chain to the old free list.
 1485   m_statusNext.back() = m_firstFree;
 
 // The free list now starts at the first newly added entry.
 1486   m_firstFree = m_statusNext.size() - 
size;
 
 // Allocate a slot for clnt and return its block number.
 // The slot at the head of the free list is taken: its stored next-index
 // becomes the new free-list head, the slot is marked INACTIVE and bound to
 // clnt, and indexToNumber(slot) is returned.
 // NOTE(review): the bodies of the two END_OF_LIST checks are missing from
 // this excerpt; presumably the first rejects the request when the table is
 // full and the second grows the table before re-reading m_firstFree.
 1491 TransporterFacade::ThreadData::open(
trp_client * clnt)
 
 1493   Uint32 nextFree = m_firstFree;
 
 // Free list empty and the table already has MAX_NO_THREADS entries.
 1495   if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
 
 // Free list empty but the table is below MAX_NO_THREADS entries.
 1499   if(nextFree == END_OF_LIST){
 
 1501     nextFree = m_firstFree;
 
 // Unlink the slot from the free list.
 1505   m_firstFree = m_statusNext[nextFree];
 
 1507   m_statusNext[nextFree] = INACTIVE;
 
 1508   m_objectExecute[nextFree] = clnt;
 
 1510   return indexToNumber(nextFree);
 
 // Release the slot belonging to block number `number`.
 // The number is converted to a slot index, the slot is pushed onto the
 // head of the free list and its client pointer is cleared.  The slot must
 // currently be in use (asserted).
 // NOTE(review): original lines 1518-1519 are missing from this excerpt.
 1514 TransporterFacade::ThreadData::close(
int number){
 
 // From here on `number` is a slot index, not a block number.
 1515   number= numberToIndex(number);
 
 1516   assert(m_objectExecute[number] != 0);
 
 // The freed slot points at the old free-list head ...
 1517   m_statusNext[number] = m_firstFree;
 
 // ... and becomes the new head.
 1520   m_firstFree = number;
 
 1521   m_objectExecute[number] = 0;
 
 1526 TransporterFacade::get_active_ndb_objects()
 const 
 1528   return m_threads.m_use_cnt;
 
 1533 TransporterFacade::start_poll(
trp_client* clnt)
 
 1536   clnt->m_poll.m_locked = 
true;
 
 1540 TransporterFacade::do_poll(
trp_client* clnt, Uint32 wait_time)
 
 1542   clnt->m_poll.m_waiting = 
true;
 
 1543   assert(clnt->m_poll.m_locked == 
true);
 
 1545   if (owner != NULL && owner != clnt)
 
 1556     assert(clnt->m_poll.m_poll_owner == 
false);
 
 1557     add_to_poll_queue(clnt);
 
 1558     NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
 
 1560     if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
 
 1562       remove_from_poll_queue(clnt);
 
 1572     assert(owner == clnt || clnt->m_poll.m_poll_owner == 
false);
 
 1573     m_poll_owner = clnt;
 
 1574     clnt->m_poll.m_poll_owner = 
true;
 
 1575     external_poll(wait_time);
 
 1582   if (clnt->m_poll.m_waiting)
 
 1584     clnt->m_poll.m_waiting = 
false;
 
 1585     if (m_poll_owner != clnt)
 
 1587       remove_from_poll_queue(clnt);
 
 1588       NdbCondition_Signal(clnt->m_poll.m_condition);
 
 1594 TransporterFacade::complete_poll(
trp_client* clnt)
 
 1596   clnt->m_poll.m_waiting = 
false;
 
 1597   if (!clnt->m_poll.m_locked)
 
 1599     assert(clnt->m_poll.m_poll_owner == 
false);
 
 1616   if (m_poll_owner == clnt)
 
 1618     assert(clnt->m_poll.m_poll_owner == 
true);
 
 1619     m_poll_owner = new_owner = remove_last_from_poll_queue();
 
 1623     assert(new_owner->m_poll.m_poll_owner == 
false);
 
 1624     assert(new_owner->m_poll.m_locked == 
true);
 
 1625     assert(new_owner->m_poll.m_waiting == 
true);
 
 1626     NdbCondition_Signal(new_owner->m_poll.m_condition);
 
 1627     new_owner->m_poll.m_poll_owner = 
true;
 
 1629   clnt->m_poll.m_locked = 
false;
 
 1630   clnt->m_poll.m_poll_owner = 
false;
 
 // Append clnt to the tail of the doubly linked poll queue
 // (m_poll_queue_head / m_poll_queue_tail, linked through
 // m_poll.m_prev / m_poll.m_next).
 // Preconditions (asserted): clnt is not linked into a queue, is marked
 // locked, and is not the poll owner.
 // NOTE(review): the braces and the `else` separating the empty-queue case
 // from the append case are missing from this excerpt.
 1635 TransporterFacade::add_to_poll_queue(
trp_client* clnt)
 
 1638   assert(clnt->m_poll.m_prev == 0);
 
 1639   assert(clnt->m_poll.m_next == 0);
 
 1640   assert(clnt->m_poll.m_locked == 
true);
 
 1641   assert(clnt->m_poll.m_poll_owner == 
false);
 
 // Empty queue: clnt becomes both head and tail.
 1643   if (m_poll_queue_head == 0)
 
 1645     assert(m_poll_queue_tail == 0);
 
 1646     m_poll_queue_head = clnt;
 
 1647     m_poll_queue_tail = clnt;
 
 // Non-empty queue: link clnt after the current tail.
 1651     assert(m_poll_queue_tail->m_poll.m_next == 0);
 
 1652     m_poll_queue_tail->m_poll.m_next = clnt;
 
 1653     clnt->m_poll.m_prev = m_poll_queue_tail;
 
 1654     m_poll_queue_tail = clnt;
 
 // Unlink clnt from the doubly linked poll queue and clear its links.
 // Preconditions (asserted): clnt is marked locked and is not the poll
 // owner.  If clnt has no predecessor it must be the head; if it has no
 // successor it must be the tail.
 // NOTE(review): the braces and `else` keywords between the linked and
 // unlinked cases are missing from this excerpt.
 1659 TransporterFacade::remove_from_poll_queue(
trp_client* clnt)
 
 1662   assert(clnt->m_poll.m_locked == 
true);
 
 1663   assert(clnt->m_poll.m_poll_owner == 
false);
 
 // Fix up the forward link that pointed at clnt.
 1665   if (clnt->m_poll.m_prev != 0)
 
 1667     clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
 
 1671     assert(m_poll_queue_head == clnt);
 
 1672     m_poll_queue_head = clnt->m_poll.m_next;
 
 // Fix up the backward link that pointed at clnt.
 1675   if (clnt->m_poll.m_next != 0)
 
 1677     clnt->m_poll.m_next->m_poll.m_prev = clnt->m_poll.m_prev;
 
 1681     assert(m_poll_queue_tail == clnt);
 
 1682     m_poll_queue_tail = clnt->m_poll.m_prev;
 
 // Consistency check: head and tail are either both set or both null.
 1685   if (m_poll_queue_head == 0)
 
 1686     assert(m_poll_queue_tail == 0);
 
 1687   else if (m_poll_queue_tail == 0)
 
 1688     assert(m_poll_queue_head == 0);
 
 // Leave clnt fully detached.
 1690   clnt->m_poll.m_prev = 0;
 
 1691   clnt->m_poll.m_next = 0;
 
 1695 TransporterFacade::remove_last_from_poll_queue()
 
 1701   remove_from_poll_queue(clnt);
 
 1707 #include "SignalSender.hpp" 
 // Return the data words of one signal in the chain and step to the next.
 // While currentSignal is non-NULL, the iterator advances currentSignal to
 // its next() and reports the length (via sz) and data pointer of `signal`.
 // NOTE(review): the declaration of `signal` and the branch taken when
 // currentSignal is NULL are not visible in this excerpt; presumably
 // `signal` is the signal that was current on entry -- confirm.
 1710 SignalSectionIterator::getNextWords(Uint32& sz)
 
 1712   if (likely(currentSignal != NULL))
 
 1715     currentSignal= currentSignal->next();
 
 1716     sz= signal->getLength();
 
 1717     return signal->getDataPtrSend();
 
 1728 #define VERIFY(x) if ((x) == 0) { printf("VERIFY failed at Line %u : %s\n",__LINE__, #x);  return -1; } 
 1736   while (pos < dataWords)
 
 1738     const Uint32* readPtr=NULL;
 
 1741     readPtr= gsi.getNextWords(len);
 
 1743     VERIFY(readPtr != NULL);
 
 1745     VERIFY(len <= (Uint32) (dataWords - pos));
 
 1747     for (
int j=0; j < (int) len; j++)
 
 1748       VERIFY(readPtr[j] == (Uint32) (bias ++));
 
 1760   VERIFY(verifyIteratorContents(iter, size, bias) == 0);
 
 1765   VERIFY(iter.getNextWords(sz) == NULL);
 
 1768   VERIFY(iter.getNextWords(sz) == NULL);
 
 1774   VERIFY(verifyIteratorContents(iter, size, bias) == 0);
 
 1777   VERIFY(iter.getNextWords(sz) == NULL);
 
 1789   VERIFY(checkGenericSectionIterator(iter, size, bias) == 0);
 
 1794   const int subranges= 20;
 
 1799   ptr.sectionIter= &iter;
 
 1802   for (
int s=0; s< subranges; s++)
 
 1808       start= (Uint32) myRandom48(size);
 
 1809       if (0 != (size-start)) 
 
 1810         len= (Uint32) myRandom48(size-start);
 
 1817     fsi.setRange(start, len);
 
 1818     VERIFY(checkGenericSectionIterator(fsi, len, bias + start) == 0);
 
 1827 testLinearSectionIterator()
 
 1832   const int totalSize= 200000;
 
 1835   Uint32 data[totalSize];
 
 1836   for (
int i=0; i<totalSize; i++)
 
 1839   for (
int len= 0; len < 50000; len++)
 
 1843     VERIFY(checkIterator(something, len, bias) == 0);
 
 1850 createSignalChain(
NdbApiSignal*& poolHead, 
int length, 
int bias)
 
 1858   while (pos < length)
 
 1860     int offset= pos % NdbApiSignal::MaxSignalWords;
 
 1864       if (poolHead == NULL)
 
 1868       poolHead= poolHead->next();
 
 1873       if (chainHead == NULL)
 
 1875         chainHead= chainTail= newSig;
 
 1879         chainTail->next(newSig);
 
 1884     chainTail->getDataPtrSend()[
offset]= (bias + pos);
 
 1885     chainTail->setLength(offset + 1);
 
 1893 testSignalSectionIterator()
 
 1899   const int totalNumSignals= 1000;
 
 1903   for (
int i=0; i < totalNumSignals; i++)
 
 1907     if (poolHead == NULL)
 
 1914       sig->next(poolHead);
 
 1920   for (
int dataWords= 1; 
 
 1921        dataWords <= (int)(totalNumSignals * 
 
 1922                           NdbApiSignal::MaxSignalWords); 
 
 1927     VERIFY((signalChain= createSignalChain(poolHead, dataWords, bias)) != NULL );
 
 1931     VERIFY(checkIterator(ssi, dataWords, bias) == 0);
 
 1934     while (signalChain != NULL)
 
 1937       signalChain= signalChain->next();
 
 1939       sig->next(poolHead);
 
 1945   while (poolHead != NULL)
 
 1948     poolHead= sig->next();
 
 1955 int main(
int arg, 
char** argv)
 
 1968   VERIFY(testLinearSectionIterator() == 0);
 
 1969   VERIFY(testSignalSectionIterator() == 0);
 
 1978 TransporterFacade::set_auto_reconnect(
int val)
 
 1980   theClusterMgr->m_auto_reconnect = val;
 
 1984 TransporterFacade::get_auto_reconnect()
 const 
 1986   return theClusterMgr->m_auto_reconnect;
 
 1992   theClusterMgr->set_max_api_reg_req_interval(interval);
 
 1996 TransporterFacade::ext_update_connections()
 
 1998   theClusterMgr->lock();
 
 2000   theClusterMgr->unlock();
 
 2004 TransporterFacade::ext_get_connect_address(Uint32 nodeId)
 
 2006   return theTransporterRegistry->get_connect_address(nodeId);
 
 2010 TransporterFacade::ext_forceHB()
 
 2012   theClusterMgr->forceHB();
 
 2016 TransporterFacade::ext_isConnected(NodeId aNodeId)
 
 2019   theClusterMgr->lock();
 
 2020   val = theClusterMgr->theNodes[aNodeId].is_connected();
 
 2021   theClusterMgr->unlock();
 
 2026 TransporterFacade::ext_doConnect(
int aNodeId)
 
 2028   theClusterMgr->lock();
 
 2029   assert(theClusterMgr->theNodes[aNodeId].is_connected() == 
false);
 
 2031   theClusterMgr->unlock();