MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TransporterRegistry.hpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 //****************************************************************************
19 //
20 // NAME
21 // TransporterRegistry
22 //
23 // DESCRIPTION
24 // TransporterRegistry (singleton) is the interface to the
25 // transporter layer. It handles transporter states and
26 // holds the transporter arrays.
27 //
28 //***************************************************************************/
29 #ifndef TransporterRegistry_H
30 #define TransporterRegistry_H
31 
32 #if defined(HAVE_EPOLL_CREATE)
33 #include <sys/epoll.h>
34 #endif
35 #include "TransporterDefinitions.hpp"
36 #include "TransporterCallback.hpp"
37 #include <SocketServer.hpp>
38 #include <SocketClient.hpp>
39 
40 #include <NdbTCP.h>
41 
42 #include <mgmapi/mgmapi.h>
43 
44 #include <NodeBitmask.hpp>
45 
46 // A transporter is always in an IOState.
47 // NoHalt is used initially and for as long as there are no restrictions on
48 // sending or receiving.
49 enum IOState {
50  NoHalt = 0, // no restriction on sending or receiving
51  HaltInput = 1, // presumably receiving is halted - confirm against setIOState() callers
52  HaltOutput = 2, // presumably sending is halted - confirm against setIOState() callers
53  HaltIO = 3 // numerically equal to HaltInput | HaltOutput
54 };
55 
56 
57 static const char *performStateString[] = // status text indexed by the numeric TransporterRegistry::PerformState value
58  { "is connected", // [0] CONNECTED
59  "is trying to connect", // [1] CONNECTING
60  "does nothing", // [2] DISCONNECTED
61  "is trying to disconnect" }; // [3] DISCONNECTING; order must stay in sync with the enum
62 
63 class Transporter;
64 class TCP_Transporter;
65 class SCI_Transporter;
66 class SHM_Transporter;
67 
70 
72  SocketAuthenticator * m_auth;
73  TransporterRegistry * m_transporter_registry;
74 public:
76  {
77  m_auth= auth;
78  m_transporter_registry= 0;
79  }
80  void setTransporterRegistry(TransporterRegistry *t) // stores t in m_transporter_registry; no null check is done here
81  {
82  m_transporter_registry= t;
83  }
84  SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
85 };
86 
92  friend class SHM_Transporter;
93  friend class SHM_Writer;
94  friend class Transporter;
95  friend class TransporterService;
96 public:
101  bool use_default_send_buffer = true,
102  unsigned maxTransporters = MAX_NTRANSPORTERS,
103  unsigned sizeOfLongSignalMemory = 100);
104 
111  NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }; // accessor: returns the m_mgm_handle member unchanged
112 
113  bool init(NodeId localNodeId);
114 
121  bool connect_server(NDB_SOCKET_TYPE sockfd, BaseString& errormsg) const;
122 
123  bool connect_client(NdbMgmHandle *h);
124 
129  NDB_SOCKET_TYPE connect_ndb_mgmd(SocketClient *sc);
130 
135  NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
136 
140  void removeAll();
141 
145  void disconnectAll();
146 
151  virtual ~TransporterRegistry();
152 
153  bool start_service(SocketServer& server);
154  struct NdbThread* start_clients();
155  bool stop_clients();
156  void start_clients_thread();
157  void update_connections();
158 
162  void startReceiving();
163  void stopReceiving();
164 
168  void startSending();
169  void stopSending();
170 
171  // A transporter is always in a PerformState; one value per node is kept in performStates[].
172  // The numeric values index performStateString[], so the two must stay in the same order.
173  // NOTE(review): earlier wording here mentioned PerformIO/PerformConnect, which are not enumerators below.
174  enum PerformState {
175  CONNECTED = 0, // reported as "is connected"
176  CONNECTING = 1, // reported as "is trying to connect"
177  DISCONNECTED = 2, // reported as "does nothing"
178  DISCONNECTING = 3 // reported as "is trying to disconnect"
179  };
180  const char *getPerformStateString(NodeId nodeId) const // status text for nodeId's current PerformState
181  { return performStateString[(unsigned)performStates[nodeId]]; }; // neither nodeId nor the state value is range-checked here
182 
183  PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; } // returns performStates[nodeId]; nodeId is not range-checked
184 
188  void do_connect(NodeId node_id);
189  void do_disconnect(NodeId node_id, int errnum = 0);
190  bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; }; // true only when the node's state is CONNECTED; note: not declared const, unlike getPerformState()
191  void report_connect(NodeId node_id);
192  void report_disconnect(NodeId node_id, int errnum);
193  void report_error(NodeId nodeId, TransporterError errorCode,
194  const char *errorInfo = 0);
195 
199  IOState ioState(NodeId nodeId);
200  void setIOState(NodeId nodeId, IOState state);
201 
202 private:
203 
204  bool createTCPTransporter(TransporterConfiguration * config);
205  bool createSCITransporter(TransporterConfiguration * config);
206  bool createSHMTransporter(TransporterConfiguration * config);
207 
208 public:
217 
227  void allocate_send_buffers(Uint64 total_send_buffer);
228 
236  Uint64 get_total_max_send_buffer() { return m_total_max_send_buffer; } // accessor: returns m_total_max_send_buffer; note: not declared const
237 
238  bool get_using_default_send_buffer() const{ return m_use_default_send_buffer;} // accessor: returns the m_use_default_send_buffer flag
239 
244  void set_status_overloaded(Uint32 nodeId, bool val);
245  const NodeBitmask& get_status_overloaded() const;
246 
258  SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
259  const SignalHeader * const signalHeader, Uint8 prio,
260  const Uint32 * const signalData,
261  NodeId nodeId,
262  const LinearSectionPtr ptr[3]);
263 
264  SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
265  const SignalHeader * const signalHeader, Uint8 prio,
266  const Uint32 * const signalData,
267  NodeId nodeId,
268  class SectionSegmentPool & pool,
269  const SegmentedSectionPtr ptr[3]);
270  SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
271  const SignalHeader * const signalHeader, Uint8 prio,
272  const Uint32 * const signalData,
273  NodeId nodeId,
274  const GenericSectionPtr ptr[3]);
278  SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
279  const Uint32 * const signalData,
280  NodeId nodeId,
281  const LinearSectionPtr ptr[3])
282  { // convenience overload for LinearSectionPtr sections: no explicit send handle argument
283  return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr); // forwards with this object as the TransporterSendBufferHandle
284  }
285  SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
286  const Uint32 * const signalData,
287  NodeId nodeId,
288  class SectionSegmentPool & pool,
289  const SegmentedSectionPtr ptr[3])
290  { // convenience overload for SegmentedSectionPtr sections: no explicit send handle argument
291  return prepareSend(this, signalHeader, prio, signalData, nodeId, pool, ptr); // forwards with this object as the TransporterSendBufferHandle
292  }
293  SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
294  const Uint32 * const signalData,
295  NodeId nodeId,
296  const GenericSectionPtr ptr[3])
297  { // convenience overload for GenericSectionPtr sections: no explicit send handle argument
298  return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr); // forwards with this object as the TransporterSendBufferHandle
299  }
300 
307  void external_IO(Uint32 timeOutMillis);
308 
309  inline Uint32 pollReceive(Uint32 timeOutMillis) { // convenience overload: uses the member mask m_has_data_transporters
310  return pollReceive(timeOutMillis, m_has_data_transporters); // delegates to the two-argument overload and returns its result
311  }
312  Uint32 pollReceive(Uint32 timeOutMillis, NodeBitmask& mask);
313  void performReceive();
314  int performSend(NodeId nodeId);
315  void performSend();
316 
322  int forceSendCheck(int sendLimit);
323 
324 #ifdef DEBUG_TRANSPORTER
325  void printState();
326 #endif
327 
328 #ifdef ERROR_INSERT
329  /* Utils for testing latency issues */
330  bool isBlocked(NodeId nodeId);
331  void blockReceive(NodeId nodeId);
332  void unblockReceive(NodeId nodeId);
333 #endif
334 
336  public:
337  NodeId m_remote_nodeId;
338  int m_s_service_port; // signed port number
339  const char *m_interface;
340  };
341  Vector<Transporter_interface> m_transporter_interface;
342  void add_transporter_interface(NodeId remoteNodeId, const char *interf,
343  int s_port); // signed port. <0 is dynamic
344  Transporter* get_transporter(NodeId nodeId);
345  struct in_addr get_connect_address(NodeId node_id) const;
346 protected:
347 
348 private:
349  TransporterCallback *callbackObj;
350 
351  NdbMgmHandle m_mgm_handle;
352 
353  struct NdbThread *m_start_clients_thread;
354  bool m_run_start_clients_thread;
355 
356  int sendCounter;
357  NodeId localNodeId;
358  unsigned maxTransporters;
359  int nTransporters;
360  int nTCPTransporters;
361  int nSCITransporters;
362  int nSHMTransporters;
363 
364 #ifdef ERROR_INSERT
365  Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
366  Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
367  Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
368  int m_disconnect_errors[MAX_NTRANSPORTERS];
369 #endif
370 
375  NodeBitmask m_has_data_transporters;
376 #if defined(HAVE_EPOLL_CREATE)
377  int m_epoll_fd;
378  struct epoll_event *m_epoll_events;
379  bool change_epoll(TCP_Transporter *t, bool add);
380 #endif
381 
384  TCP_Transporter** theTCPTransporters;
385  SCI_Transporter** theSCITransporters;
386  SHM_Transporter** theSHMTransporters;
387 
391  TransporterType* theTransporterTypes;
392  Transporter** theTransporters;
393 
397  PerformState* performStates;
398  int* m_disconnect_errnum;
399  IOState* ioStates;
400  struct ErrorState { // pairs a TransporterError code with an info string pointer
401  TransporterError m_code;
402  const char *m_info; // ownership and lifetime of the pointed-to text are not visible here - TODO confirm
403  };
404  struct ErrorState *m_error_states;
405 
409  NodeBitmask m_status_overloaded;
410 
416  Uint32 unpack(Uint32 * readPtr,
417  Uint32 bufferSize,
418  NodeId remoteNodeId,
419  IOState state);
420 
421  Uint32 * unpack(Uint32 * readPtr,
422  Uint32 * eodPtr,
423  NodeId remoteNodeId,
424  IOState state);
425 
426  static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
433  void removeTransporter(NodeId nodeId);
434 
438  int tcpReadSelectReply;
439  ndb_socket_poller m_socket_poller;
440 
441  Uint32 poll_TCP(Uint32 timeOutMillis, NodeBitmask&);
442  Uint32 poll_SCI(Uint32 timeOutMillis, NodeBitmask&);
443  Uint32 poll_SHM(Uint32 timeOutMillis, NodeBitmask&);
444 
445  int m_shm_own_pid;
446  int m_transp_count;
447 
448 public:
449  bool setup_wakeup_socket();
450  void wakeup();
451 private:
452  bool m_has_extra_wakeup_socket;
453  NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
454  void consume_extra_sockets();
455 
456 
457  Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
458  NodeId node, Uint32 lenBytes, Uint32 prio);
459  void updateWritePtr(TransporterSendBufferHandle *handle,
460  NodeId node, Uint32 lenBytes, Uint32 prio);
461 
468  virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
469  Uint32 max_use);
470  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
471  virtual bool forceSend(NodeId node);
472 
473 private:
474  /* Send buffer pages. */
475  struct SendBufferPage {
476  /* Size of one send buffer page. max_data_bytes() below uses it as a byte count (this comment previously said "words"). */
477  static const Uint32 PGSIZE = 32768;
478  static Uint32 max_data_bytes() /* payload capacity of a page: PGSIZE minus the offset of m_data within the struct */
479  {
480  return PGSIZE - offsetof(SendBufferPage, m_data);
481  }
482 
483  /* Send buffer for one transporter is kept in a single-linked list. */
484  struct SendBufferPage *m_next;
485 
486  /* Bytes of send data available in this page. */
487  Uint16 m_bytes;
488  /* Start of unsent data (presumably a byte offset into m_data - confirm). */
489  Uint16 m_start;
490 
491  /* Data; declared with 2 elements, but the real size is to the end of one page. */
492  char m_data[2];
493  };
494 
495  /* Send buffer for one transporter. */
496  struct SendBuffer {
497  /* Total size of data in buffer. NOTE(review): this comment used to refer to m_offset_start_data, which is not a member shown here. */
498  Uint32 m_used_bytes;
499  /* Linked list of active buffer pages with first and last pointer. */
500  SendBufferPage *m_first_page;
501  SendBufferPage *m_last_page;
502  };
503 
504  SendBufferPage *alloc_page();
505  void release_page(SendBufferPage *page);
506 
507 private:
508  /* True if we are using the default send buffer implementation. */
509  bool m_use_default_send_buffer;
510  /* Send buffers. */
511  SendBuffer *m_send_buffers;
512  /* Linked list of free pages. */
513  SendBufferPage *m_page_freelist;
514  /* Original block of memory for pages (so we can free it at exit). */
515  unsigned char *m_send_buffer_memory;
520  Uint64 m_total_max_send_buffer;
521 
522 public:
523  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
524  Uint32 bytes_sent(NodeId node, Uint32 bytes);
525  bool has_data_to_send(NodeId node);
526 
527  void reset_send_buffer(NodeId node, bool should_be_empty);
528 
529  void print_transporters(const char* where, NdbOut& out = ndbout);
530 
531 };
532 
533 inline void
535 {
536  assert(nodeId < MAX_NODES);
537  m_status_overloaded.set(nodeId, val);
538 }
539 
540 inline const NodeBitmask& // returns a read-only reference to the m_status_overloaded bitmask (no copy is made)
541 TransporterRegistry::get_status_overloaded() const
542 {
543  return m_status_overloaded;
544 }
545 
546 #endif // Define of TransporterRegistry_H