MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ClusterMgr.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 #include <ndb_limits.h>
20 #include <util/version.h>
21 
22 #include "TransporterFacade.hpp"
23 #include <kernel/GlobalSignalNumbers.h>
24 
25 #include "ClusterMgr.hpp"
26 #include <IPCConfig.hpp>
27 #include "NdbApiSignal.hpp"
28 #include <NdbSleep.h>
29 #include <NdbOut.hpp>
30 #include <NdbTick.h>
31 
32 
33 #include <signaldata/NodeFailRep.hpp>
34 #include <signaldata/NFCompleteRep.hpp>
35 #include <signaldata/ApiRegSignalData.hpp>
36 #include <signaldata/AlterTable.hpp>
37 #include <signaldata/SumaImpl.hpp>
38 
39 #include <mgmapi.h>
40 #include <mgmapi_configuration.hpp>
41 #include <mgmapi_config_parameters.h>
42 
// Behaviour switches read by ClusterMgr; both default to 0 (normal mode).
//  - global_flag_skip_invalidate_cache: when non-zero, execDISCONNECT_REP()
//    does not invalidate the global dictionary cache after the last
//    connected node goes away.
//  - global_flag_skip_waiting_for_clean_cache: when non-zero, threadMain()
//    does not wait for the global dictionary cache to become empty before
//    leaving CS_waiting_for_clean_cache.
int global_flag_skip_invalidate_cache(0);
int global_flag_skip_waiting_for_clean_cache(0);
// Uncomment to trace API_REGREQ / API_REGCONF traffic and forced heartbeats.
//#define DEBUG_REG
46 
47 // Just a C wrapper for threadMain
48 extern "C"
49 void*
50 runClusterMgr_C(void * me)
51 {
52  ((ClusterMgr*) me)->threadMain();
53 
54  return NULL;
55 }
56 
57 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
58  theStop(0),
59  theFacade(_facade),
60  theArbitMgr(NULL),
61  m_connect_count(0),
62  m_max_api_reg_req_interval(~0),
63  noOfAliveNodes(0),
64  noOfConnectedNodes(0),
65  minDbVersion(0),
66  theClusterMgrThread(NULL),
67  waitingForHB(false),
68  m_cluster_state(CS_waiting_for_clean_cache)
69 {
70  DBUG_ENTER("ClusterMgr::ClusterMgr");
71  clusterMgrThreadMutex = NdbMutex_Create();
72  waitForHBCond= NdbCondition_Create();
73  m_auto_reconnect = -1;
74 
75  Uint32 ret = this->open(&theFacade, API_CLUSTERMGR);
76  if (unlikely(ret == 0))
77  {
78  ndbout_c("Failed to register ClusterMgr! ret: %d", ret);
79  abort();
80  }
81  DBUG_VOID_RETURN;
82 }
83 
84 ClusterMgr::~ClusterMgr()
85 {
86  DBUG_ENTER("ClusterMgr::~ClusterMgr");
87  doStop();
88  if (theArbitMgr != 0)
89  {
90  delete theArbitMgr;
91  theArbitMgr = 0;
92  }
93  this->close(); // disconnect from TransporterFacade
94  NdbCondition_Destroy(waitForHBCond);
95  NdbMutex_Destroy(clusterMgrThreadMutex);
96  DBUG_VOID_RETURN;
97 }
98 
// Configure the node table and the arbitrator from a management-server
// configuration.
//
// For every node section in the configuration, mark theNodes[id] as
// defined and record its type (DB, API or MGM). Then reset to a default
// Node() every slot 0..MAX_NODES-1 whose id has no section. Finally, look
// up this process's own section (the nodeId parameter) and create,
// reconfigure or delete theArbitMgr according to its arbitration rank.
//
// NOTE(review): the line declaring the second parameter (original line
// 101) is missing from this listing. From the body it is the configuration
// object, dereferenced as `* config`.
99 void
100 ClusterMgr::configure(Uint32 nodeId,
102 {
103  ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
// Pass 1: mark each configured node defined and record its type.
// A non-zero return from iter.get()/find()/first() is treated as failure.
104  for(iter.first(); iter.valid(); iter.next()){
// NOTE(review): this local shadows the nodeId parameter. The parameter is
// the one used by the arbitrator lookup after both loops.
105  Uint32 nodeId = 0;
106  if(iter.get(CFG_NODE_ID, &nodeId))
107  continue;
108 
109  // Check array bounds + don't allow node 0 to be touched
110  assert(nodeId > 0 && nodeId < MAX_NODES);
111  trp_node& theNode = theNodes[nodeId];
112  theNode.defined = true;
113 
114  unsigned type;
115  if(iter.get(CFG_TYPE_OF_SECTION, &type))
116  continue;
117 
118  switch(type){
119  case NODE_TYPE_DB:
120  theNode.m_info.m_type = NodeInfo::DB;
121  break;
122  case NODE_TYPE_API:
123  theNode.m_info.m_type = NodeInfo::API;
124  break;
125  case NODE_TYPE_MGM:
126  theNode.m_info.m_type = NodeInfo::MGM;
127  break;
128  default:
// Unknown section type: m_type is left unchanged. The self-assignment
// below is a no-op (presumably only there to give the label a statement).
129  type = type;
130  break;
131  }
132  }
133 
// Pass 2: the iterator is rewound for every id. An id with no matching
// CFG_NODE_ID section gets a freshly constructed Node(). If rewinding
// fails, the slot is left untouched.
134  /* Mark all non existing nodes as not defined */
135  for(Uint32 i = 0; i<MAX_NODES; i++) {
136  if (iter.first())
137  continue;
138 
139  if (iter.find(CFG_NODE_ID, i))
140  theNodes[i]= Node();
141  }
142 
143 #if 0
144  print_nodes("init");
145 #endif
146 
// Arbitrator: rank > 0 ensures an ArbitMgr exists and applies rank and
// delay. Otherwise any existing ArbitMgr is stopped and deleted.
147  // Configure arbitrator
148  Uint32 rank = 0;
149  iter.first();
150  iter.find(CFG_NODE_ID, nodeId); // let not found in config mean rank=0
151  iter.get(CFG_NODE_ARBIT_RANK, &rank);
152 
153  if (rank > 0)
154  {
155  // The arbitrator should be active
156  if (!theArbitMgr)
157  theArbitMgr = new ArbitMgr(* this);
158  theArbitMgr->setRank(rank);
159 
160  Uint32 delay = 0;
161  iter.get(CFG_NODE_ARBIT_DELAY, &delay);
162  theArbitMgr->setDelay(delay);
163  }
164  else if (theArbitMgr)
165  {
166  // No arbitrator should be started
167  theArbitMgr->doStop(NULL);
168  delete theArbitMgr;
169  theArbitMgr= NULL;
170  }
171 }
172 
173 void
174 ClusterMgr::startThread() {
175  Guard g(clusterMgrThreadMutex);
176 
177  theStop = -1;
178  theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
179  (void**)this,
180  0, // default stack size
181  "ndb_clustermgr",
182  NDB_THREAD_PRIO_HIGH);
183  Uint32 cnt = 0;
184  while (theStop == -1 && cnt < 60)
185  {
186  NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
187  }
188 
189  assert(theStop == 0);
190 }
191 
192 void
193 ClusterMgr::doStop( ){
194  DBUG_ENTER("ClusterMgr::doStop");
195  {
196  Guard g(clusterMgrThreadMutex);
197  if(theStop == 1){
198  DBUG_VOID_RETURN;
199  }
200  }
201 
202  void *status;
203  theStop = 1;
204  if (theClusterMgrThread) {
205  NdbThread_WaitFor(theClusterMgrThread, &status);
206  NdbThread_Destroy(&theClusterMgrThread);
207  }
208 
209  if (theArbitMgr != NULL)
210  {
211  theArbitMgr->doStop(NULL);
212  }
213 
214  DBUG_VOID_RETURN;
215 }
216 
// Force an immediate heartbeat round to the data nodes and wait for it.
//
// If a forced heartbeat is already in progress (waitingForHB), wait once
// on waitForHBCond (timeout 1000, presumably ms) and return.
//
// Otherwise:
//  - build waitForHBFromNodes as the union of the connected-node sets
//    reported by all defined DB nodes, restricted to DB node ids;
//  - send API_REGREQ to each of those nodes;
//  - wait once (same timeout) for check_wait_for_hb() to clear the set and
//    broadcast.
// waitingForHB is cleared before returning.
217 void
218 ClusterMgr::forceHB()
219 {
220  theFacade.lock_mutex();
221 
222  if(waitingForHB)
223  {
// Someone else is already forcing a heartbeat: piggy-back on their wait.
224  NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
225  theFacade.unlock_mutex();
226  return;
227  }
228 
229  waitingForHB= true;
230 
231  NodeBitmask ndb_nodes;
232  ndb_nodes.clear();
233  waitForHBFromNodes.clear();
// Collect the DB node ids, and every node any DB node reports as connected.
234  for(Uint32 i = 1; i < MAX_NDB_NODES; i++)
235  {
236  const trp_node &node= getNodeInfo(i);
237  if(!node.defined)
238  continue;
239  if(node.m_info.getType() == NodeInfo::DB)
240  {
241  ndb_nodes.set(i);
242  waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
243  }
244  }
// Only DB nodes are asked for a heartbeat.
245  waitForHBFromNodes.bitAND(ndb_nodes);
246  theFacade.unlock_mutex();
247 
248 #ifdef DEBUG_REG
249  char buf[128];
250  ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
251 #endif
// Build one API_REGREQ addressed to QMGR and reuse it for every target.
252  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
253 
254  signal.theVerId_signalNumber = GSN_API_REGREQ;
255  signal.theReceiversBlockNumber = QMGR;
256  signal.theTrace = 0;
257  signal.theLength = ApiRegReq::SignalLength;
258 
259  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
260  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
261  req->version = NDB_VERSION;
262  req->mysql_version = NDB_MYSQL_VERSION_D;
263 
264  {
265  lock();
266  int nodeId= 0;
// Walk the set bits of waitForHBFromNodes in increasing order.
267  for(int i=0;
268  (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
269  i= nodeId+1)
270  {
271 #ifdef DEBUG_REG
272  ndbout << "FORCE HB to " << nodeId << endl;
273 #endif
274  raw_sendSignal(&signal, nodeId);
275  }
276  unlock();
277  }
278  /* Wait for nodes to reply - if any heartbeats was sent */
279  theFacade.lock_mutex();
280  if (!waitForHBFromNodes.isclear())
281  NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
282 
// Clear the flag whether or not every node answered in time.
283  waitingForHB= false;
284 #ifdef DEBUG_REG
285  ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
286 #endif
287  theFacade.unlock_mutex();
288 }
289 
290 void
291 ClusterMgr::startup()
292 {
293  assert(theStop == -1);
294  Uint32 nodeId = getOwnNodeId();
295  Node & cm_node = theNodes[nodeId];
296  trp_node & theNode = cm_node;
297  assert(theNode.defined);
298 
299  lock();
300  theFacade.doConnect(nodeId);
301  unlock();
302 
303  for (Uint32 i = 0; i<3000; i++)
304  {
305  lock();
306  theFacade.theTransporterRegistry->update_connections();
307  unlock();
308  if (theNode.is_connected())
309  break;
310  NdbSleep_MilliSleep(20);
311  }
312 
313  assert(theNode.is_connected());
314  Guard g(clusterMgrThreadMutex);
315  theStop = 0;
316  NdbCondition_Broadcast(waitForHBCond);
317 }
318 
// Main loop of the cluster manager thread (entered via runClusterMgr_C).
//
// After startup() has connected to the own node, each iteration:
//  1. polls for incoming signals ten times, 10 ms apart, each time under
//     clusterMgrThreadMutex, and measures the elapsed wall-clock time;
//  2. while in CS_waiting_for_clean_cache, skips the rest of the iteration
//     until the global dictionary cache is empty (unless
//     global_flag_skip_waiting_for_clean_cache is set), then moves to
//     CS_waiting_for_first_connect;
//  3. for every defined node:
//     - (re)connects nodes that are not connected;
//     - skips incompatible nodes and the own node once it is confirmed;
//     - advances the heartbeat counter and sends API_REGREQ when due;
//     - adds nodes that have missed 4 heartbeats to a NODE_FAILREP that is
//       sent to this block.
// The loop ends when theStop becomes non-zero (doStop() sets 1;
// execDISCONNECT_REP() may set 2).
319 void
320 ClusterMgr::threadMain()
321 {
322  startup();
323 
// Reusable heartbeat request. The receiving block is chosen per node below.
324  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
325 
326  signal.theVerId_signalNumber = GSN_API_REGREQ;
327  signal.theTrace = 0;
328  signal.theLength = ApiRegReq::SignalLength;
329 
330  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
331  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
332  req->version = NDB_VERSION;
333  req->mysql_version = NDB_MYSQL_VERSION_D;
334 
// Reusable NODE_FAILREP, addressed to this same block, used to report
// nodes that stopped answering heartbeats.
335  NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
336  nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
337  nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
338  nodeFail_signal.theTrace = 0;
339  nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
340 
341  NDB_TICKS timeSlept = 100;
342  NDB_TICKS now = NdbTick_CurrentMillisecond();
343 
344  while(!theStop)
345  {
346  /* Sleep at 100ms between each heartbeat check */
347  NDB_TICKS before = now;
348  for (Uint32 i = 0; i<10; i++)
349  {
350  NdbSleep_MilliSleep(10);
351  {
// NOTE(review): polling under clusterMgrThreadMutex presumably keeps other
// senders on this client (e.g. the arbitrator) out while we poll; confirm.
352  Guard g(clusterMgrThreadMutex);
356  start_poll();
357  do_poll(0);
358  complete_poll();
359  }
360  }
// Actual elapsed milliseconds; added to each node's heartbeat counter below.
361  now = NdbTick_CurrentMillisecond();
362  timeSlept = (now - before);
363 
364  if (m_cluster_state == CS_waiting_for_clean_cache &&
365  theFacade.m_globalDictCache)
366  {
367  if (!global_flag_skip_waiting_for_clean_cache)
368  {
369  theFacade.m_globalDictCache->lock();
370  unsigned sz= theFacade.m_globalDictCache->get_size();
371  theFacade.m_globalDictCache->unlock();
// Cache still holds entries: no heartbeats are sent this round.
372  if (sz)
373  continue;
374  }
375  m_cluster_state = CS_waiting_for_first_connect;
376  }
377 
378 
379  NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
380  nodeFail_signal.getDataPtrSend());
381  nodeFailRep->noOfNodes = 0;
382  NodeBitmask::clear(nodeFailRep->theNodes);
383 
384  trp_client::lock();
385  for (int i = 1; i < MAX_NODES; i++){
390  const NodeId nodeId = i;
391  // Check array bounds + don't allow node 0 to be touched
392  assert(nodeId > 0 && nodeId < MAX_NODES);
393  Node & cm_node = theNodes[nodeId];
394  trp_node & theNode = cm_node;
395 
396  if (!theNode.defined)
397  continue;
398 
399  if (theNode.is_connected() == false){
400  theFacade.doConnect(nodeId);
401  continue;
402  }
403 
404  if (!theNode.compatible){
405  continue;
406  }
407 
// The own node is sent at most one heartbeat: stop once it is confirmed.
408  if (nodeId == getOwnNodeId() && theNode.is_confirmed())
409  {
414  continue;
415  }
416 
// A REGREQ is due when either the per-node interval (hbFrequency, set from
// API_REGCONF) or the global cap m_max_api_reg_req_interval is reached.
// Only the per-node interval counts as a missed heartbeat and restarts the
// counter. API_REGCONF resets both hbCounter and hbMissed.
417  cm_node.hbCounter += (Uint32)timeSlept;
418  if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
419  cm_node.hbCounter >= cm_node.hbFrequency)
420  {
424  if (cm_node.hbCounter >= cm_node.hbFrequency)
425  {
426  cm_node.hbMissed++;
427  cm_node.hbCounter = 0;
428  }
429 
// DB nodes are addressed through QMGR, all others through their
// API_CLUSTERMGR block.
430  if (theNode.m_info.m_type != NodeInfo::DB)
431  signal.theReceiversBlockNumber = API_CLUSTERMGR;
432  else
433  signal.theReceiversBlockNumber = QMGR;
434 
435 #ifdef DEBUG_REG
436  ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
437 #endif
438  raw_sendSignal(&signal, nodeId);
439  }//if
440 
// Equality (not >=): a node is added only on the pass where its miss count
// reaches 4, and only once a heartbeat interval is known.
441  if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
442  {
443  nodeFailRep->noOfNodes++;
444  NodeBitmask::set(nodeFailRep->theNodes, nodeId);
445  }
446  }
447 
448  if (nodeFailRep->noOfNodes)
449  {
450  raw_sendSignal(&nodeFail_signal, getOwnNodeId());
451  }
452  trp_client::unlock();
453  }
454 }
455 
// Dispatch a signal delivered to the API_CLUSTERMGR block.
//
// - Registration/heartbeat and node-failure signals go to the matching
//   exec* handler.
// - Arbitration signals go to theArbitMgr, when one exists.
// - ALTER_TABLE_REP updates the global dictionary cache.
// - Several signals are also (or only) forwarded to all clients through
//   theFacade.for_each().
//
// NOTE(review): the line carrying this function's name and first parameter
// (original line 457) is missing from this listing. From the body, that
// parameter is `sig`, a pointer to the NdbApiSignal being delivered.
456 void
458  const LinearSectionPtr ptr[3])
459 {
460  const Uint32 gsn = sig->theVerId_signalNumber;
461  const Uint32 * theData = sig->getDataPtr();
462 
463  switch (gsn){
464  case GSN_API_REGREQ:
465  execAPI_REGREQ(theData);
466  break;
467 
468  case GSN_API_REGCONF:
469  execAPI_REGCONF(sig, ptr);
470  break;
471 
472  case GSN_API_REGREF:
473  execAPI_REGREF(theData);
474  break;
475 
476  case GSN_NODE_FAILREP:
477  execNODE_FAILREP(sig, ptr);
478  break;
479 
480  case GSN_NF_COMPLETEREP:
481  execNF_COMPLETEREP(sig, ptr);
482  break;
// Arbitration signals are dropped when no arbitrator is configured.
483  case GSN_ARBIT_STARTREQ:
484  if (theArbitMgr != NULL)
485  theArbitMgr->doStart(theData);
486  break;
487 
488  case GSN_ARBIT_CHOOSEREQ:
489  if (theArbitMgr != NULL)
490  theArbitMgr->doChoose(theData);
491  break;
492 
493  case GSN_ARBIT_STOPORD:
494  if(theArbitMgr != NULL)
495  theArbitMgr->doStop(theData);
496  break;
497 
498  case GSN_ALTER_TABLE_REP:
499  {
500  if (theFacade.m_globalDictCache == NULL)
501  break;
// Section 0 is passed to the cache as a C string (presumably the table
// name; confirm).
502  const AlterTableRep* rep = (const AlterTableRep*)theData;
503  theFacade.m_globalDictCache->lock();
504  theFacade.m_globalDictCache->
505  alter_table_rep((const char*)ptr[0].p,
506  rep->tableId,
507  rep->tableVersion,
508  rep->changeType == AlterTableRep::CT_ALTERED);
509  theFacade.m_globalDictCache->unlock();
510  break;
511  }
512  case GSN_SUB_GCP_COMPLETE_REP:
513  {
// Forward to all clients, then acknowledge to the sender.
517  theFacade.for_each(this, sig, ptr);
518 
522  {
// The ACK reuses the REP payload with senderRef replaced by our own reference.
523  BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
524  NdbApiSignal tSignal(* sig);
525  Uint32* send= tSignal.getDataPtrSend();
526  memcpy(send, theData, tSignal.getLength() << 2);
527  CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef = ownRef;
528  Uint32 ref= sig->theSendersBlockRef;
529  Uint32 aNodeId= refToNode(ref);
530  tSignal.theReceiversBlockNumber= refToBlock(ref);
531  tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
// NOTE(review): this assigns a block number, not a full block reference
// (compare ownRef above); confirm this is intended.
532  tSignal.theSendersBlockRef = API_CLUSTERMGR;
533  safe_sendSignal(&tSignal, aNodeId);
534  }
535  break;
536  }
537  case GSN_TAKE_OVERTCCONF:
538  {
542  theFacade.for_each(this, sig, ptr);
543  return;
544  }
// CONNECT_REP / DISCONNECT_REP are posted to this block by ClusterMgr itself.
545  case GSN_CONNECT_REP:
546  {
547  execCONNECT_REP(sig, ptr);
548  return;
549  }
550  case GSN_DISCONNECT_REP:
551  {
552  execDISCONNECT_REP(sig, ptr);
553  return;
554  }
555  default:
556  break;
557 
558  }
559  return;
560 }
561 
562 ClusterMgr::Node::Node()
563  : hbFrequency(0), hbCounter(0)
564 {
565 }
566 
578 void
579 ClusterMgr::recalcMinDbVersion()
580 {
581  Uint32 newMinDbVersion = ~ (Uint32) 0;
582 
583  for (Uint32 i = 0; i < MAX_NODES; i++)
584  {
585  trp_node& node = theNodes[i];
586 
587  if (node.is_connected() &&
588  node.is_confirmed() &&
589  node.m_info.getType() == NodeInfo::DB)
590  {
591  /* Include this node in the set of nodes used to
592  * compute the lowest current DB node version
593  */
594  assert(node.m_info.m_version);
595 
596  if (node.minDbVersion < newMinDbVersion)
597  {
598  newMinDbVersion = node.minDbVersion;
599  }
600  }
601  }
602 
603  /* Now update global min Db version if we have one.
604  * Otherwise set it to 0
605  */
606  newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
607  0 :
608  newMinDbVersion;
609 
610 //#ifdef DEBUG_MINVER
611 
612 #ifdef DEBUG_MINVER
613  if (newMinDbVersion != minDbVersion)
614  {
615  ndbout << "Previous min Db node version was "
616  << NdbVersion(minDbVersion)
617  << " new min is "
618  << NdbVersion(newMinDbVersion)
619  << endl;
620  }
621  else
622  {
623  ndbout << "MinDbVersion recalculated, but is same : "
624  << NdbVersion(minDbVersion)
625  << endl;
626  }
627 #endif
628 
629  minDbVersion = newMinDbVersion;
630 }
631 
632 /******************************************************************************
633  * API_REGREQ and friends
634  ******************************************************************************/
635 
636 void
637 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
638  const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
639  const NodeId nodeId = refToNode(apiRegReq->ref);
640 
641 #ifdef DEBUG_REG
642  ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
643 #endif
644 
645  assert(nodeId > 0 && nodeId < MAX_NODES);
646 
647  Node & cm_node = theNodes[nodeId];
648  trp_node & node = cm_node;
649  assert(node.defined == true);
650  assert(node.is_connected() == true);
651 
652  if(node.m_info.m_version != apiRegReq->version){
653  node.m_info.m_version = apiRegReq->version;
654  node.m_info.m_mysql_version = apiRegReq->mysql_version;
655  if (node.m_info.m_version < NDBD_SPLIT_VERSION)
656  node.m_info.m_mysql_version = 0;
657 
658  if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
659  getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
660  node.compatible = false;
661  } else {
662  node.compatible = true;
663  }
664  }
665 
666  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
667  signal.theVerId_signalNumber = GSN_API_REGCONF;
668  signal.theReceiversBlockNumber = API_CLUSTERMGR;
669  signal.theTrace = 0;
670  signal.theLength = ApiRegConf::SignalLength;
671 
672  ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
673  conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
674  conf->version = NDB_VERSION;
675  conf->mysql_version = NDB_MYSQL_VERSION_D;
676  conf->apiHeartbeatFrequency = cm_node.hbFrequency;
677 
678  conf->minDbVersion= 0;
679  conf->nodeState= node.m_state;
680 
681  node.set_confirmed(true);
682  if (safe_sendSignal(&signal, nodeId) != 0)
683  node.set_confirmed(false);
684 }
685 
686 void
687 ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
688  const LinearSectionPtr ptr[])
689 {
690  const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
691  signal->getDataPtr());
692  const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
693 
694 #ifdef DEBUG_REG
695  ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
696 #endif
697 
698  assert(nodeId > 0 && nodeId < MAX_NODES);
699 
700  Node & cm_node = theNodes[nodeId];
701  trp_node & node = cm_node;
702  assert(node.defined == true);
703  assert(node.is_connected() == true);
704 
705  if(node.m_info.m_version != apiRegConf->version){
706  node.m_info.m_version = apiRegConf->version;
707  node.m_info.m_mysql_version = apiRegConf->mysql_version;
708  if (node.m_info.m_version < NDBD_SPLIT_VERSION)
709  node.m_info.m_mysql_version = 0;
710 
711  if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
712  node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
713  node.m_info.m_version);
714  else
715  node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
716  node.m_info.m_version);
717  }
718 
719  node.set_confirmed(true);
720 
721  if (node.minDbVersion != apiRegConf->minDbVersion)
722  {
723  node.minDbVersion = apiRegConf->minDbVersion;
724  recalcMinDbVersion();
725  }
726 
727  if (node.m_info.m_version >= NDBD_255_NODES_VERSION)
728  {
729  node.m_state = apiRegConf->nodeState;
730  }
731  else
732  {
736  memcpy(&node.m_state, &apiRegConf->nodeState, sizeof(node.m_state) - 24);
737  }
738 
739  if (node.m_info.m_type == NodeInfo::DB)
740  {
744  if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
745  node.m_state.getSingleUserMode()))
746  {
747  set_node_alive(node, true);
748  }
749  else
750  {
751  set_node_alive(node, false);
752  }
753  }
754 
755  cm_node.hbMissed = 0;
756  cm_node.hbCounter = 0;
757  cm_node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
758 
759  // Distribute signal to all threads/blocks
760  // TODO only if state changed...
761  theFacade.for_each(this, signal, ptr);
762 
763  check_wait_for_hb(nodeId);
764 }
765 
766 void
767 ClusterMgr::check_wait_for_hb(NodeId nodeId)
768 {
769  if(waitingForHB)
770  {
771  waitForHBFromNodes.clear(nodeId);
772 
773  if(waitForHBFromNodes.isclear())
774  {
775  waitingForHB= false;
776  NdbCondition_Broadcast(waitForHBCond);
777  }
778  }
779  return;
780 }
781 
782 
783 void
784 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
785 
786  ApiRegRef * ref = (ApiRegRef*)theData;
787 
788  const NodeId nodeId = refToNode(ref->ref);
789 
790  assert(nodeId > 0 && nodeId < MAX_NODES);
791 
792  Node & cm_node = theNodes[nodeId];
793  trp_node & node = cm_node;
794 
795  assert(node.is_connected() == true);
796  assert(node.defined == true);
797  /* Only DB nodes will send API_REGREF */
798  assert(node.m_info.getType() == NodeInfo::DB);
799 
800  node.compatible = false;
801  set_node_alive(node, false);
802  node.m_state = NodeState::SL_NOTHING;
803  node.m_info.m_version = ref->version;
804 
805  switch(ref->errorCode){
806  case ApiRegRef::WrongType:
807  ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
808  abort();
809  case ApiRegRef::UnsupportedVersion:
810  default:
811  break;
812  }
813 
814  check_wait_for_hb(nodeId);
815 }
816 
817 void
818 ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
819  const LinearSectionPtr ptr[3])
820 {
821  const NFCompleteRep * nfComp = CAST_CONSTPTR(NFCompleteRep,
822  signal->getDataPtr());
823  const NodeId nodeId = nfComp->failedNodeId;
824  assert(nodeId > 0 && nodeId < MAX_NODES);
825 
826  trp_node & node = theNodes[nodeId];
827  if (node.nfCompleteRep == false)
828  {
829  node.nfCompleteRep = true;
830  theFacade.for_each(this, signal, ptr);
831  }
832 }
833 
// Record that the transporter to nodeId has connected.
//
// - Increments noOfConnectedNodes; the own node is excluded (net change 0).
// - Clears the node's heartbeat counters.
// - Resets its per-connection state: connected, version 0, compatible,
//   nfCompleteRep = true, m_node_fail_rep = false, start level SL_NOTHING,
//   minDbVersion 0.
// - Posts a CONNECT_REP carrying nodeId to this block, which
//   execCONNECT_REP() forwards to all clients.
//
// NOTE(review): the line with the function name and parameter (original
// line 835) is missing from this listing. DBUG_ENTER names the function
// ClusterMgr::reportConnected, and the body uses a `nodeId` argument.
834 void
836 {
837  DBUG_ENTER("ClusterMgr::reportConnected");
838  DBUG_PRINT("info", ("nodeId: %u", nodeId));
844  assert(nodeId > 0 && nodeId < MAX_NODES);
// Decrement-then-increment leaves the count unchanged for the own node.
845  if (nodeId == getOwnNodeId())
846  {
847  noOfConnectedNodes--; // Don't count self...
848  }
849 
850  noOfConnectedNodes++;
851 
852  Node & cm_node = theNodes[nodeId];
853  trp_node & theNode = cm_node;
854 
// hbFrequency stays 0 until the node's first API_REGCONF sets it.
855  cm_node.hbMissed = 0;
856  cm_node.hbCounter = 0;
857  cm_node.hbFrequency = 0;
858 
859  assert(theNode.is_connected() == false);
860 
865  theNode.set_connected(true);
866  theNode.m_state.m_connected_nodes.set(nodeId);
867  theNode.m_info.m_version = 0;
868  theNode.compatible = true;
869  theNode.nfCompleteRep = true;
870  theNode.m_node_fail_rep = false;
871  theNode.m_state.startLevel = NodeState::SL_NOTHING;
872  theNode.minDbVersion = 0;
873 
// Notify clients asynchronously through this block's own signal queue.
881  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
882  signal.theVerId_signalNumber = GSN_CONNECT_REP;
883  signal.theReceiversBlockNumber = API_CLUSTERMGR;
884  signal.theTrace = 0;
885  signal.theLength = 1;
886  signal.getDataPtrSend()[0] = nodeId;
887  raw_sendSignal(&signal, getOwnNodeId());
888  DBUG_VOID_RETURN;
889 }
890 
891 void
892 ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
893  const LinearSectionPtr ptr[])
894 {
895  theFacade.for_each(this, sig, 0);
896 }
897 
898 void
899 ClusterMgr::set_node_dead(trp_node& theNode)
900 {
901  set_node_alive(theNode, false);
902  theNode.set_confirmed(false);
903  theNode.m_state.m_connected_nodes.clear();
904  theNode.m_state.startLevel = NodeState::SL_NOTHING;
905  theNode.m_info.m_connectCount ++;
906  theNode.nfCompleteRep = false;
907 }
908 
// Post a DISCONNECT_REP for nodeId (err = 0) to this block. The actual
// bookkeeping happens when execDISCONNECT_REP() processes that signal.
//
// NOTE(review): the line with the function name and parameter (original
// line 910) is missing from this listing. The body uses a `nodeId` argument.
909 void
911 {
912  assert(nodeId > 0 && nodeId < MAX_NODES);
// At least one node must still be counted as connected.
913  assert(noOfConnectedNodes > 0);
914 
922  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
923  signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
924  signal.theReceiversBlockNumber = API_CLUSTERMGR;
925  signal.theTrace = 0;
926  signal.theLength = DisconnectRep::SignalLength;
927 
928  DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
929  rep->nodeId = nodeId;
930  rep->err = 0;
931  raw_sendSignal(&signal, getOwnNodeId());
932 }
933 
934 void
935 ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
936  const LinearSectionPtr ptr[])
937 {
938  const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
939  Uint32 nodeId = rep->nodeId;
940 
941  assert(nodeId > 0 && nodeId < MAX_NODES);
942  Node & cm_node = theNodes[nodeId];
943  trp_node & theNode = cm_node;
944 
945  bool node_failrep = theNode.m_node_fail_rep;
946  set_node_dead(theNode);
947  theNode.set_connected(false);
948 
949  noOfConnectedNodes--;
950  if (noOfConnectedNodes == 0)
951  {
952  if (!global_flag_skip_invalidate_cache &&
953  theFacade.m_globalDictCache)
954  {
955  theFacade.m_globalDictCache->lock();
956  theFacade.m_globalDictCache->invalidate_all();
957  theFacade.m_globalDictCache->unlock();
958  m_connect_count ++;
959  m_cluster_state = CS_waiting_for_clean_cache;
960  }
961 
962  if (m_auto_reconnect == 0)
963  {
964  theStop = 2;
965  }
966  }
967 
968  if (node_failrep == false)
969  {
973  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
974  signal.theVerId_signalNumber = GSN_NODE_FAILREP;
975  signal.theReceiversBlockNumber = API_CLUSTERMGR;
976  signal.theTrace = 0;
977  signal.theLength = NodeFailRep::SignalLengthLong;
978 
979  NodeFailRep * rep = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
980  rep->failNo = 0;
981  rep->masterNodeId = 0;
982  rep->noOfNodes = 1;
983  NodeBitmask::clear(rep->theNodes);
984  NodeBitmask::set(rep->theNodes, nodeId);
985  execNODE_FAILREP(&signal, 0);
986  }
987 }
988 
989 void
990 ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
991  const LinearSectionPtr ptr[])
992 {
993  const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
994 
995  NdbApiSignal signal(sig->theSendersBlockRef);
996  signal.theVerId_signalNumber = GSN_NODE_FAILREP;
997  signal.theReceiversBlockNumber = API_CLUSTERMGR;
998  signal.theTrace = 0;
999  signal.theLength = NodeFailRep::SignalLengthLong;
1000 
1001  NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1002  copy->failNo = 0;
1003  copy->masterNodeId = 0;
1004  copy->noOfNodes = 0;
1005  NodeBitmask::clear(copy->theNodes);
1006 
1007  for (Uint32 i = NdbNodeBitmask::find_first(rep->theNodes);
1008  i != NdbNodeBitmask::NotFound;
1009  i = NdbNodeBitmask::find_next(rep->theNodes, i + 1))
1010  {
1011  Node & cm_node = theNodes[i];
1012  trp_node & theNode = cm_node;
1013 
1014  bool node_failrep = theNode.m_node_fail_rep;
1015  bool connected = theNode.is_connected();
1016  set_node_dead(theNode);
1017 
1018  if (node_failrep == false)
1019  {
1020  theNode.m_node_fail_rep = true;
1021  NodeBitmask::set(copy->theNodes, i);
1022  copy->noOfNodes++;
1023  }
1024 
1025  if (connected)
1026  {
1027  theFacade.doDisconnect(i);
1028  }
1029  }
1030 
1031  recalcMinDbVersion();
1032  if (copy->noOfNodes)
1033  {
1034  theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
1035  }
1036 
1037  if (noOfAliveNodes == 0)
1038  {
1039  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1040  signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1041  signal.theReceiversBlockNumber = 0;
1042  signal.theTrace = 0;
1043  signal.theLength = NFCompleteRep::SignalLength;
1044 
1045  NFCompleteRep * rep = CAST_PTR(NFCompleteRep, signal.getDataPtrSend());
1046  rep->blockNo =0;
1047  rep->nodeId = getOwnNodeId();
1048  rep->unused = 0;
1049  rep->from = __LINE__;
1050 
1051  for (Uint32 i = 1; i < MAX_NODES; i++)
1052  {
1053  trp_node& theNode = theNodes[i];
1054  if (theNode.defined && theNode.nfCompleteRep == false)
1055  {
1056  rep->failedNodeId = i;
1057  execNF_COMPLETEREP(&signal, 0);
1058  }
1059  }
1060  }
1061 }
1062 
1063 void
1064 ClusterMgr::print_nodes(const char* where, NdbOut& out)
1065 {
1066  out << where << " >>" << endl;
1067  for (NodeId n = 1; n < MAX_NODES ; n++)
1068  {
1069  const trp_node node = getNodeInfo(n);
1070  if (!node.defined)
1071  continue;
1072  out << "node: " << n << endl;
1073  out << " -";
1074  out << " connected: " << node.is_connected();
1075  out << ", compatible: " << node.compatible;
1076  out << ", nf_complete_rep: " << node.nfCompleteRep;
1077  out << ", alive: " << node.m_alive;
1078  out << ", confirmed: " << node.is_confirmed();
1079  out << endl;
1080 
1081  out << " - " << node.m_info << endl;
1082  out << " - " << node.m_state << endl;
1083  }
1084  out << "<<" << endl;
1085 }
1086 
1087 
1088 /******************************************************************************
1089  * Arbitrator
1090  ******************************************************************************/
1091 ArbitMgr::ArbitMgr(ClusterMgr & c)
1092  : m_clusterMgr(c)
1093 {
1094  DBUG_ENTER("ArbitMgr::ArbitMgr");
1095 
1096  theThreadMutex = NdbMutex_Create();
1097  theInputCond = NdbCondition_Create();
1098  theInputMutex = NdbMutex_Create();
1099 
1100  theRank = 0;
1101  theDelay = 0;
1102  theThread = 0;
1103 
1104  theInputTimeout = 0;
1105  theInputFull = false;
1106  memset(&theInputBuffer, 0, sizeof(theInputBuffer));
1107  theState = StateInit;
1108 
1109  memset(&theStartReq, 0, sizeof(theStartReq));
1110  memset(&theChooseReq1, 0, sizeof(theChooseReq1));
1111  memset(&theChooseReq2, 0, sizeof(theChooseReq2));
1112  memset(&theStopOrd, 0, sizeof(theStopOrd));
1113 
1114  DBUG_VOID_RETURN;
1115 }
1116 
1117 ArbitMgr::~ArbitMgr()
1118 {
1119  DBUG_ENTER("ArbitMgr::~ArbitMgr");
1120  NdbMutex_Destroy(theThreadMutex);
1121  NdbCondition_Destroy(theInputCond);
1122  NdbMutex_Destroy(theInputMutex);
1123  DBUG_VOID_RETURN;
1124 }
1125 
1126 // Start arbitrator thread. This is kernel request.
1127 // First stop any previous thread since it is a left-over
1128 // which was never used and which now has wrong ticket.
1129 void
1130 ArbitMgr::doStart(const Uint32* theData)
1131 {
1132  ArbitSignal aSignal;
1133  NdbMutex_Lock(theThreadMutex);
1134  if (theThread != NULL) {
1135  aSignal.init(GSN_ARBIT_STOPORD, NULL);
1136  aSignal.data.code = StopRestart;
1137  sendSignalToThread(aSignal);
1138  void* value;
1139  NdbThread_WaitFor(theThread, &value);
1140  NdbThread_Destroy(&theThread);
1141  theState = StateInit;
1142  theInputFull = false;
1143  }
1144  aSignal.init(GSN_ARBIT_STARTREQ, theData);
1145  sendSignalToThread(aSignal);
1146  theThread = NdbThread_Create(
1147  runArbitMgr_C, (void**)this,
1148  0, // default stack size
1149  "ndb_arbitmgr",
1150  NDB_THREAD_PRIO_HIGH);
1151  NdbMutex_Unlock(theThreadMutex);
1152 }
1153 
1154 // The "choose me" signal from a candidate.
1155 void
1156 ArbitMgr::doChoose(const Uint32* theData)
1157 {
1158  ArbitSignal aSignal;
1159  aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1160  sendSignalToThread(aSignal);
1161 }
1162 
1163 // Stop arbitrator thread via stop signal from the kernel
1164 // or when exiting API program.
1165 void
1166 ArbitMgr::doStop(const Uint32* theData)
1167 {
1168  DBUG_ENTER("ArbitMgr::doStop");
1169  ArbitSignal aSignal;
1170  NdbMutex_Lock(theThreadMutex);
1171  if (theThread != NULL) {
1172  aSignal.init(GSN_ARBIT_STOPORD, theData);
1173  if (theData == 0) {
1174  aSignal.data.code = StopExit;
1175  } else {
1176  aSignal.data.code = StopRequest;
1177  }
1178  sendSignalToThread(aSignal);
1179  void* value;
1180  NdbThread_WaitFor(theThread, &value);
1181  NdbThread_Destroy(&theThread);
1182  theState = StateInit;
1183  }
1184  NdbMutex_Unlock(theThreadMutex);
1185  DBUG_VOID_RETURN;
1186 }
1187 
1188 // private methods
1189 
1190 extern "C"
1191 void*
1192 runArbitMgr_C(void* me)
1193 {
1194  ((ArbitMgr*) me)->threadMain();
1195  return NULL;
1196 }
1197 
1198 void
1199 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
1200 {
1201 #ifdef DEBUG_ARBIT
1202  char buf[17] = "";
1203  ndbout << "arbit recv: ";
1204  ndbout << " gsn=" << aSignal.gsn;
1205  ndbout << " send=" << aSignal.data.sender;
1206  ndbout << " code=" << aSignal.data.code;
1207  ndbout << " node=" << aSignal.data.node;
1208  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1209  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1210  ndbout << endl;
1211 #endif
1212  aSignal.setTimestamp(); // signal arrival time
1213  NdbMutex_Lock(theInputMutex);
1214  while (theInputFull) {
1215  NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
1216  }
1217  theInputBuffer = aSignal;
1218  theInputFull = true;
1219  NdbCondition_Signal(theInputCond);
1220  NdbMutex_Unlock(theInputMutex);
1221 }
1222 
1223 void
1224 ArbitMgr::threadMain()
1225 {
1226  ArbitSignal aSignal;
1227  aSignal = theInputBuffer;
1228  threadStart(aSignal);
1229  bool stop = false;
1230  while (! stop) {
1231  NdbMutex_Lock(theInputMutex);
1232  while (! theInputFull) {
1233  NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
1234  threadTimeout();
1235  }
1236  aSignal = theInputBuffer;
1237  theInputFull = false;
1238  NdbCondition_Signal(theInputCond);
1239  NdbMutex_Unlock(theInputMutex);
1240  switch (aSignal.gsn) {
1241  case GSN_ARBIT_CHOOSEREQ:
1242  threadChoose(aSignal);
1243  break;
1244  case GSN_ARBIT_STOPORD:
1245  stop = true;
1246  break;
1247  }
1248  }
1249  threadStop(aSignal);
1250 }
1251 
1252 // handle events in the thread
1253 
1254 void
1255 ArbitMgr::threadStart(ArbitSignal& aSignal)
1256 {
1257  theStartReq = aSignal;
1258  sendStartConf(theStartReq, ArbitCode::ApiStart);
1259  theState = StateStarted;
1260  theInputTimeout = 1000;
1261 }
1262 
1263 void
1264 ArbitMgr::threadChoose(ArbitSignal& aSignal)
1265 {
1266  switch (theState) {
1267  case StateStarted: // first REQ
1268  if (! theStartReq.data.match(aSignal.data)) {
1269  sendChooseRef(aSignal, ArbitCode::ErrTicket);
1270  break;
1271  }
1272  theChooseReq1 = aSignal;
1273  if (theDelay == 0) {
1274  sendChooseConf(aSignal, ArbitCode::WinChoose);
1275  theState = StateFinished;
1276  theInputTimeout = 1000;
1277  break;
1278  }
1279  theState = StateChoose1;
1280  theInputTimeout = 1;
1281  return;
1282  case StateChoose1: // second REQ within Delay
1283  if (! theStartReq.data.match(aSignal.data)) {
1284  sendChooseRef(aSignal, ArbitCode::ErrTicket);
1285  break;
1286  }
1287  theChooseReq2 = aSignal;
1288  theState = StateChoose2;
1289  theInputTimeout = 1;
1290  return;
1291  case StateChoose2: // too many REQs - refuse all
1292  if (! theStartReq.data.match(aSignal.data)) {
1293  sendChooseRef(aSignal, ArbitCode::ErrTicket);
1294  break;
1295  }
1296  sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
1297  sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
1298  sendChooseRef(aSignal, ArbitCode::ErrToomany);
1299  theState = StateFinished;
1300  theInputTimeout = 1000;
1301  return;
1302  default:
1303  sendChooseRef(aSignal, ArbitCode::ErrState);
1304  break;
1305  }
1306 }
1307 
1308 void
1309 ArbitMgr::threadTimeout()
1310 {
1311  switch (theState) {
1312  case StateStarted:
1313  break;
1314  case StateChoose1:
1315  if (theChooseReq1.getTimediff() < theDelay)
1316  break;
1317  sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1318  theState = StateFinished;
1319  theInputTimeout = 1000;
1320  break;
1321  case StateChoose2:
1322  sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1323  sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1324  theState = StateFinished;
1325  theInputTimeout = 1000;
1326  break;
1327  default:
1328  break;
1329  }
1330 }
1331 
1332 void
1333 ArbitMgr::threadStop(ArbitSignal& aSignal)
1334 {
1335  switch (aSignal.data.code) {
1336  case StopExit:
1337  switch (theState) {
1338  case StateStarted:
1339  sendStopRep(theStartReq, 0);
1340  break;
1341  case StateChoose1: // just in time
1342  sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1343  break;
1344  case StateChoose2:
1345  sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1346  sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1347  break;
1348  case StateInit:
1349  case StateFinished:
1350  //??
1351  break;
1352  }
1353  break;
1354  case StopRequest:
1355  break;
1356  case StopRestart:
1357  break;
1358  }
1359 }
1360 
1361 // output routines
1362 
1363 void
1364 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
1365 {
1366  ArbitSignal copySignal = aSignal;
1367  copySignal.gsn = GSN_ARBIT_STARTCONF;
1368  copySignal.data.code = code;
1369  sendSignalToQmgr(copySignal);
1370 }
1371 
1372 void
1373 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
1374 {
1375  ArbitSignal copySignal = aSignal;
1376  copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1377  copySignal.data.code = code;
1378  sendSignalToQmgr(copySignal);
1379 }
1380 
1381 void
1382 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
1383 {
1384  ArbitSignal copySignal = aSignal;
1385  copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1386  copySignal.data.code = code;
1387  sendSignalToQmgr(copySignal);
1388 }
1389 
1390 void
1391 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
1392 {
1393  ArbitSignal copySignal = aSignal;
1394  copySignal.gsn = GSN_ARBIT_STOPREP;
1395  copySignal.data.code = code;
1396  sendSignalToQmgr(copySignal);
1397 }
1398 
1405 void
1406 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1407 {
1408  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1409 
1410  signal.theVerId_signalNumber = aSignal.gsn;
1411  signal.theReceiversBlockNumber = QMGR;
1412  signal.theTrace = 0;
1413  signal.theLength = ArbitSignalData::SignalLength;
1414 
1415  ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
1416 
1417  sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1418  sd->code = aSignal.data.code;
1419  sd->node = aSignal.data.node;
1420  sd->ticket = aSignal.data.ticket;
1421  sd->mask = aSignal.data.mask;
1422 
1423 #ifdef DEBUG_ARBIT
1424  char buf[17] = "";
1425  ndbout << "arbit send: ";
1426  ndbout << " gsn=" << aSignal.gsn;
1427  ndbout << " recv=" << aSignal.data.sender;
1428  ndbout << " code=" << aSignal.data.code;
1429  ndbout << " node=" << aSignal.data.node;
1430  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1431  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1432  ndbout << endl;
1433 #endif
1434 
1435  {
1436  m_clusterMgr.lock();
1437  m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1438  m_clusterMgr.unlock();
1439  }
1440 }
1441