MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ClusterMgr.hpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef ClusterMgr_H
19 #define ClusterMgr_H
20 
21 #include <ndb_limits.h>
22 #include <NdbThread.h>
23 #include <NdbMutex.h>
24 #include <NdbCondition.h>
25 #include <signaldata/ArbitSignalData.hpp>
26 #include <signaldata/NodeStateSignalData.hpp>
27 #include "trp_client.hpp"
28 #include "trp_node.hpp"
29 #include <signaldata/DisconnectRep.hpp>
30 
31 extern "C" void* runClusterMgr_C(void * me);
32 
33 
37 class ClusterMgr : public trp_client
38 {
39  friend class TransporterFacade;
40  friend class ArbitMgr;
41  friend void* runClusterMgr_C(void * me);
42 public:
44  virtual ~ClusterMgr();
45  void configure(Uint32 nodeId, const ndb_mgm_configuration* config);
46 
47  void reportConnected(NodeId nodeId);
48  void reportDisconnected(NodeId nodeId);
49 
50  bool checkUpgradeCompatability(Uint32 nodeVersion);
51 
52  void doStop();
53  void startThread();
54 
55  void forceHB();
56  void set_max_api_reg_req_interval(unsigned int millisec) {
57  m_max_api_reg_req_interval = millisec;
58  }
59 
60  void lock() { NdbMutex_Lock(clusterMgrThreadMutex); trp_client::lock(); }
61  void unlock() { trp_client::unlock();NdbMutex_Unlock(clusterMgrThreadMutex); }
62 
63 private:
64  void startup();
65  void threadMain();
66 
67  int theStop;
68  class TransporterFacade & theFacade;
69  class ArbitMgr * theArbitMgr;
70 
71 public:
72  enum Cluster_state {
73  CS_waiting_for_clean_cache = 0,
74  CS_waiting_for_first_connect,
75  CS_connected
76  };
77 
78  struct Node : public trp_node
79  {
80  Node();
81 
85  Uint32 hbFrequency; // Heartbeat frequence
86  Uint32 hbCounter; // # milliseconds passed since last hb sent
87  Uint32 hbMissed; // # missed heartbeats
88  };
89 
90  const trp_node & getNodeInfo(NodeId) const;
91  Uint32 getNoOfConnectedNodes() const;
92  void hb_received(NodeId);
93 
94  int m_auto_reconnect;
95  Uint32 m_connect_count;
96 private:
97  Uint32 m_max_api_reg_req_interval;
98  Uint32 noOfAliveNodes;
99  Uint32 noOfConnectedNodes;
100  Uint32 minDbVersion;
101  Node theNodes[MAX_NODES];
102  NdbThread* theClusterMgrThread;
103 
104  NodeBitmask waitForHBFromNodes; // used in forcing HBs
105  NdbCondition* waitForHBCond;
106  bool waitingForHB;
107 
108  enum Cluster_state m_cluster_state;
112  NdbMutex* clusterMgrThreadMutex;
113 
117  void execAPI_REGREQ (const Uint32 * theData);
118  void execAPI_REGCONF (const NdbApiSignal*, const LinearSectionPtr ptr[]);
119  void execAPI_REGREF (const Uint32 * theData);
120  void execCONNECT_REP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
121  void execDISCONNECT_REP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
122  void execNODE_FAILREP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
123  void execNF_COMPLETEREP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
124 
125  void check_wait_for_hb(NodeId nodeId);
126 
127  inline void set_node_alive(trp_node& node, bool alive){
128 
129  // Only DB nodes can be "alive"
130  assert(!alive ||
131  (alive && node.m_info.getType() == NodeInfo::DB));
132 
133  if(node.m_alive && !alive)
134  {
135  assert(noOfAliveNodes);
136  noOfAliveNodes--;
137  }
138  else if(!node.m_alive && alive)
139  {
140  noOfAliveNodes++;
141  }
142  node.m_alive = alive;
143  }
144 
145  void set_node_dead(trp_node&);
146 
147  void print_nodes(const char* where, NdbOut& out = ndbout);
148  void recalcMinDbVersion();
149 
150 public:
154  virtual void trp_deliver_signal(const NdbApiSignal*,
155  const LinearSectionPtr p[3]);
156 };
157 
158 inline
159 const trp_node &
160 ClusterMgr::getNodeInfo(NodeId nodeId) const {
161  // Check array bounds
162  assert(nodeId < MAX_NODES);
163  return theNodes[nodeId];
164 }
165 
166 inline
167 Uint32
168 ClusterMgr::getNoOfConnectedNodes() const {
169  return noOfConnectedNodes;
170 }
171 
172 inline
173 void
174 ClusterMgr::hb_received(NodeId nodeId) {
175  // Check array bounds + don't allow node 0 to be touched
176  assert(nodeId > 0 && nodeId < MAX_NODES);
177  theNodes[nodeId].hbMissed = 0;
178 }
179 
180 /*****************************************************************************/
181 
188 extern "C" void* runArbitMgr_C(void* me);
189 
190 class ArbitMgr
191 {
192 public:
193  ArbitMgr(class ClusterMgr &);
194  ~ArbitMgr();
195 
196  inline void setRank(unsigned n) { theRank = n; }
197  inline void setDelay(unsigned n) { theDelay = n; }
198 
199  void doStart(const Uint32* theData);
200  void doChoose(const Uint32* theData);
201  void doStop(const Uint32* theData);
202 
203  friend void* runArbitMgr_C(void* me);
204 
205 private:
206  class ClusterMgr & m_clusterMgr;
207  unsigned theRank;
208  unsigned theDelay;
209 
210  void threadMain();
211  NdbThread* theThread;
212  NdbMutex* theThreadMutex; // not really needed
213 
214  struct ArbitSignal {
215  GlobalSignalNumber gsn;
216  ArbitSignalData data;
217  NDB_TICKS timestamp;
218 
219  ArbitSignal() {}
220 
221  inline void init(GlobalSignalNumber aGsn, const Uint32* aData) {
222  gsn = aGsn;
223  if (aData != NULL)
224  memcpy(&data, aData, sizeof(data));
225  else
226  memset(&data, 0, sizeof(data));
227  }
228 
229  inline void setTimestamp() {
230  timestamp = NdbTick_CurrentMillisecond();
231  }
232 
233  inline NDB_TICKS getTimediff() {
234  NDB_TICKS now = NdbTick_CurrentMillisecond();
235  return now < timestamp ? 0 : now - timestamp;
236  }
237  };
238 
239  NdbMutex* theInputMutex;
240  NdbCondition* theInputCond;
241  int theInputTimeout;
242  bool theInputFull; // the predicate
243  ArbitSignal theInputBuffer; // shared buffer
244 
245  void sendSignalToThread(ArbitSignal& aSignal);
246 
247  enum State { // thread states
248  StateInit,
249  StateStarted, // thread started
250  StateChoose1, // received one valid REQ
251  StateChoose2, // received two valid REQs
252  StateFinished // finished one way or other
253  };
254  State theState;
255 
256  enum Stop { // stop code in ArbitSignal.data.code
257  StopExit = 1, // at API exit
258  StopRequest = 2, // request from kernel
259  StopRestart = 3 // stop before restart
260  };
261 
262  void threadStart(ArbitSignal& aSignal); // handle thread events
263  void threadChoose(ArbitSignal& aSignal);
264  void threadTimeout();
265  void threadStop(ArbitSignal& aSignal);
266 
267  ArbitSignal theStartReq;
268  ArbitSignal theChooseReq1;
269  ArbitSignal theChooseReq2;
270  ArbitSignal theStopOrd;
271 
272  void sendStartConf(ArbitSignal& aSignal, Uint32);
273  void sendChooseRef(ArbitSignal& aSignal, Uint32);
274  void sendChooseConf(ArbitSignal& aSignal, Uint32);
275  void sendStopRep(ArbitSignal& aSignal, Uint32);
276 
277  void sendSignalToQmgr(ArbitSignal& aSignal);
278 };
279 
280 #endif