MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TransporterRegistry.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include <TransporterRegistry.hpp>
21 #include "TransporterInternalDefinitions.hpp"
22 
23 #include "Transporter.hpp"
24 #include <SocketAuthenticator.hpp>
25 
26 #ifdef NDB_TCP_TRANSPORTER
27 #include "TCP_Transporter.hpp"
28 #include "Loopback_Transporter.hpp"
29 #endif
30 
31 #ifdef NDB_SCI_TRANSPORTER
32 #include "SCI_Transporter.hpp"
33 #endif
34 
35 #ifdef NDB_SHM_TRANSPORTER
36 #include "SHM_Transporter.hpp"
37 extern int g_ndb_shm_signum;
38 #endif
39 
40 #include "NdbOut.hpp"
41 #include <NdbSleep.h>
42 #include <NdbTick.h>
43 #include <InputStream.hpp>
44 #include <OutputStream.hpp>
45 
46 #include <mgmapi/mgmapi.h>
47 #include <mgmapi_internal.h>
48 #include <mgmapi/mgmapi_debug.h>
49 
50 #include <EventLogger.hpp>
51 extern EventLogger * g_eventLogger;
52 
53 struct in_addr
54 TransporterRegistry::get_connect_address(NodeId node_id) const
55 {
56  return theTransporters[node_id]->m_connect_address;
57 }
58 
60 {
61  DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
62  if (m_auth && !m_auth->server_authenticate(sockfd)){
63  NDB_CLOSE_SOCKET(sockfd);
64  DBUG_RETURN(0);
65  }
66 
68  if (!m_transporter_registry->connect_server(sockfd, msg))
69  {
70  NDB_CLOSE_SOCKET(sockfd);
71  DBUG_RETURN(0);
72  }
73 
74  DBUG_RETURN(0);
75 }
76 
// NOTE(review): the line that opens this constructor definition (the
// qualified name and the `callback` parameter stored below) is missing
// from this listing; only the tail of the parameter list is visible.
//
// Constructor: stores the callback, allocates one slot per possible
// transporter in every per-node array, optionally prepares an epoll set,
// and resets all counters and per-node state.
//
// use_default_send_buffer  copied to m_use_default_send_buffer
// _maxTransporters         size of every per-node array allocated here
// sizeOfLongSignalMemory   not referenced anywhere in the visible body
78  bool use_default_send_buffer,
79  unsigned _maxTransporters,
80  unsigned sizeOfLongSignalMemory) :
81  m_mgm_handle(0),
82  localNodeId(0),
83  m_transp_count(0),
84  m_use_default_send_buffer(use_default_send_buffer),
85  m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
86  m_total_max_send_buffer(0)
87 {
88  DBUG_ENTER("TransporterRegistry::TransporterRegistry");
89 
90  maxTransporters = _maxTransporters;
91  sendCounter = 1;
92 
93  callbackObj=callback;
94 
// All arrays below are sized by maxTransporters; the generic arrays are
// indexed by node id elsewhere in this file.
95  theTCPTransporters = new TCP_Transporter * [maxTransporters];
96  theSCITransporters = new SCI_Transporter * [maxTransporters];
97  theSHMTransporters = new SHM_Transporter * [maxTransporters];
98  theTransporterTypes = new TransporterType [maxTransporters];
99  theTransporters = new Transporter * [maxTransporters];
100  performStates = new PerformState [maxTransporters];
101  ioStates = new IOState [maxTransporters];
102  m_disconnect_errnum = new int [maxTransporters];
103  m_error_states = new ErrorState [maxTransporters];
104 
105  m_has_extra_wakeup_socket = false;
106 #if defined(HAVE_EPOLL_CREATE)
// Try to set up epoll. On any failure both m_epoll_fd and m_epoll_events
// are reset (-1 / 0), which the polling code uses to pick the non-epoll path.
107  m_epoll_fd = -1;
108  m_epoll_events = new struct epoll_event[maxTransporters];
109  m_epoll_fd = epoll_create(maxTransporters);
110  if (m_epoll_fd == -1 || !m_epoll_events)
111  {
112  /* Failure to allocate data or get epoll socket, abort */
113  perror("Failed to alloc epoll-array or calling epoll_create... falling back to select!");
114  if (m_epoll_fd != -1)
115  {
116  close(m_epoll_fd);
117  m_epoll_fd = -1;
118  }
119  if (m_epoll_events)
120  {
121  delete [] m_epoll_events;
122  m_epoll_events = 0;
123  }
124  }
125  else
126  {
127  memset((char*)m_epoll_events, 0,
128  maxTransporters * sizeof(struct epoll_event));
129  }
130 
131 #endif
132 #ifdef ERROR_INSERT
133  m_blocked.clear();
134  m_blocked_with_data.clear();
135  m_blocked_disconnected.clear();
136 #endif
137  // Initialize member variables
138  nTransporters = 0;
139  nTCPTransporters = 0;
140  nSCITransporters = 0;
141  nSHMTransporters = 0;
142 
143  // Initialize the transporter arrays
// Every m_error_states slot starts as TE_NO_ERROR with an all-ones pointer.
144  ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
145  for (unsigned i=0; i<maxTransporters; i++) {
146  theTCPTransporters[i] = NULL;
147  theSCITransporters[i] = NULL;
148  theSHMTransporters[i] = NULL;
149  theTransporters[i] = NULL;
150  performStates[i] = DISCONNECTED;
151  ioStates[i] = NoHalt;
152  m_disconnect_errnum[i]= 0;
153  m_error_states[i] = default_error_state;
154  }
155 
156  DBUG_VOID_RETURN;
157 }
158 
159 void
161 {
162  if (!m_use_default_send_buffer)
163  return;
164 
165  if (total_send_buffer == 0)
166  total_send_buffer = get_total_max_send_buffer();
167 
168  if (m_send_buffers)
169  {
170  /* Send buffers already allocated -> resize the buffer pages */
171  assert(m_send_buffer_memory);
172 
173  // TODO resize send buffer pages
174 
175  return;
176  }
177 
178  /* Initialize transporter send buffers (initially empty). */
179  m_send_buffers = new SendBuffer[maxTransporters];
180  for (unsigned i = 0; i < maxTransporters; i++)
181  {
182  SendBuffer &b = m_send_buffers[i];
183  b.m_first_page = NULL;
184  b.m_last_page = NULL;
185  b.m_used_bytes = 0;
186  }
187 
188  /* Initialize the page freelist. */
189  Uint64 send_buffer_pages =
190  (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
191  /* Add one extra page of internal fragmentation overhead per transporter. */
192  send_buffer_pages += nTransporters;
193 
194  m_send_buffer_memory =
195  new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
196  if (m_send_buffer_memory == NULL)
197  {
198  ndbout << "Unable to allocate "
199  << send_buffer_pages * SendBufferPage::PGSIZE
200  << " bytes of memory for send buffers, aborting." << endl;
201  abort();
202  }
203 
204  m_page_freelist = NULL;
205  for (unsigned i = 0; i < send_buffer_pages; i++)
206  {
207  SendBufferPage *page =
208  (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
209  page->m_bytes = 0;
210  page->m_next = m_page_freelist;
211  m_page_freelist = page;
212  }
213 }
214 
216 {
217  DBUG_ENTER("TransporterRegistry::set_mgm_handle");
218  if (m_mgm_handle)
219  ndb_mgm_destroy_handle(&m_mgm_handle);
220  m_mgm_handle= h;
221  ndb_mgm_set_timeout(m_mgm_handle, 5000);
222 #ifndef DBUG_OFF
223  if (h)
224  {
225  char buf[256];
226  DBUG_PRINT("info",("handle set with connectstring: %s",
227  ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
228  }
229  else
230  {
231  DBUG_PRINT("info",("handle set to NULL"));
232  }
233 #endif
234  DBUG_VOID_RETURN;
235 }
236 
238 {
239  DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
240 
241  removeAll();
242 
243  delete[] theTCPTransporters;
244  delete[] theSCITransporters;
245  delete[] theSHMTransporters;
246  delete[] theTransporterTypes;
247  delete[] theTransporters;
248  delete[] performStates;
249  delete[] ioStates;
250  delete[] m_disconnect_errnum;
251  delete[] m_error_states;
252 
253  if (m_send_buffers)
254  delete[] m_send_buffers;
255  m_page_freelist = NULL;
256  if (m_send_buffer_memory)
257  delete[] m_send_buffer_memory;
258 
259 #if defined(HAVE_EPOLL_CREATE)
260  if (m_epoll_events) delete [] m_epoll_events;
261  if (m_epoll_fd != -1) close(m_epoll_fd);
262 #endif
263  if (m_mgm_handle)
264  ndb_mgm_destroy_handle(&m_mgm_handle);
265 
266  if (m_has_extra_wakeup_socket)
267  {
268  my_socket_close(m_extra_wakeup_sockets[0]);
269  my_socket_close(m_extra_wakeup_sockets[1]);
270  }
271 
272  DBUG_VOID_RETURN;
273 }
274 
275 void
277  for(unsigned i = 0; i<maxTransporters; i++){
278  if(theTransporters[i] != NULL)
279  removeTransporter(theTransporters[i]->getRemoteNodeId());
280  }
281 }
282 
// NOTE(review): the line carrying this function's qualified name and its
// opening brace is missing from this listing (presumably
// TransporterRegistry::disconnectAll() -- confirm against the header).
//
// Calls doDisconnect() on every non-NULL entry of theTransporters; the
// transporters themselves stay registered.
283 void
285  for(unsigned i = 0; i<maxTransporters; i++){
286  if(theTransporters[i] != NULL)
287  theTransporters[i]->doDisconnect();
288  }
289 }
290 
291 bool
292 TransporterRegistry::init(NodeId nodeId) {
293  DBUG_ENTER("TransporterRegistry::init");
294  assert(localNodeId == 0 ||
295  localNodeId == nodeId);
296 
297  localNodeId = nodeId;
298 
299  DEBUG("TransporterRegistry started node: " << localNodeId);
300 
301  if (!m_socket_poller.set_max_count(maxTransporters +
302  1 /* wakeup socket */))
303  DBUG_RETURN(false);
304 
305  DBUG_RETURN(true);
306 }
307 
// Server side of the transporter handshake on an accepted socket.
//
// Reads the client's "hello" line ("<nodeId> [<transporter type>]"),
// validates the node id, the existence and state of the matching
// transporter and (when supplied) the transporter type, replies with
// "<local node id> <type>", and finally hands the socket to the
// transporter's own connect_server().
//
// sockfd  accepted socket
// msg     receives a description of the failure on most error paths
//         (the transporter-type mismatch path only logs via g_eventLogger)
// Returns true only if the transporter accepted the socket and the node
// is still in state CONNECTING afterwards.
308 bool
309 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
310  BaseString & msg) const
311 {
312  DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
313 
314  // Read "hello" that consists of node id and transporter
315  // type from client
316  SocketInputStream s_input(sockfd);
317  char buf[11+1+11+1]; // <int> <int>
318  if (s_input.gets(buf, sizeof(buf)) == 0) {
319  msg.assfmt("line: %u : Failed to get nodeid from client", __LINE__);
320  DBUG_PRINT("error", ("Failed to read 'hello' from client"));
321  DBUG_RETURN(false);
322  }
323 
// remote_transporter_type stays -1 when the client sent only a node id.
324  int nodeId, remote_transporter_type= -1;
325  int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
326  switch (r) {
327  case 2:
328  break;
329  case 1:
330  // we're running version prior to 4.1.9
331  // ok, but with no checks on transporter configuration compatibility
332  break;
333  default:
334  msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
335  DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
336  (int)sizeof(buf), buf));
337  DBUG_RETURN(false);
338  }
339 
340  DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
341  nodeId, remote_transporter_type));
342 
343 
344  // Check that nodeid is in range before accessing the arrays
345  if (nodeId < 0 ||
346  nodeId >= (int)maxTransporters)
347  {
348  msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
349  DBUG_PRINT("error", ("Out of range nodeId: %d from client",
350  nodeId));
351  DBUG_RETURN(false);
352  }
353 
354  // Check that transporter is allocated
355  Transporter *t= theTransporters[nodeId];
356  if (t == 0)
357  {
358  msg.assfmt("line: %u : Incorrect reply from client: >%s<, node: %u",
359  __LINE__, buf, nodeId);
360  DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
361  DBUG_RETURN(false);
362  }
363 
364  // Check that the transporter should be connecting
365  if (performStates[nodeId] != TransporterRegistry::CONNECTING)
366  {
367  msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
368  __LINE__, nodeId,
369  getPerformStateString(performStates[nodeId]),
370  performStates[nodeId]);
371 
372  DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
373  nodeId));
374  DBUG_RETURN(false);
375  }
376 
377  // Check transporter type
// NOTE(review): unlike the other failure paths, this one does not fill msg.
378  if (remote_transporter_type != -1 &&
379  remote_transporter_type != t->m_type)
380  {
381  g_eventLogger->error("Connection from node: %d uses different transporter "
382  "type: %d, expected type: %d",
383  nodeId, remote_transporter_type, t->m_type);
384  DBUG_RETURN(false);
385  }
386 
387  // Send reply to client
388  SocketOutputStream s_output(sockfd);
389  if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
390  {
391  msg.assfmt("line: %u : Failed to reply to connecting socket (node: %u)",
392  __LINE__, nodeId);
393  DBUG_PRINT("error", ("Send of reply failed"));
394  DBUG_RETURN(false);
395  }
396 
397  // Setup transporter (transporter responsible for closing sockfd)
398  bool res = t->connect_server(sockfd, msg);
399 
// Re-check the state: it is read again here after the transporter call.
400  if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
401  {
402  msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
403  __LINE__, nodeId,
404  getPerformStateString(performStates[nodeId]),
405  performStates[nodeId]);
406  // Connection succeeded, but not connecting anymore, return
407  // false to close the connection
408  DBUG_RETURN(false);
409  }
410 
411  DBUG_RETURN(res);
412 }
413 
414 
// NOTE(review): the line with this function's qualified name and its
// `config` parameter is missing from this listing (presumably
// TransporterRegistry::configure_transporter(TransporterConfiguration*)
// -- confirm against the header).
//
// Creates or reconfigures the transporter towards config->remoteNodeId.
// Returns false when the remote node id is out of range, otherwise the
// result of reconfiguring the existing transporter or of the
// type-specific create function. An unknown config->type aborts.
415 bool
417 {
418  NodeId remoteNodeId = config->remoteNodeId;
419 
// init() must have run, and the config must describe this node.
420  assert(localNodeId);
421  assert(config->localNodeId == localNodeId);
422 
423  if (remoteNodeId >= maxTransporters)
424  return false;
425 
426  Transporter* t = theTransporters[remoteNodeId];
427  if(t != NULL)
428  {
429  // Transporter already exist, try to reconfigure it
430  return t->configure(config);
431  }
432 
433  DEBUG("Configuring transporter from " << localNodeId
434  << " to " << remoteNodeId);
435 
436  switch (config->type){
437  case tt_TCP_TRANSPORTER:
438  return createTCPTransporter(config);
439  case tt_SHM_TRANSPORTER:
440  return createSHMTransporter(config);
441  case tt_SCI_TRANSPORTER:
442  return createSCITransporter(config);
443  default:
444  abort();
445  break;
446  }
447  return false;
448 }
449 
450 
451 bool
452 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
453 #ifdef NDB_TCP_TRANSPORTER
454 
455  TCP_Transporter * t = 0;
456  if (config->remoteNodeId == config->localNodeId)
457  {
458  t = new Loopback_Transporter(* this, config);
459  }
460  else
461  {
462  t = new TCP_Transporter(*this, config);
463  }
464 
465  if (t == NULL)
466  return false;
467  else if (!t->initTransporter()) {
468  delete t;
469  return false;
470  }
471 
472  // Put the transporter in the transporter arrays
473  theTCPTransporters[nTCPTransporters] = t;
474  theTransporters[t->getRemoteNodeId()] = t;
475  theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
476  performStates[t->getRemoteNodeId()] = DISCONNECTED;
477  nTransporters++;
478  nTCPTransporters++;
479  m_total_max_send_buffer += t->get_max_send_buffer();
480 
481  return true;
482 #else
483  return false;
484 #endif
485 }
486 
487 bool
488 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
489 #ifdef NDB_SCI_TRANSPORTER
490 
491  if(!SCI_Transporter::initSCI())
492  abort();
493 
494  SCI_Transporter * t = new SCI_Transporter(*this,
495  config->localHostName,
496  config->remoteHostName,
497  config->s_port,
498  config->isMgmConnection,
499  config->sci.sendLimit,
500  config->sci.bufferSize,
501  config->sci.nLocalAdapters,
502  config->sci.remoteSciNodeId0,
503  config->sci.remoteSciNodeId1,
504  localNodeId,
505  config->remoteNodeId,
506  config->serverNodeId,
507  config->checksum,
508  config->signalId);
509 
510  if (t == NULL)
511  return false;
512  else if (!t->initTransporter()) {
513  delete t;
514  return false;
515  }
516  // Put the transporter in the transporter arrays
517  theSCITransporters[nSCITransporters] = t;
518  theTransporters[t->getRemoteNodeId()] = t;
519  theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
520  performStates[t->getRemoteNodeId()] = DISCONNECTED;
521  nTransporters++;
522  nSCITransporters++;
523  m_total_max_send_buffer += t->get_max_send_buffer();
524 
525  return true;
526 #else
527  return false;
528 #endif
529 }
530 
531 bool
532 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
533  DBUG_ENTER("TransporterRegistry::createTransporter SHM");
534 #ifdef NDB_SHM_TRANSPORTER
535 
536  if (!g_ndb_shm_signum) {
537  g_ndb_shm_signum= config->shm.signum;
538  DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
543  NdbThread_set_shm_sigmask(TRUE);
544  }
545 
546  if(config->shm.signum != g_ndb_shm_signum)
547  return false;
548 
549  SHM_Transporter * t = new SHM_Transporter(*this,
550  config->localHostName,
551  config->remoteHostName,
552  config->s_port,
553  config->isMgmConnection,
554  localNodeId,
555  config->remoteNodeId,
556  config->serverNodeId,
557  config->checksum,
558  config->signalId,
559  config->shm.shmKey,
560  config->shm.shmSize
561  );
562  if (t == NULL)
563  return false;
564  else if (!t->initTransporter()) {
565  delete t;
566  return false;
567  }
568  // Put the transporter in the transporter arrays
569  theSHMTransporters[nSHMTransporters] = t;
570  theTransporters[t->getRemoteNodeId()] = t;
571  theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
572  performStates[t->getRemoteNodeId()] = DISCONNECTED;
573 
574  nTransporters++;
575  nSHMTransporters++;
576  m_total_max_send_buffer += t->get_max_send_buffer();
577 
578  DBUG_RETURN(true);
579 #else
580  DBUG_RETURN(false);
581 #endif
582 }
583 
584 
585 void
586 TransporterRegistry::removeTransporter(NodeId nodeId) {
587 
588  DEBUG("Removing transporter from " << localNodeId
589  << " to " << nodeId);
590 
591  if(theTransporters[nodeId] == NULL)
592  return;
593 
594  theTransporters[nodeId]->doDisconnect();
595 
596  const TransporterType type = theTransporterTypes[nodeId];
597 
598  int ind = 0;
599  switch(type){
600  case tt_TCP_TRANSPORTER:
601 #ifdef NDB_TCP_TRANSPORTER
602  for(; ind < nTCPTransporters; ind++)
603  if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
604  break;
605  ind++;
606  for(; ind<nTCPTransporters; ind++)
607  theTCPTransporters[ind-1] = theTCPTransporters[ind];
608  nTCPTransporters --;
609 #endif
610  break;
611  case tt_SCI_TRANSPORTER:
612 #ifdef NDB_SCI_TRANSPORTER
613  for(; ind < nSCITransporters; ind++)
614  if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
615  break;
616  ind++;
617  for(; ind<nSCITransporters; ind++)
618  theSCITransporters[ind-1] = theSCITransporters[ind];
619  nSCITransporters --;
620 #endif
621  break;
622  case tt_SHM_TRANSPORTER:
623 #ifdef NDB_SHM_TRANSPORTER
624  for(; ind < nSHMTransporters; ind++)
625  if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
626  break;
627  ind++;
628  for(; ind<nSHMTransporters; ind++)
629  theSHMTransporters[ind-1] = theSHMTransporters[ind];
630  nSHMTransporters --;
631 #endif
632  break;
633  }
634 
635  nTransporters--;
636 
637  // Delete the transporter and remove it from theTransporters array
638  delete theTransporters[nodeId];
639  theTransporters[nodeId] = NULL;
640 }
641 
// NOTE(review): the line with the function's qualified name and its first
// parameter (`sendHandle`, used below) is missing from this listing;
// presumably TransporterRegistry::prepareSend -- confirm against the header.
//
// Packs one signal with linear sections into the send buffer of nodeId.
//
// Returns SEND_OK when the signal was packed (also after a successful
// retry, and under ERROR_INSERT for a blocked+disconnected node),
// SEND_BUFFER_FULL when no buffer space appeared during the retries,
// SEND_MESSAGE_TOO_BIG, SEND_DISCONNECTED, SEND_UNKNOWN_NODE (no
// transporter) or SEND_BLOCKED (output halted for this node).
642 SendStatus
644  const SignalHeader * const signalHeader,
645  Uint8 prio,
646  const Uint32 * const signalData,
647  NodeId nodeId,
648  const LinearSectionPtr ptr[3]){
649 
650 
651  Transporter *t = theTransporters[nodeId];
// Output is allowed unless halted; receiver blocks 252 and 4002 bypass the
// halt (presumably cluster-management blocks -- TODO confirm).
652  if(t != NULL &&
653  (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
654  ((signalHeader->theReceiversBlockNumber == 252) ||
655  (signalHeader->theReceiversBlockNumber == 4002)))) {
656 
657  if(t->isConnected()){
658  Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
659  if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
660  Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
661  if(insertPtr != 0){
662  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
663  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
664  return SEND_OK;
665  }
666 
667  int sleepTime = 2;
668 
// No room: retry up to 50 times, sleeping sleepTime ms between attempts
// only when no SHM/SCI transporters exist.
673  for(int i = 0; i<50; i++){
674  if((nSHMTransporters+nSCITransporters) == 0)
675  NdbSleep_MilliSleep(sleepTime);
676  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
677  if(insertPtr != 0){
678  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
679  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
680  break;
681  }
682  }
683 
// A retry succeeded: the signal was sent, but the full buffer is reported.
684  if(insertPtr != 0){
688  report_error(nodeId, TE_SEND_BUFFER_FULL);
689  return SEND_OK;
690  }
691 
692  WARNING("Signal to " << nodeId << " lost(buffer)");
693  report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
694  return SEND_BUFFER_FULL;
695  } else {
696  return SEND_MESSAGE_TOO_BIG;
697  }
698  } else {
699 #ifdef ERROR_INSERT
700  if (m_blocked.get(nodeId))
701  {
702  /* Looks like it disconnected while blocked. We'll pretend
703  * not to notice for now
704  */
705  WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
706  return SEND_OK;
707  }
708 #endif
709  DEBUG("Signal to " << nodeId << " lost(disconnect) ");
710  return SEND_DISCONNECTED;
711  }
712  } else {
713  DEBUG("Discarding message to block: "
714  << signalHeader->theReceiversBlockNumber
715  << " node: " << nodeId);
716 
717  if(t == NULL)
718  return SEND_UNKNOWN_NODE;
719 
720  return SEND_BLOCKED;
721  }
722 }
723 
// NOTE(review): the line with the function's qualified name and its first
// parameter (`sendHandle`, used below) is missing from this listing;
// presumably TransporterRegistry::prepareSend -- confirm against the header.
//
// Same flow as the linear-section variant, but the sections are
// SegmentedSectionPtr values that are packed with the help of thePool.
//
// Returns SEND_OK, SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED, SEND_UNKNOWN_NODE or SEND_BLOCKED.
724 SendStatus
726  const SignalHeader * const signalHeader,
727  Uint8 prio,
728  const Uint32 * const signalData,
729  NodeId nodeId,
730  class SectionSegmentPool & thePool,
731  const SegmentedSectionPtr ptr[3]){
732 
733 
734  Transporter *t = theTransporters[nodeId];
// Output is allowed unless halted; receiver blocks 252 and 4002 bypass the
// halt (presumably cluster-management blocks -- TODO confirm).
735  if(t != NULL &&
736  (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
737  ((signalHeader->theReceiversBlockNumber == 252)||
738  (signalHeader->theReceiversBlockNumber == 4002)))) {
739 
740  if(t->isConnected()){
741  Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
742  if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
743  Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
744  if(insertPtr != 0){
745  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
746  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
747  return SEND_OK;
748  }
749 
750 
// No room: retry up to 50 times, sleeping sleepTime ms between attempts
// only when no SHM/SCI transporters exist.
755  int sleepTime = 2;
756  for(int i = 0; i<50; i++){
757  if((nSHMTransporters+nSCITransporters) == 0)
758  NdbSleep_MilliSleep(sleepTime);
759  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
760  if(insertPtr != 0){
761  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
762  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
763  break;
764  }
765  }
766 
// A retry succeeded: the signal was sent, but the full buffer is reported.
767  if(insertPtr != 0){
771  report_error(nodeId, TE_SEND_BUFFER_FULL);
772  return SEND_OK;
773  }
774 
775  WARNING("Signal to " << nodeId << " lost(buffer)");
776  report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
777  return SEND_BUFFER_FULL;
778  } else {
779  return SEND_MESSAGE_TOO_BIG;
780  }
781  } else {
782 #ifdef ERROR_INSERT
783  if (m_blocked.get(nodeId))
784  {
785  /* Looks like it disconnected while blocked. We'll pretend
786  * not to notice for now
787  */
788  WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
789  return SEND_OK;
790  }
791 #endif
792  DEBUG("Signal to " << nodeId << " lost(disconnect) ");
793  return SEND_DISCONNECTED;
794  }
795  } else {
796  DEBUG("Discarding message to block: "
797  << signalHeader->theReceiversBlockNumber
798  << " node: " << nodeId);
799 
800  if(t == NULL)
801  return SEND_UNKNOWN_NODE;
802 
803  return SEND_BLOCKED;
804  }
805 }
806 
807 
// NOTE(review): the line with the function's qualified name and its first
// parameter (`sendHandle`, used below) is missing from this listing;
// presumably TransporterRegistry::prepareSend -- confirm against the header.
//
// Same flow as the linear-section variant, for GenericSectionPtr sections.
// NOTE(review): unlike the two sibling overloads, the disconnected branch
// here has no ERROR_INSERT "blocked node" special case -- confirm intended.
//
// Returns SEND_OK, SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED, SEND_UNKNOWN_NODE or SEND_BLOCKED.
808 SendStatus
810  const SignalHeader * const signalHeader,
811  Uint8 prio,
812  const Uint32 * const signalData,
813  NodeId nodeId,
814  const GenericSectionPtr ptr[3]){
815 
816 
817  Transporter *t = theTransporters[nodeId];
// Output is allowed unless halted; receiver blocks 252 and 4002 bypass the
// halt (presumably cluster-management blocks -- TODO confirm).
818  if(t != NULL &&
819  (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
820  ((signalHeader->theReceiversBlockNumber == 252) ||
821  (signalHeader->theReceiversBlockNumber == 4002)))) {
822 
823  if(t->isConnected()){
824  Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
825  if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
826  Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
827  if(insertPtr != 0){
828  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
829  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
830  return SEND_OK;
831  }
832 
833 
// No room: retry up to 50 times, sleeping sleepTime ms between attempts
// only when no SHM/SCI transporters exist.
838  int sleepTime = 2;
839  for(int i = 0; i<50; i++){
840  if((nSHMTransporters+nSCITransporters) == 0)
841  NdbSleep_MilliSleep(sleepTime);
842  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
843  if(insertPtr != 0){
844  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
845  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
846  break;
847  }
848  }
849 
// A retry succeeded: the signal was sent, but the full buffer is reported.
850  if(insertPtr != 0){
854  report_error(nodeId, TE_SEND_BUFFER_FULL);
855  return SEND_OK;
856  }
857 
858  WARNING("Signal to " << nodeId << " lost(buffer)");
859  report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
860  return SEND_BUFFER_FULL;
861  } else {
862  return SEND_MESSAGE_TOO_BIG;
863  }
864  } else {
865  DEBUG("Signal to " << nodeId << " lost(disconnect) ");
866  return SEND_DISCONNECTED;
867  }
868  } else {
869  DEBUG("Discarding message to block: "
870  << signalHeader->theReceiversBlockNumber
871  << " node: " << nodeId);
872 
873  if(t == NULL)
874  return SEND_UNKNOWN_NODE;
875 
876  return SEND_BLOCKED;
877  }
878 }
879 
880 void
881 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
882  //-----------------------------------------------------------
883  // Most of the time we will send the buffers here and then wait
884  // for new signals. Thus we start by sending without timeout
885  // followed by the receive part where we expect to sleep for
886  // a while.
887  //-----------------------------------------------------------
888  if(pollReceive(timeOutMillis)){
889  performReceive();
890  }
891  performSend();
892 }
893 
894 bool
895 TransporterRegistry::setup_wakeup_socket()
896 {
897  if (m_has_extra_wakeup_socket)
898  {
899  return true;
900  }
901 
902  if (my_socketpair(m_extra_wakeup_sockets))
903  {
904  perror("socketpair failed!");
905  return false;
906  }
907 
908  if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
909  !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
910  {
911  goto err;
912  }
913 
914 #if defined(HAVE_EPOLL_CREATE)
915  if (m_epoll_fd != -1)
916  {
917  int sock = m_extra_wakeup_sockets[0].fd;
918  struct epoll_event event_poll;
919  bzero(&event_poll, sizeof(event_poll));
920  event_poll.data.u32 = 0;
921  event_poll.events = EPOLLIN;
922  int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
923  if (ret_val != 0)
924  {
925  int error= errno;
926  fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
927  sock, error);
928  fflush(stderr);
929  goto err;
930  }
931  }
932 #endif
933  m_has_extra_wakeup_socket = true;
934  return true;
935 
936 err:
937  my_socket_close(m_extra_wakeup_sockets[0]);
938  my_socket_close(m_extra_wakeup_sockets[1]);
939  my_socket_invalidate(m_extra_wakeup_sockets+0);
940  my_socket_invalidate(m_extra_wakeup_sockets+1);
941  return false;
942 }
943 
944 void
945 TransporterRegistry::wakeup()
946 {
947  if (m_has_extra_wakeup_socket)
948  {
949  static char c = 37;
950  my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
951  }
952 }
953 
954 Uint32
955 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
956  NodeBitmask& mask)
957 {
958  Uint32 retVal = 0;
959 
964  if (!mask.isclear())
965  {
966  timeOutMillis = 0;
967  retVal = 1;
968  }
969 
970  if (nSCITransporters > 0)
971  {
972  timeOutMillis=0;
973  }
974 
975 #ifdef NDB_SHM_TRANSPORTER
976  if (nSHMTransporters > 0)
977  {
978  Uint32 res = poll_SHM(0, mask);
979  if(res)
980  {
981  retVal |= res;
982  timeOutMillis = 0;
983  }
984  }
985 #endif
986 
987 #ifdef NDB_TCP_TRANSPORTER
988 #if defined(HAVE_EPOLL_CREATE)
989  if (likely(m_epoll_fd != -1))
990  {
991  Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
992 
993  if (num_trps)
994  {
995  tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
996  num_trps, timeOutMillis);
997  retVal |= tcpReadSelectReply;
998  }
999 
1000  int num_socket_events = tcpReadSelectReply;
1001  if (num_socket_events > 0)
1002  {
1003  for (int i = 0; i < num_socket_events; i++)
1004  {
1005  const Uint32 trpid = m_epoll_events[i].data.u32;
1006 #ifdef ERROR_INSERT
1007  if (m_blocked.get(trpid))
1008  {
1009  /* Don't pull from socket now, wait till unblocked */
1010  m_blocked_with_data.set(trpid);
1011  continue;
1012  }
1013 #endif
1014  mask.set(trpid);
1015  }
1016  }
1017  else if (num_socket_events < 0)
1018  {
1019  assert(errno == EINTR);
1020  }
1021  }
1022  else
1023 #endif
1024  {
1025  if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
1026  {
1027  retVal |= poll_TCP(timeOutMillis, mask);
1028  }
1029  else
1030  tcpReadSelectReply = 0;
1031  }
1032 #endif
1033 #ifdef NDB_SCI_TRANSPORTER
1034  if (nSCITransporters > 0)
1035  retVal |= poll_SCI(timeOutMillis, mask);
1036 #endif
1037 #ifdef NDB_SHM_TRANSPORTER
1038  if (nSHMTransporters > 0)
1039  {
1040  int res = poll_SHM(0, mask);
1041  retVal |= res;
1042  }
1043 #endif
1044  return retVal;
1045 }
1046 
1047 
1048 #ifdef NDB_SCI_TRANSPORTER
1049 Uint32
1050 TransporterRegistry::poll_SCI(Uint32 timeOutMillis, NodeBitmask& mask)
1051 {
1052  Uint32 retVal = 0;
1053  for (int i = 0; i < nSCITransporters; i++)
1054  {
1055  SCI_Transporter * t = theSCITransporters[i];
1056  Uint32 node_id = t->getRemoteNodeId();
1057  if (t->isConnected() && is_connected(node_id))
1058  {
1059  if (t->hasDataToRead())
1060  {
1061  mask.set(node_id);
1062  retVal = 1;
1063  }
1064  }
1065  }
1066  return retVal;
1067 }
1068 #endif
1069 
1070 
1071 #ifdef NDB_SHM_TRANSPORTER
1072 static int g_shm_counter = 0;
1073 Uint32
1074 TransporterRegistry::poll_SHM(Uint32 timeOutMillis, NodeBitmask& mask)
1075 {
1076  Uint32 retVal = 0;
1077  for (int j = 0; j < 100; j++)
1078  {
1079  for (int i = 0; i<nSHMTransporters; i++)
1080  {
1081  SHM_Transporter * t = theSHMTransporters[i];
1082  Uint32 node_id = t->getRemoteNodeId();
1083  if (t->isConnected() && is_connected(node_id))
1084  {
1085  if (t->hasDataToRead())
1086  {
1087  j = 100;
1088  mask.set(node_id);
1089  retVal = 1;
1090  }
1091  }
1092  }
1093  }
1094  return retVal;
1095 }
1096 #endif
1097 
1098 #ifdef NDB_TCP_TRANSPORTER
1099 
1107 Uint32
1108 TransporterRegistry::poll_TCP(Uint32 timeOutMillis, NodeBitmask& mask)
1109 {
1110  m_socket_poller.clear();
1111 
1112  if (m_has_extra_wakeup_socket)
1113  {
1114  const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1115 
1116  // Poll the wakup-socket for read
1117  m_socket_poller.add(socket, true, false, false);
1118  }
1119 
1120  Uint16 idx[MAX_NODES];
1121  for (int i = 0; i < nTCPTransporters; i++)
1122  {
1123  TCP_Transporter * t = theTCPTransporters[i];
1124  const NDB_SOCKET_TYPE socket = t->getSocket();
1125  Uint32 node_id = t->getRemoteNodeId();
1126 
1127  if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
1128  {
1129  idx[i] = m_socket_poller.add(socket, true, false, false);
1130  }
1131  else
1132  {
1133  idx[i] = MAX_NODES + 1;
1134  }
1135  }
1136 
1137  tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);
1138 
1139  if (tcpReadSelectReply > 0)
1140  {
1141  if (m_extra_wakeup_sockets)
1142  {
1143  if (m_socket_poller.has_read(0))
1144  mask.set((Uint32)0);
1145  }
1146 
1147  for (int i = 0; i < nTCPTransporters; i++)
1148  {
1149  TCP_Transporter * t = theTCPTransporters[i];
1150  if (idx[i] != MAX_NODES + 1)
1151  {
1152  Uint32 node_id = t->getRemoteNodeId();
1153 #ifdef ERROR_INSERT
1154  if (m_blocked.get(i))
1155  {
1156  /* Don't pull from socket now, wait till unblocked */
1157  m_blocked_with_data.set(i);
1158  continue;
1159  }
1160 #endif
1161  if (m_socket_poller.has_read(idx[i]))
1162  mask.set(node_id);
1163  }
1164  }
1165  }
1166 
1167  return tcpReadSelectReply;
1168 }
1169 #endif
1170 
1171 #if defined(HAVE_EPOLL_CREATE)
1172 bool
1173 TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
1174 {
1175  struct epoll_event event_poll;
1176  bzero(&event_poll, sizeof(event_poll));
1177  NDB_SOCKET_TYPE sock_fd = t->getSocket();
1178  int node_id = t->getRemoteNodeId();
1179  int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
1180  int ret_val, error;
1181 
1182  if (!my_socket_valid(sock_fd))
1183  return FALSE;
1184 
1185  event_poll.data.u32 = t->getRemoteNodeId();
1186  event_poll.events = EPOLLIN;
1187  ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
1188  if (!ret_val)
1189  goto ok;
1190  error= errno;
1191  if (error == ENOENT && !add)
1192  {
1193  /*
1194  * Could be that socket was closed premature to this call.
1195  * Not a problem that this occurs.
1196  */
1197  goto ok;
1198  }
1199  if (!add || (add && (error != ENOMEM)))
1200  {
1201  /*
1202  * Serious problems, we are either using wrong parameters,
1203  * have permission problems or the socket doesn't support
1204  * epoll!!
1205  */
1206  ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
1207  " node %u to epoll-set,"
1208  " errno: %u %s",
1209  add ? "ADD" : "DEL",
1210  m_epoll_fd,
1211  MY_SOCKET_FORMAT_VALUE(sock_fd),
1212  node_id,
1213  error,
1214  strerror(error));
1215  abort();
1216  }
1217  ndbout << "We lacked memory to add the socket for node id ";
1218  ndbout << node_id << endl;
1219  return TRUE;
1220 
1221 ok:
1222  return FALSE;
1223 }
1224 
1225 #endif
1226 
1230 void
1232 {
1233  bool hasReceived = false;
1234 
1235  if (m_has_data_transporters.get(0))
1236  {
1237  m_has_data_transporters.clear(Uint32(0));
1238  consume_extra_sockets();
1239  }
1240 
1241 #ifdef ERROR_INSERT
1242  if (!m_blocked.isclear())
1243  {
1244  if (m_has_data_transporters.isclear())
1245  {
1246  /* poll sees data, but we want to ignore for now
1247  * sleep a little to avoid busy loop
1248  */
1249  NdbSleep_MilliSleep(1);
1250  }
1251  }
1252 #endif
1253 
1254 #ifdef NDB_TCP_TRANSPORTER
1255  Uint32 id = 0;
1256  while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
1257  {
1258  bool hasdata = false;
1259  TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1260  if (is_connected(id))
1261  {
1262  if (t->isConnected())
1263  {
1264  t->doReceive();
1265  if (hasReceived)
1266  callbackObj->checkJobBuffer();
1267  hasReceived = true;
1268  Uint32 * ptr;
1269  Uint32 sz = t->getReceiveData(&ptr);
1270  callbackObj->transporter_recv_from(id);
1271  Uint32 szUsed = unpack(ptr, sz, id, ioStates[id]);
1272  t->updateReceiveDataPtr(szUsed);
1273  hasdata = t->hasReceiveData();
1274  }
1275  }
1276  // If transporter still have data, make sure that it's remember to next time
1277  m_has_data_transporters.set(id, hasdata);
1278  }
1279 #endif
1280 
1281 #ifdef NDB_SCI_TRANSPORTER
1282  //performReceive
1283  //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
1284  for (int i=0; i<nSCITransporters; i++)
1285  {
1286  SCI_Transporter *t = theSCITransporters[i];
1287  const NodeId nodeId = t->getRemoteNodeId();
1288  if(is_connected(nodeId))
1289  {
1290  if(t->isConnected() && t->checkConnected())
1291  {
1292  if (hasReceived)
1293  callbackObj->checkJobBuffer();
1294  hasReceived = true;
1295  Uint32 * readPtr, * eodPtr;
1296  t->getReceivePtr(&readPtr, &eodPtr);
1297  callbackObj->transporter_recv_from(nodeId);
1298  Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1299  t->updateReceivePtr(newPtr);
1300  }
1301  }
1302  }
1303 #endif
1304 #ifdef NDB_SHM_TRANSPORTER
1305  for (int i=0; i<nSHMTransporters; i++)
1306  {
1307  SHM_Transporter *t = theSHMTransporters[i];
1308  const NodeId nodeId = t->getRemoteNodeId();
1309  if(is_connected(nodeId)){
1310  if(t->isConnected() && t->checkConnected())
1311  {
1312  if (hasReceived)
1313  callbackObj->checkJobBuffer();
1314  hasReceived = true;
1315  Uint32 * readPtr, * eodPtr;
1316  t->getReceivePtr(&readPtr, &eodPtr);
1317  callbackObj->transporter_recv_from(nodeId);
1318  Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1319  t->updateReceivePtr(newPtr);
1320  }
1321  }
1322  }
1323 #endif
1324 }
1325 
1330 int
1332 {
1333  Transporter *t = get_transporter(nodeId);
1334  if (t && t->isConnected() && is_connected(nodeId))
1335  {
1336  return t->doSend();
1337  }
1338 
1339  return 0;
1340 }
1341 
1342 void
1343 TransporterRegistry::consume_extra_sockets()
1344 {
1345  char buf[4096];
1346  ssize_t ret;
1347  int err;
1348  NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1349  do
1350  {
1351  ret = my_recv(sock, buf, sizeof(buf), 0);
1352  err = my_socket_errno();
1353  } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
1354 }
1355 
1356 void
1358 {
1359  int i;
1360  sendCounter = 1;
1361 
1362 #ifdef NDB_TCP_TRANSPORTER
1363  for (i = m_transp_count; i < nTCPTransporters; i++)
1364  {
1365  TCP_Transporter *t = theTCPTransporters[i];
1366  if (t && t->has_data_to_send() &&
1367  t->isConnected() && is_connected(t->getRemoteNodeId()))
1368  {
1369  t->doSend();
1370  }
1371  }
1372  for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1373  {
1374  TCP_Transporter *t = theTCPTransporters[i];
1375  if (t && t->has_data_to_send() &&
1376  t->isConnected() && is_connected(t->getRemoteNodeId()))
1377  {
1378  t->doSend();
1379  }
1380  }
1381  m_transp_count++;
1382  if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1383 #endif
1384 #ifdef NDB_SCI_TRANSPORTER
1385  //scroll through the SCI transporters,
1386  // get each transporter, check if connected, send data
1387  for (i=0; i<nSCITransporters; i++) {
1388  SCI_Transporter *t = theSCITransporters[i];
1389  const NodeId nodeId = t->getRemoteNodeId();
1390 
1391  if(is_connected(nodeId))
1392  {
1393  if(t->isConnected() && t->has_data_to_send())
1394  {
1395  t->doSend();
1396  } //if
1397  } //if
1398  }
1399 #endif
1400 
1401 #ifdef NDB_SHM_TRANSPORTER
1402  for (i=0; i<nSHMTransporters; i++)
1403  {
1404  SHM_Transporter *t = theSHMTransporters[i];
1405  const NodeId nodeId = t->getRemoteNodeId();
1406  if(is_connected(nodeId))
1407  {
1408  if(t->isConnected())
1409  {
1410  t->doSend();
1411  }
1412  }
1413  }
1414 #endif
1415 }
1416 
1417 int
1419  int tSendCounter = sendCounter;
1420  sendCounter = tSendCounter + 1;
1421  if (tSendCounter >= sendLimit) {
1422  performSend();
1423  sendCounter = 1;
1424  return 1;
1425  }//if
1426  return 0;
1427 }//TransporterRegistry::forceSendCheck()
1428 
1429 #ifdef DEBUG_TRANSPORTER
1430 void
1431 TransporterRegistry::printState(){
1432  ndbout << "-- TransporterRegistry -- " << endl << endl
1433  << "Transporters = " << nTransporters << endl;
1434  for(int i = 0; i<maxTransporters; i++)
1435  if(theTransporters[i] != NULL){
1436  const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
1437  ndbout << "Transporter: " << remoteNodeId
1438  << " PerformState: " << performStates[remoteNodeId]
1439  << " IOState: " << ioStates[remoteNodeId] << endl;
1440  }
1441 }
1442 #endif
1443 
1444 #ifdef ERROR_INSERT
1445 bool
1446 TransporterRegistry::isBlocked(NodeId nodeId)
1447 {
1448  return m_blocked.get(nodeId);
1449 }
1450 
1451 void
1452 TransporterRegistry::blockReceive(NodeId nodeId)
1453 {
1454  /* Check that node is not already blocked?
1455  * Stop pulling from its socket (but track received data etc)
1456  */
1457  /* Shouldn't already be blocked with data */
1458  assert(!m_blocked.get(nodeId));
1459 
1460  m_blocked.set(nodeId);
1461 
1462  if (m_has_data_transporters.get(nodeId))
1463  {
1464  assert(!m_blocked_with_data.get(nodeId));
1465  m_blocked_with_data.set(nodeId);
1466  m_has_data_transporters.clear(nodeId);
1467  }
1468 }
1469 
1470 void
1471 TransporterRegistry::unblockReceive(NodeId nodeId)
1472 {
1473  /* Check that node is blocked?
1474  * Resume pulling from its socket
1475  * Ensure in-flight data is processed if there was some
1476  */
1477  assert(m_blocked.get(nodeId));
1478  assert(!m_has_data_transporters.get(nodeId));
1479 
1480  m_blocked.clear(nodeId);
1481 
1482  if (m_blocked_with_data.get(nodeId))
1483  {
1484  m_has_data_transporters.set(nodeId);
1485  }
1486 
1487  if (m_blocked_disconnected.get(nodeId))
1488  {
1489  /* Process disconnect notification/handling now */
1490  m_blocked_disconnected.clear(nodeId);
1491 
1492  report_disconnect(nodeId, m_disconnect_errors[nodeId]);
1493  }
1494 }
1495 #endif
1496 
1497 IOState
1499  return ioStates[nodeId];
1500 }
1501 
1502 void
1503 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
1504  if (ioStates[nodeId] == state)
1505  return;
1506 
1507  DEBUG("TransporterRegistry::setIOState("
1508  << nodeId << ", " << state << ")");
1509 
1510  ioStates[nodeId] = state;
1511 }
1512 
1513 extern "C" void *
1514 run_start_clients_C(void * me)
1515 {
1516  ((TransporterRegistry*) me)->start_clients_thread();
1517  return 0;
1518 }
1519 
1526 void
1528 {
1529  PerformState &curr_state = performStates[node_id];
1530  switch(curr_state){
1531  case DISCONNECTED:
1532  break;
1533  case CONNECTED:
1534  return;
1535  case CONNECTING:
1536  return;
1537  case DISCONNECTING:
1538  break;
1539  }
1540  DBUG_ENTER("TransporterRegistry::do_connect");
1541  DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
1542 
1543  /*
1544  No one else should be using the transporter now, reset
1545  its send buffer
1546  */
1547  callbackObj->reset_send_buffer(node_id);
1548 
1549  curr_state= CONNECTING;
1550  DBUG_VOID_RETURN;
1551 }
1552 
1559 void
1560 TransporterRegistry::do_disconnect(NodeId node_id, int errnum)
1561 {
1562  PerformState &curr_state = performStates[node_id];
1563  switch(curr_state){
1564  case DISCONNECTED:
1565  return;
1566  case CONNECTED:
1567  break;
1568  case CONNECTING:
1569  break;
1570  case DISCONNECTING:
1571  return;
1572  }
1573  DBUG_ENTER("TransporterRegistry::do_disconnect");
1574  DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1575  curr_state= DISCONNECTING;
1576  m_disconnect_errnum[node_id] = errnum;
1577  DBUG_VOID_RETURN;
1578 }
1579 
1580 void
1581 TransporterRegistry::report_connect(NodeId node_id)
1582 {
1583  DBUG_ENTER("TransporterRegistry::report_connect");
1584  DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1585 
1586  /*
1587  The send buffers was reset when this connection
1588  was set to CONNECTING. In order to make sure no stray
1589  signals has been written to the send buffer since then
1590  call 'reset_send_buffer' with the "should_be_empty" flag
1591  set
1592  */
1593  callbackObj->reset_send_buffer(node_id, true);
1594 
1595  performStates[node_id] = CONNECTED;
1596 #if defined(HAVE_EPOLL_CREATE)
1597  if (likely(m_epoll_fd != -1))
1598  {
1599  if (change_epoll((TCP_Transporter*)theTransporters[node_id],
1600  TRUE))
1601  {
1602  performStates[node_id] = DISCONNECTING;
1603  DBUG_VOID_RETURN;
1604  }
1605  }
1606 #endif
1607  callbackObj->reportConnect(node_id);
1608  DBUG_VOID_RETURN;
1609 }
1610 
1611 void
1612 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1613 {
1614  DBUG_ENTER("TransporterRegistry::report_disconnect");
1615  DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1616 
1617 #ifdef ERROR_INSERT
1618  if (m_blocked.get(node_id))
1619  {
1620  /* We are simulating real latency, so control events experience
1621  * it too
1622  */
1623  m_blocked_disconnected.set(node_id);
1624  m_disconnect_errors[node_id] = errnum;
1625  DBUG_VOID_RETURN;
1626  }
1627 #endif
1628 
1629  performStates[node_id] = DISCONNECTED;
1630  m_has_data_transporters.clear(node_id);
1631  callbackObj->reportDisconnect(node_id, errnum);
1632  DBUG_VOID_RETURN;
1633 }
1634 
1642 void
1643 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
1644  const char *errorInfo)
1645 {
1646  if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1647  m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
1648  {
1649  m_error_states[nodeId].m_code = errorCode;
1650  m_error_states[nodeId].m_info = errorInfo;
1651  }
1652 }
1653 
1659 void
1661 {
1662  for (int i= 0, n= 0; n < nTransporters; i++){
1663  Transporter * t = theTransporters[i];
1664  if (!t)
1665  continue;
1666  n++;
1667 
1668  const NodeId nodeId = t->getRemoteNodeId();
1669 
1670  TransporterError code = m_error_states[nodeId].m_code;
1671  const char *info = m_error_states[nodeId].m_info;
1672  if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
1673  {
1674  callbackObj->reportError(nodeId, code, info);
1675  m_error_states[nodeId].m_code = TE_NO_ERROR;
1676  m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
1677  }
1678 
1679  switch(performStates[nodeId]){
1680  case CONNECTED:
1681  case DISCONNECTED:
1682  break;
1683  case CONNECTING:
1684  if(t->isConnected())
1685  report_connect(nodeId);
1686  break;
1687  case DISCONNECTING:
1688  if(!t->isConnected())
1689  report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
1690  break;
1691  }
1692  }
1693 }
1694 
1695 // run as own thread
1696 void
1698 {
1699  int persist_mgm_count= 0;
1700  DBUG_ENTER("TransporterRegistry::start_clients_thread");
1701  while (m_run_start_clients_thread) {
1702  NdbSleep_MilliSleep(100);
1703  persist_mgm_count++;
1704  if(persist_mgm_count==50)
1705  {
1706  ndb_mgm_check_connection(m_mgm_handle);
1707  persist_mgm_count= 0;
1708  }
1709  for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1710  Transporter * t = theTransporters[i];
1711  if (!t)
1712  continue;
1713  n++;
1714 
1715  const NodeId nodeId = t->getRemoteNodeId();
1716  switch(performStates[nodeId]){
1717  case CONNECTING:
1718  if(!t->isConnected() && !t->isServer) {
1719  bool connected= false;
1724  if (t->get_s_port())
1725  {
1726  DBUG_PRINT("info", ("connecting to node %d using port %d",
1727  nodeId, t->get_s_port()));
1728  connected= t->connect_client();
1729  }
1730 
1734  if( !connected && t->get_s_port() <= 0) { // Port is dynamic
1735  int server_port= 0;
1736  struct ndb_mgm_reply mgm_reply;
1737 
1738  DBUG_PRINT("info", ("connection to node %d should use "
1739  "dynamic port",
1740  nodeId));
1741 
1742  if(!ndb_mgm_is_connected(m_mgm_handle))
1743  ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1744 
1745  if(ndb_mgm_is_connected(m_mgm_handle))
1746  {
1747  DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
1748  nodeId));
1749 
1750  int res=
1751  ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1752  t->getRemoteNodeId(),
1753  t->getLocalNodeId(),
1754  CFG_CONNECTION_SERVER_PORT,
1755  &server_port,
1756  &mgm_reply);
1757  DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1758  server_port,t->getRemoteNodeId(),
1759  t->getLocalNodeId(),res));
1760  if( res >= 0 )
1761  {
1762  DBUG_PRINT("info", ("got port %d to use for connection to %d",
1763  server_port, nodeId));
1768  if (server_port)
1769  t->set_s_port(server_port);
1770  }
1771  else if(ndb_mgm_is_connected(m_mgm_handle))
1772  {
1773  DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
1774  res));
1775  g_eventLogger->info("Failed to get dynamic port, res: %d",
1776  res);
1777  ndb_mgm_disconnect(m_mgm_handle);
1778  }
1779  else
1780  {
1781  DBUG_PRINT("info", ("mgmd close connection early"));
1782  g_eventLogger->info
1783  ("Management server closed connection early. "
1784  "It is probably being shut down (or has problems). "
1785  "We will retry the connection. %d %s %s line: %d",
1786  ndb_mgm_get_latest_error(m_mgm_handle),
1787  ndb_mgm_get_latest_error_desc(m_mgm_handle),
1788  ndb_mgm_get_latest_error_msg(m_mgm_handle),
1789  ndb_mgm_get_latest_error_line(m_mgm_handle)
1790  );
1791  }
1792  }
1801  }
1802  }
1803  break;
1804  case DISCONNECTING:
1805  if(t->isConnected())
1806  t->doDisconnect();
1807  break;
1808  case DISCONNECTED:
1809  {
1810  if (t->isConnected())
1811  {
1812  g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
1813  " while being connected, disconnecting!",
1814  t->getRemoteNodeId());
1815  t->doDisconnect();
1816  }
1817  break;
1818  }
1819  default:
1820  break;
1821  }
1822  }
1823  }
1824  DBUG_VOID_RETURN;
1825 }
1826 
1827 struct NdbThread*
1828 TransporterRegistry::start_clients()
1829 {
1830  m_run_start_clients_thread= true;
1831  m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1832  (void**)this,
1833  0, // default stack size
1834  "ndb_start_clients",
1835  NDB_THREAD_PRIO_LOW);
1836  if (m_start_clients_thread == 0)
1837  {
1838  m_run_start_clients_thread= false;
1839  }
1840  return m_start_clients_thread;
1841 }
1842 
1843 bool
1844 TransporterRegistry::stop_clients()
1845 {
1846  if (m_start_clients_thread) {
1847  m_run_start_clients_thread= false;
1848  void* status;
1849  NdbThread_WaitFor(m_start_clients_thread, &status);
1850  NdbThread_Destroy(&m_start_clients_thread);
1851  }
1852  return true;
1853 }
1854 
1855 void
1856 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1857  const char *interf,
1858  int s_port)
1859 {
1860  DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1861  DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1862  if (interf && strlen(interf) == 0)
1863  interf= 0;
1864 
1865  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1866  {
1867  Transporter_interface &tmp= m_transporter_interface[i];
1868  if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1869  continue;
1870  if (interf != 0 && tmp.m_interface != 0 &&
1871  strcmp(interf, tmp.m_interface) == 0)
1872  {
1873  DBUG_VOID_RETURN; // found match, no need to insert
1874  }
1875  if (interf == 0 && tmp.m_interface == 0)
1876  {
1877  DBUG_VOID_RETURN; // found match, no need to insert
1878  }
1879  }
1880  Transporter_interface t;
1881  t.m_remote_nodeId= remoteNodeId;
1882  t.m_s_service_port= s_port;
1883  t.m_interface= interf;
1884  m_transporter_interface.push_back(t);
1885  DBUG_PRINT("exit",("interface and port added"));
1886  DBUG_VOID_RETURN;
1887 }
1888 
1889 bool
1890 TransporterRegistry::start_service(SocketServer& socket_server)
1891 {
1892  DBUG_ENTER("TransporterRegistry::start_service");
1893  if (m_transporter_interface.size() > 0 &&
1894  localNodeId == 0)
1895  {
1896  g_eventLogger->error("INTERNAL ERROR: not initialized");
1897  DBUG_RETURN(false);
1898  }
1899 
1900  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1901  {
1902  Transporter_interface &t= m_transporter_interface[i];
1903 
1904  unsigned short port= (unsigned short)t.m_s_service_port;
1905  if(t.m_s_service_port<0)
1906  port= -t.m_s_service_port; // is a dynamic port
1907  TransporterService *transporter_service =
1908  new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1909  if(!socket_server.setup(transporter_service,
1910  &port, t.m_interface))
1911  {
1912  DBUG_PRINT("info", ("Trying new port"));
1913  port= 0;
1914  if(t.m_s_service_port>0
1915  || !socket_server.setup(transporter_service,
1916  &port, t.m_interface))
1917  {
1918  /*
1919  * If it wasn't a dynamically allocated port, or
1920  * our attempts at getting a new dynamic port failed
1921  */
1922  g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
1923  "Please check if the port is already used,\n"
1924  "(perhaps the node is already running)",
1925  t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1926  delete transporter_service;
1927  DBUG_RETURN(false);
1928  }
1929  }
1930  t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1931  DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1932  transporter_service->setTransporterRegistry(this);
1933  }
1934  DBUG_RETURN(true);
1935 }
1936 
1937 #ifdef NDB_SHM_TRANSPORTER
1938 extern "C"
1939 RETSIGTYPE
1940 shm_sig_handler(int signo)
1941 {
1942  g_shm_counter++;
1943 }
1944 #endif
1945 
1946 void
1948 {
1949  DBUG_ENTER("TransporterRegistry::startReceiving");
1950 
1951 #ifdef NDB_SHM_TRANSPORTER
1952  m_shm_own_pid = getpid();
1953  if (g_ndb_shm_signum)
1954  {
1955  DBUG_PRINT("info",("Install signal handler for signum %d",
1956  g_ndb_shm_signum));
1957  struct sigaction sa;
1958  NdbThread_set_shm_sigmask(FALSE);
1959  sigemptyset(&sa.sa_mask);
1960  sa.sa_handler = shm_sig_handler;
1961  sa.sa_flags = 0;
1962  int ret;
1963  while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
1964  ;
1965  if(ret != 0)
1966  {
1967  DBUG_PRINT("error",("Install failed"));
1968  g_eventLogger->error("Failed to install signal handler for"
1969  " SHM transporter, signum %d, errno: %d (%s)",
1970  g_ndb_shm_signum, errno, strerror(errno));
1971  }
1972  }
1973 #endif // NDB_SHM_TRANSPORTER
1974  DBUG_VOID_RETURN;
1975 }
1976 
1977 void
1984  disconnectAll();
1985 }
1986 
1987 void
1989 }
1990 
1991 void
1992 TransporterRegistry::stopSending(){
1993 }
1994 
1995 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
1996  out << "-- Signal Header --" << endl;
1997  out << "theLength: " << sh.theLength << endl;
1998  out << "gsn: " << sh.theVerId_signalNumber << endl;
1999  out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
2000  out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
2001  out << "sendersSig: " << sh.theSendersSignalId << endl;
2002  out << "theSignalId: " << sh.theSignalId << endl;
2003  out << "trace: " << (int)sh.theTrace << endl;
2004  return out;
2005 }
2006 
2007 Transporter*
2008 TransporterRegistry::get_transporter(NodeId nodeId) {
2009  assert(nodeId < maxTransporters);
2010  return theTransporters[nodeId];
2011 }
2012 
2013 
2014 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
2015 {
2016  DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
2017 
2018  Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
2019 
2020  if(!mgm_nodeid)
2021  {
2022  g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2023  return false;
2024  }
2025  Transporter * t = theTransporters[mgm_nodeid];
2026  if (!t)
2027  {
2028  g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2029  return false;
2030  }
2031 
2032  bool res = t->connect_client(connect_ndb_mgmd(h));
2033  if (res == true)
2034  {
2035  performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2036  }
2037  DBUG_RETURN(res);
2038 }
2039 
2040 
2041 
2047 {
2048  struct ndb_mgm_reply mgm_reply;
2049  NDB_SOCKET_TYPE sockfd;
2050  my_socket_invalidate(&sockfd);
2051 
2052  DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2053 
2054  if ( h==NULL || *h == NULL )
2055  {
2056  g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2057  DBUG_RETURN(sockfd);
2058  }
2059 
2060  for(unsigned int i=0;i < m_transporter_interface.size();i++)
2061  {
2062  if (m_transporter_interface[i].m_s_service_port >= 0)
2063  continue;
2064 
2065  DBUG_PRINT("info", ("Setting dynamic port %d for connection from node %d",
2066  m_transporter_interface[i].m_s_service_port,
2067  m_transporter_interface[i].m_remote_nodeId));
2068 
2069  if (ndb_mgm_set_connection_int_parameter(*h,
2070  localNodeId,
2071  m_transporter_interface[i].m_remote_nodeId,
2072  CFG_CONNECTION_SERVER_PORT,
2073  m_transporter_interface[i].m_s_service_port,
2074  &mgm_reply) < 0)
2075  {
2076  g_eventLogger->error("Could not set dynamic port for %d->%d (%s:%d)",
2077  localNodeId,
2078  m_transporter_interface[i].m_remote_nodeId,
2079  __FILE__, __LINE__);
2081  DBUG_RETURN(sockfd);
2082  }
2083  }
2084 
2089  DBUG_PRINT("info", ("Converting handle to transporter"));
2090  sockfd= ndb_mgm_convert_to_transporter(h);
2091  if (!my_socket_valid(sockfd))
2092  {
2093  g_eventLogger->error("Failed to convert to transporter (%s: %d)",
2094  __FILE__, __LINE__);
2096  }
2097  DBUG_RETURN(sockfd);
2098 }
2099 
2105 {
2107  NDB_SOCKET_TYPE s;
2108  my_socket_invalidate(&s);
2109 
2110  DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2111 
2112  if ( h == NULL )
2113  {
2114  DBUG_RETURN(s);
2115  }
2116 
2120  {
2121  BaseString cs;
2122  cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
2124  }
2125 
2126  if(ndb_mgm_connect(h, 0, 0, 0)<0)
2127  {
2128  DBUG_PRINT("info", ("connection to mgmd failed"));
2130  DBUG_RETURN(s);
2131  }
2132 
2133  DBUG_RETURN(connect_ndb_mgmd(&h));
2134 }
2135 
2140 Uint32 *
2141 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
2142  NodeId node, Uint32 lenBytes, Uint32 prio)
2143 {
2144  Transporter *t = theTransporters[node];
2145  Uint32 *insertPtr = handle->getWritePtr(node, lenBytes, prio,
2146  t->get_max_send_buffer());
2147 
2148  if (insertPtr == 0) {
2149  //-------------------------------------------------
2150  // Buffer was completely full. We have severe problems.
2151  // We will attempt to wait for a small time
2152  //-------------------------------------------------
2153  if(t->send_is_possible(10)) {
2154  //-------------------------------------------------
2155  // Send is possible after the small timeout.
2156  //-------------------------------------------------
2157  if(!handle->forceSend(node)){
2158  return 0;
2159  } else {
2160  //-------------------------------------------------
2161  // Since send was successful we will make a renewed
2162  // attempt at inserting the signal into the buffer.
2163  //-------------------------------------------------
2164  insertPtr = handle->getWritePtr(node, lenBytes, prio,
2165  t->get_max_send_buffer());
2166  }//if
2167  } else {
2168  return 0;
2169  }//if
2170  }
2171  return insertPtr;
2172 }
2173 
2174 void
2175 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
2176  NodeId node, Uint32 lenBytes, Uint32 prio)
2177 {
2178  Transporter *t = theTransporters[node];
2179 
2180  Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
2181  t->update_status_overloaded(used);
2182 
2183  if(t->send_limit_reached(used)) {
2184  //-------------------------------------------------
2185  // Buffer is full and we are ready to send. We will
2186  // not wait since the signal is already in the buffer.
2187  // Force flag set has the same indication that we
2188  // should always send. If it is not possible to send
2189  // we will not worry since we will soon be back for
2190  // a renewed trial.
2191  //-------------------------------------------------
2192  if(t->send_is_possible(0)) {
2193  //-------------------------------------------------
2194  // Send was possible, attempt at a send.
2195  //-------------------------------------------------
2196  handle->forceSend(node);
2197  }//if
2198  }
2199 }
2200 
2201 Uint32
2202 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
2203  Uint32 max)
2204 {
2205  assert(m_use_default_send_buffer);
2206 
2207  if (max == 0)
2208  return 0;
2209 
2210  Uint32 count = 0;
2211  SendBuffer *b = m_send_buffers + node;
2212  SendBufferPage *page = b->m_first_page;
2213  while (page != NULL && count < max)
2214  {
2215  dst[count].iov_base = page->m_data+page->m_start;
2216  dst[count].iov_len = page->m_bytes;
2217  assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2218  page = page->m_next;
2219  count++;
2220  }
2221 
2222  return count;
2223 }
2224 
2225 Uint32
2226 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2227 {
2228  assert(m_use_default_send_buffer);
2229 
2230  SendBuffer *b = m_send_buffers + node;
2231  Uint32 used_bytes = b->m_used_bytes;
2232 
2233  if (bytes == 0)
2234  return used_bytes;
2235 
2236  used_bytes -= bytes;
2237  b->m_used_bytes = used_bytes;
2238 
2239  SendBufferPage *page = b->m_first_page;
2240  while (bytes && bytes >= page->m_bytes)
2241  {
2242  SendBufferPage * tmp = page;
2243  bytes -= page->m_bytes;
2244  page = page->m_next;
2245  release_page(tmp);
2246  }
2247 
2248  if (used_bytes == 0)
2249  {
2250  b->m_first_page = 0;
2251  b->m_last_page = 0;
2252  }
2253  else
2254  {
2255  page->m_start += bytes;
2256  page->m_bytes -= bytes;
2257  assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2258  b->m_first_page = page;
2259  }
2260 
2261  return used_bytes;
2262 }
2263 
2264 bool
2265 TransporterRegistry::has_data_to_send(NodeId node)
2266 {
2267  assert(m_use_default_send_buffer);
2268 
2269  SendBuffer *b = m_send_buffers + node;
2270  return (b->m_first_page != NULL && b->m_first_page->m_bytes);
2271 }
2272 
2273 void
2274 TransporterRegistry::reset_send_buffer(NodeId node, bool should_be_empty)
2275 {
2276  assert(m_use_default_send_buffer);
2277 
2278  // Make sure that buffer is already empty if the "should_be_empty"
2279  // flag is set. This is done to quickly catch any stray signals
2280  // written to the send buffer while not being connected
2281  if (should_be_empty && !has_data_to_send(node))
2282  return;
2283  assert(!should_be_empty);
2284 
2285  SendBuffer *b = m_send_buffers + node;
2286  SendBufferPage *page = b->m_first_page;
2287  while (page != NULL)
2288  {
2289  SendBufferPage *next = page->m_next;
2290  release_page(page);
2291  page = next;
2292  }
2293  b->m_first_page = NULL;
2294  b->m_last_page = NULL;
2295  b->m_used_bytes = 0;
2296 }
2297 
2298 TransporterRegistry::SendBufferPage *
2299 TransporterRegistry::alloc_page()
2300 {
2301  SendBufferPage *page = m_page_freelist;
2302  if (page != NULL)
2303  {
2304  m_page_freelist = page->m_next;
2305  return page;
2306  }
2307 
2308  ndbout << "ERROR: out of send buffers in kernel." << endl;
2309  return NULL;
2310 }
2311 
2312 void
2313 TransporterRegistry::release_page(SendBufferPage *page)
2314 {
2315  assert(page != NULL);
2316  page->m_next = m_page_freelist;
2317  m_page_freelist = page;
2318 }
2319 
2320 Uint32 *
2321 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2322  Uint32 max_use)
2323 {
2324  assert(m_use_default_send_buffer);
2325 
2326  SendBuffer *b = m_send_buffers + node;
2327 
2328  /* First check if we have room in already allocated page. */
2329  SendBufferPage *page = b->m_last_page;
2330  if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2331  {
2332  return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
2333  }
2334 
2335  if (b->m_used_bytes + lenBytes > max_use)
2336  return NULL;
2337 
2338  /* Allocate a new page. */
2339  page = alloc_page();
2340  if (page == NULL)
2341  return NULL;
2342  page->m_next = NULL;
2343  page->m_bytes = 0;
2344  page->m_start = 0;
2345 
2346  if (b->m_last_page == NULL)
2347  {
2348  b->m_first_page = page;
2349  b->m_last_page = page;
2350  }
2351  else
2352  {
2353  assert(b->m_first_page != NULL);
2354  b->m_last_page->m_next = page;
2355  b->m_last_page = page;
2356  }
2357  return (Uint32 *)(page->m_data);
2358 }
2359 
2360 Uint32
2361 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2362 {
2363  assert(m_use_default_send_buffer);
2364 
2365  SendBuffer *b = m_send_buffers + node;
2366  SendBufferPage *page = b->m_last_page;
2367  assert(page != NULL);
2368  assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2369  page->m_bytes += lenBytes;
2370  b->m_used_bytes += lenBytes;
2371  return b->m_used_bytes;
2372 }
2373 
2374 bool
2375 TransporterRegistry::forceSend(NodeId node)
2376 {
2377  Transporter *t = get_transporter(node);
2378  if (t)
2379  return t->doSend();
2380  else
2381  return false;
2382 }
2383 
2384 
2385 void
2386 TransporterRegistry::print_transporters(const char* where, NdbOut& out)
2387 {
2388  out << where << " >>" << endl;
2389 
2390  for(unsigned i = 0; i < maxTransporters; i++){
2391  if(theTransporters[i] == NULL)
2392  continue;
2393 
2394  const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
2395 
2396  out << i << " "
2397  << getPerformStateString(remoteNodeId) << " to node: "
2398  << remoteNodeId << " at "
2399  << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
2400  }
2401 
2402  out << "<<" << endl;
2403 
2404  for (size_t i= 0; i < m_transporter_interface.size(); i++){
2405  Transporter_interface tf= m_transporter_interface[i];
2406 
2407  out << i
2408  << " remote node: " << tf.m_remote_nodeId
2409  << " port: " << tf.m_s_service_port
2410  << " interface: " << tf.m_interface << endl;
2411  }
2412 }
2413 
2414