MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TransporterFacade.hpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef TransporterFacade_H
19 #define TransporterFacade_H
20 
21 #include <kernel_types.h>
22 #include <ndb_limits.h>
23 #include <NdbThread.h>
24 #include <TransporterRegistry.hpp>
25 #include <NdbMutex.h>
26 #include "DictCache.hpp"
27 #include <BlockNumbers.h>
28 #include <mgmapi.h>
29 
30 class ClusterMgr;
31 class ArbitMgr;
33 
34 class Ndb;
35 class NdbApiSignal;
36 class NdbWaiter;
37 class trp_client;
38 
39 extern "C" {
40  void* runSendRequest_C(void*);
41  void* runReceiveResponse_C(void*);
42 }
43 
45 {
46 public:
51  STATIC_CONST( MAX_NO_THREADS = 4711 );
53  virtual ~TransporterFacade();
54 
55  int start_instance(NodeId, const ndb_mgm_configuration*);
56  void stop_instance();
57 
58  /*
59  (Re)configure the TransporterFacade
60  to a specific configuration
61  */
62  bool configure(NodeId, const ndb_mgm_configuration *);
63 
69  Uint32 open_clnt(trp_client*, int blockNo = -1);
70  int close_clnt(trp_client*);
71 
72  Uint32 get_active_ndb_objects() const;
73 
74  // Only sends to nodes which are alive
75 private:
76  int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
77  int sendSignal(const NdbApiSignal*, NodeId,
78  const LinearSectionPtr ptr[3], Uint32 secs);
79  int sendSignal(const NdbApiSignal*, NodeId,
80  const GenericSectionPtr ptr[3], Uint32 secs);
81  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
82  const LinearSectionPtr ptr[3], Uint32 secs);
83  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
84  const GenericSectionPtr ptr[3], Uint32 secs);
85 public:
86 
90  void ext_set_max_api_reg_req_interval(Uint32 ms);
91  void ext_update_connections();
92  struct in_addr ext_get_connect_address(Uint32 nodeId);
93  void ext_forceHB();
94  bool ext_isConnected(NodeId aNodeId);
95  void ext_doConnect(int aNodeId);
96 
97  // Is node available for running transactions
98 private:
99  bool get_node_alive(NodeId nodeId) const;
100  bool getIsNodeSendable(NodeId nodeId) const;
101 
102 public:
103  Uint32 getMinDbNodeVersion() const;
104 
105  // My own processor id
106  NodeId ownId() const;
107 
108  void connected();
109 
110  void doConnect(int NodeId);
111  void reportConnected(int NodeId);
112  void doDisconnect(int NodeId);
113  void reportDisconnected(int NodeId);
114 
115  NodeId get_an_alive_node();
116  void trp_node_status(NodeId, Uint32 event);
117 
121  void for_each(trp_client* clnt,
122  const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
123 
124  void lock_mutex();
125  void unlock_mutex();
126 
127  // Improving the API performance
128  void forceSend(Uint32 block_number);
129  void checkForceSend(Uint32 block_number);
130 
131  TransporterRegistry* get_registry() { return theTransporterRegistry;};
132 
133 /*
134  When a thread has sent its signals and is ready to wait for reception
135  of these it does normally always wait on a conditional mutex and
136  the actual reception is handled by the receiver thread in the NDB API.
137  With the below new methods and variables each thread has the possibility
138  of becoming owner of the "right" to poll for signals. Effectually this
139  means that the thread acts temporarily as a receiver thread.
140  For the thread that succeeds in grabbing this "ownership" it will avoid
141  a number of expensive calls to conditional mutex and even more expensive
142  context switches to wake up.
143  When an owner of the poll "right" has completed its own task it is likely
144  that there are others still waiting. In this case we pick one of the
145  threads as new owner of the poll "right". Since we want to switch owner
146  as seldom as possible we always pick the last thread which is likely to
147  be the last to complete its reception.
148 */
149  void start_poll(trp_client*);
150  void do_poll(trp_client* clnt, Uint32 wait_time);
151  void complete_poll(trp_client*);
152  void wakeup(trp_client*);
153 
154  void external_poll(Uint32 wait_time);
155 
156  trp_client* get_poll_owner(bool) const { return m_poll_owner;}
157  trp_client* remove_last_from_poll_queue();
158  void add_to_poll_queue(trp_client* clnt);
159  void remove_from_poll_queue(trp_client* clnt);
160 
161  trp_client * m_poll_owner;
162  trp_client * m_poll_queue_head; // First in queue
163  trp_client * m_poll_queue_tail; // Last in queue
164  /* End poll owner stuff */
165 
166  // heart beat received from a node (e.g. a signal came)
167  void hb_received(NodeId n);
168  void set_auto_reconnect(int val);
169  int get_auto_reconnect() const;
170 
171  /* TransporterCallback interface. */
172  void deliver_signal(SignalHeader * const header,
173  Uint8 prio,
174  Uint32 * const signalData,
175  LinearSectionPtr ptr[3]);
176  int checkJobBuffer();
177  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
178  void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
179  void reportConnect(NodeId nodeId);
180  void reportDisconnect(NodeId nodeId, Uint32 errNo);
181  void reportError(NodeId nodeId, TransporterError errorCode,
182  const char *info = 0);
183  void transporter_recv_from(NodeId node);
184  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
185  {
186  return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
187  }
188  Uint32 bytes_sent(NodeId node, Uint32 bytes)
189  {
190  return theTransporterRegistry->bytes_sent(node, bytes);
191  }
192  bool has_data_to_send(NodeId node)
193  {
194  return theTransporterRegistry->has_data_to_send(node);
195  }
196  void reset_send_buffer(NodeId node, bool should_be_empty)
197  {
198  theTransporterRegistry->reset_send_buffer(node, should_be_empty);
199  }
200 
201 private:
202 
203  friend class trp_client;
204  friend class ClusterMgr;
205  friend class ArbitMgr;
206  friend class Ndb_cluster_connection;
207  friend class Ndb_cluster_connection_impl;
208 
209  bool isConnected(NodeId aNodeId);
210  void doStop();
211 
212  TransporterRegistry* theTransporterRegistry;
213  SocketServer m_socket_server;
214  int sendPerformedLastInterval;
215  NodeId theOwnId;
216  NodeId theStartNodeId;
217 
218  ClusterMgr* theClusterMgr;
219 
220  // Improving the API response time
221  int checkCounter;
222  Uint32 currentSendLimit;
223 
224  void calculateSendLimit();
225 
226  // Declarations for the receive and send thread
227  int theStopReceive;
228 
229  void threadMainSend(void);
230  NdbThread* theSendThread;
231  void threadMainReceive(void);
232  NdbThread* theReceiveThread;
233 
234  friend void* runSendRequest_C(void*);
235  friend void* runReceiveResponse_C(void*);
236 
237  bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
238 
242 private:
243 
244  struct ThreadData {
245  STATIC_CONST( ACTIVE = (1 << 16) | 1 );
246  STATIC_CONST( INACTIVE = (1 << 16) );
247  STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
248 
249  ThreadData(Uint32 initialSize = 32);
250 
251  Uint32 m_use_cnt;
252  Uint32 m_firstFree;
253  Vector<Uint32> m_statusNext;
254  Vector<trp_client*> m_objectExecute;
255 
256  int open(trp_client*);
257  int close(int number);
258  void expand(Uint32 size);
259 
260  inline trp_client* get(Uint16 blockNo) const {
261  blockNo -= MIN_API_BLOCK_NO;
262  if(likely (blockNo < m_objectExecute.size()))
263  {
264  return m_objectExecute.getBase()[blockNo];
265  }
266  return 0;
267  }
268  } m_threads;
269 
270  Uint32 m_fixed2dynamic[NO_API_FIXED_BLOCKS];
271  Uint32 m_fragmented_signal_id;
272 
273 public:
274  NdbMutex* theMutexPtr;
275 
276 public:
277  GlobalDictCache *m_globalDictCache;
278 };
279 
280 inline
281 void
282 TransporterFacade::lock_mutex()
283 {
284  NdbMutex_Lock(theMutexPtr);
285 }
286 
287 inline
288 void
289 TransporterFacade::unlock_mutex()
290 {
291  NdbMutex_Unlock(theMutexPtr);
292 }
293 
294 #include "ClusterMgr.hpp"
295 #include "ndb_cluster_connection_impl.hpp"
296 
297 inline
298 unsigned Ndb_cluster_connection_impl::get_connect_count() const
299 {
300  if (m_transporter_facade->theClusterMgr)
301  return m_transporter_facade->theClusterMgr->m_connect_count;
302  return 0;
303 }
304 
305 inline
306 unsigned Ndb_cluster_connection_impl::get_min_db_version() const
307 {
308  return m_transporter_facade->getMinDbNodeVersion();
309 }
310 
311 inline
312 bool
313 TransporterFacade::get_node_alive(NodeId n) const {
314  if (theClusterMgr)
315  {
316  return theClusterMgr->getNodeInfo(n).m_alive;
317  }
318  return 0;
319 }
320 
321 inline
322 void
323 TransporterFacade::hb_received(NodeId n) {
324  theClusterMgr->hb_received(n);
325 }
326 
327 inline
328 Uint32
329 TransporterFacade::getMinDbNodeVersion() const
330 {
331  if (theClusterMgr)
332  return theClusterMgr->minDbVersion;
333  else
334  return 0;
335 }
336 
337 inline
338 const trp_node &
339 trp_client::getNodeInfo(Uint32 nodeId) const
340 {
341  return m_facade->theClusterMgr->getNodeInfo(nodeId);
342 }
343 
354 {
355 private :
356  const Uint32* data;
357  Uint32 len;
358  bool read;
359 public :
360  LinearSectionIterator(const Uint32* _data, Uint32 _len)
361  {
362  data= (_len == 0)? NULL:_data;
363  len= _len;
364  read= false;
365  }
366 
368  {};
369 
370  void reset()
371  {
372  /* Reset iterator */
373  read= false;
374  }
375 
376  const Uint32* getNextWords(Uint32& sz)
377  {
378  if (likely(!read))
379  {
380  read= true;
381  sz= len;
382  return data;
383  }
384  sz= 0;
385  return NULL;
386  }
387 };
388 
389 
401 {
402 private :
403  NdbApiSignal* firstSignal;
404  NdbApiSignal* currentSignal;
405 public :
407  {
408  firstSignal= currentSignal= signal;
409  }
410 
412  {};
413 
414  void reset()
415  {
416  /* Reset iterator */
417  currentSignal= firstSignal;
418  }
419 
420  const Uint32* getNextWords(Uint32& sz);
421 };
422 
423 /*
424  * GenericSectionIteratorReader
425  * Helper class to simplify reading data from
426  * GenericSectionIterator implementations
427  */
428 
430 {
431 private :
433  const Uint32* chunkPtr;
434  Uint32 chunkRemain;
435 public :
437  {
438  gsi = _gsi;
439  chunkPtr = NULL;
440  chunkRemain = 0;
441  }
442 
443  void copyNWords(Uint32* dest, Uint32 n)
444  {
445  while (n)
446  {
447  if (chunkRemain == 0)
448  {
449  /* Get next contiguous stretch of words from
450  * the iterator
451  */
452  chunkPtr = gsi->getNextWords(chunkRemain);
453  if (!chunkRemain)
454  abort(); // Must have the words the caller asks for
455  }
456  else
457  {
458  /* Have some words from the iterator, copy some/
459  * all of them
460  */
461  Uint32 wordsToCopy = MIN(chunkRemain, n);
462  memcpy(dest, chunkPtr, wordsToCopy << 2);
463  chunkPtr += wordsToCopy;
464  chunkRemain -= wordsToCopy;
465 
466  dest += wordsToCopy;
467  n -= wordsToCopy;
468  }
469  }
470  }
471 };
472 
473 
474 
475 
476 #endif // TransporterFacade_H