MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndb_cluster_connection.cpp
1 /*
2  Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include "ndb_cluster_connection_impl.hpp"
21 #include <mgmapi_configuration.hpp>
22 #include <mgmapi_config_parameters.h>
23 #include "TransporterFacade.hpp"
24 #include <NdbOut.hpp>
25 #include <NdbSleep.h>
26 #include <NdbThread.h>
27 #include <ndb_limits.h>
28 #include <ConfigRetriever.hpp>
29 #include <ndb_version.h>
30 #include <mgmapi_debug.h>
31 #include <mgmapi_internal.h>
32 #include "NdbImpl.hpp"
33 #include "NdbDictionaryImpl.hpp"
34 
35 #include <NdbMutex.h>
36 #ifdef VM_TRACE
37 NdbMutex *ndb_print_state_mutex= NULL;
38 #endif
39 
40 #include <EventLogger.hpp>
41 extern EventLogger *g_eventLogger;
42 
43 static int g_ndb_connection_count = 0;
44 
45 /*
46  * Ndb_cluster_connection
47  */
49  : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0, 0))
50 {
51 }
52 
54  int force_api_nodeid)
55  : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0,
56  force_api_nodeid))
57 {
58 }
59 
60 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string,
62  main_connection)
63  : m_impl(* new Ndb_cluster_connection_impl(connect_string,
64  main_connection, 0))
65 {
66 }
67 
69 (Ndb_cluster_connection_impl& impl) : m_impl(impl)
70 {
71 }
72 
73 Ndb_cluster_connection::~Ndb_cluster_connection()
74 {
75  Ndb_cluster_connection_impl *tmp = &m_impl;
76  if (this != tmp)
77  delete tmp;
78 }
79 
80 int Ndb_cluster_connection::get_connected_port() const
81 {
82  if (m_impl.m_config_retriever)
83  return m_impl.m_config_retriever->get_mgmd_port();
84  return -1;
85 }
86 
87 const char *Ndb_cluster_connection::get_connected_host() const
88 {
89  if (m_impl.m_config_retriever)
90  return m_impl.m_config_retriever->get_mgmd_host();
91  return 0;
92 }
93 
94 const char *Ndb_cluster_connection::get_connectstring(char *buf,
95  int buf_sz) const
96 {
97  if (m_impl.m_config_retriever)
98  return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
99  return 0;
100 }
101 
102 extern "C"
103 void *
104 run_ndb_cluster_connection_connect_thread(void *me)
105 {
107  connection->m_run_connect_thread= 1;
108  connection->connect_thread();
109  return me;
110 }
111 
112 int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
113 {
114  int r;
115  DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
116  m_impl.m_connect_callback= connect_callback;
117  if ((r = connect(0,0,0)) == 1)
118  {
119  DBUG_PRINT("info",("starting thread"));
120  m_impl.m_connect_thread=
121  NdbThread_Create(run_ndb_cluster_connection_connect_thread,
122  (void**)&m_impl,
123  0, // default stack size
124  "ndb_cluster_connection",
125  NDB_THREAD_PRIO_LOW);
126  }
127  else if (r < 0)
128  {
129  DBUG_RETURN(-1);
130  }
131  else if (m_impl.m_connect_callback)
132  {
133  (*m_impl.m_connect_callback)();
134  }
135  DBUG_RETURN(0);
136 }
137 
138 void Ndb_cluster_connection::set_optimized_node_selection(int val)
139 {
140  m_impl.m_optimized_node_selection= val;
141 }
142 
143 void
144 Ndb_cluster_connection_impl::init_get_next_node
146 {
147  if (iter.scan_state != (Uint8)~0)
148  iter.cur_pos= iter.scan_state;
149  if (iter.cur_pos >= no_db_nodes())
150  iter.cur_pos= 0;
151  iter.init_pos= iter.cur_pos;
152  iter.scan_state= 0;
153  // fprintf(stderr,"[init %d]",iter.init_pos);
154  return;
155 }
156 
157 Uint32
158 Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
159 {
160  Uint32 cur_pos= iter.cur_pos;
161  if (cur_pos >= no_db_nodes())
162  return 0;
163 
164  Ndb_cluster_connection_impl::Node *nodes= m_all_nodes.getBase();
165  Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
166 
167  if (iter.scan_state != (Uint8)~0)
168  {
169  assert(iter.scan_state < no_db_nodes());
170  if (nodes[iter.scan_state].group == node.group)
171  iter.scan_state= ~0;
172  else
173  return nodes[iter.scan_state++].id;
174  }
175 
176  // fprintf(stderr,"[%d]",node.id);
177 
178  cur_pos++;
179  Uint32 init_pos= iter.init_pos;
180  if (cur_pos == node.next_group)
181  {
182  cur_pos= nodes[init_pos].this_group;
183  }
184 
185  // fprintf(stderr,"[cur_pos %d]",cur_pos);
186  if (cur_pos != init_pos)
187  iter.cur_pos= cur_pos;
188  else
189  {
190  iter.cur_pos= node.next_group;
191  iter.init_pos= node.next_group;
192  }
193  return node.id;
194 }
195 
196 Uint32
197 Ndb_cluster_connection_impl::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
198 {
199  Uint32 id;
200 
201  TransporterFacade *tp = m_impl.m_transporter_facade;
202  if (tp == 0 || tp->ownId() == 0)
203  return 0;
204 
205  while ((id = get_next_node(iter)))
206  {
207  tp->lock_mutex();
208  if (tp->get_node_alive(id) != 0)
209  {
210  tp->unlock_mutex();
211  return id;
212  }
213  tp->unlock_mutex();
214  }
215  return 0;
216 }
217 
218 unsigned
219 Ndb_cluster_connection::no_db_nodes()
220 {
221  return m_impl.m_all_nodes.size();
222 }
223 
224 unsigned
225 Ndb_cluster_connection::node_id()
226 {
227  return m_impl.m_transporter_facade->ownId();
228 }
229 
230 unsigned
231 Ndb_cluster_connection::max_nodegroup()
232 {
233  TransporterFacade *tp = m_impl.m_transporter_facade;
234  if (tp == 0 || tp->ownId() == 0)
235  return 0;
236 
238  tp->lock_mutex();
239  for(unsigned i= 0; i < no_db_nodes(); i++)
240  {
241  //************************************************
242  // If any node is answering, ndb is answering
243  //************************************************
244  trp_node n = tp->theClusterMgr->getNodeInfo(m_impl.m_all_nodes[i].id);
245  if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES)
246  ng.set(n.m_state.nodeGroup);
247  }
248  tp->unlock_mutex();
249 
250  if (ng.isclear())
251  return 0;
252 
253  Uint32 n = ng.find_first();
254  Uint32 m;
255  do
256  {
257  m = n;
258  } while ((n = ng.find(n+1)) != ng.NotFound);
259 
260  return m;
261 }
262 
263 int Ndb_cluster_connection::get_no_ready()
264 {
265  TransporterFacade *tp = m_impl.m_transporter_facade;
266  if (tp == 0 || tp->ownId() == 0)
267  return -1;
268 
269  unsigned int foundAliveNode = 0;
270  tp->lock_mutex();
271  for(unsigned i= 0; i < no_db_nodes(); i++)
272  {
273  //************************************************
274  // If any node is answering, ndb is answering
275  //************************************************
276  if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
277  foundAliveNode++;
278  }
279  }
280  tp->unlock_mutex();
281 
282  return foundAliveNode;
283 }
284 
285 int
287  int timeout_after_first_alive)
288 {
289  DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
290  TransporterFacade *tp = m_impl.m_transporter_facade;
291  if (tp == 0)
292  {
293  DBUG_RETURN(-1);
294  }
295  if (tp->ownId() == 0)
296  {
297  DBUG_RETURN(-1);
298  }
299  int secondsCounter = 0;
300  int milliCounter = 0;
301  int noChecksSinceFirstAliveFound = 0;
302  do {
303  unsigned int foundAliveNode = get_no_ready();
304 
305  if (foundAliveNode == no_db_nodes())
306  {
307  DBUG_RETURN(0);
308  }
309  else if (foundAliveNode > 0)
310  {
311  noChecksSinceFirstAliveFound++;
312  // 100 ms delay -> 10*
313  if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
314  DBUG_RETURN(1);
315  }
316  else if (secondsCounter >= timeout)
317  { // no alive nodes and timed out
318  DBUG_RETURN(-1);
319  }
320  NdbSleep_MilliSleep(100);
321  milliCounter += 100;
322  if (milliCounter >= 1000) {
323  secondsCounter++;
324  milliCounter = 0;
325  }//if
326  } while (1);
327 }
328 
329 unsigned Ndb_cluster_connection::get_connect_count() const
330 {
331  return m_impl.get_connect_count();
332 }
333 
334 unsigned Ndb_cluster_connection::get_min_db_version() const
335 {
336  return m_impl.get_min_db_version();
337 }
338 
339 int Ndb_cluster_connection::get_latest_error() const
340 {
341  return m_impl.m_latest_error;
342 }
343 
344 const char *Ndb_cluster_connection::get_latest_error_msg() const
345 {
346  return m_impl.m_latest_error_msg.c_str();
347 }
348 
349 /*
350  * Ndb_cluster_connection_impl
351  */
352 
353 Ndb_cluster_connection_impl::
354 Ndb_cluster_connection_impl(const char * connect_string,
355  Ndb_cluster_connection *main_connection,
356  int force_api_nodeid)
357  : Ndb_cluster_connection(*this),
358  m_main_connection(main_connection),
359  m_optimized_node_selection(1),
360  m_run_connect_thread(0),
361  m_latest_trans_gci(0),
362  m_first_ndb_object(0),
363  m_latest_error_msg(),
364  m_latest_error(0),
365  m_max_trans_id(0)
366 {
367  DBUG_ENTER("Ndb_cluster_connection");
368  DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this));
369 
370  NdbMutex_Lock(g_ndb_connection_mutex);
371  if(g_ndb_connection_count++ == 0)
372  {
373  NdbColumnImpl::create_pseudo_columns();
374  g_eventLogger->createConsoleHandler();
375  g_eventLogger->setCategory("NdbApi");
376  g_eventLogger->enable(Logger::LL_ON, Logger::LL_ERROR);
377  /*
378  Disable repeated message handling as it interfers
379  with mysqld logging, in which case messages come out
380  of order. Same applies for regular ndbapi user.
381  */
382  g_eventLogger->setRepeatFrequency(0);
383  }
384  NdbMutex_Unlock(g_ndb_connection_mutex);
385 
386  m_event_add_drop_mutex= NdbMutex_Create();
387  m_new_delete_ndb_mutex = NdbMutex_Create();
388 
389  m_connect_thread= 0;
390  m_connect_callback= 0;
391 
392  /* Clear global stats baseline */
393  memset(globalApiStatsBaseline, 0, sizeof(globalApiStatsBaseline));
394 
395 #ifdef VM_TRACE
396  if (ndb_print_state_mutex == NULL)
397  ndb_print_state_mutex= NdbMutex_Create();
398 #endif
399  m_config_retriever=
400  new ConfigRetriever(connect_string, force_api_nodeid,
401  NDB_VERSION, NDB_MGM_NODE_TYPE_API);
402  if (m_config_retriever->hasError())
403  {
404  m_latest_error= 1;
405  m_latest_error_msg.assfmt
406  ("Could not initialize handle to management server: %s",
407  m_config_retriever->getErrorString());
408  printf("%s\n", get_latest_error_msg());
409  }
410  if (!m_main_connection)
411  {
412  m_globalDictCache = new GlobalDictCache;
413  m_transporter_facade= new TransporterFacade(m_globalDictCache);
414  }
415  else
416  {
417  assert(m_main_connection->m_impl.m_globalDictCache != NULL);
418  m_globalDictCache = 0;
419  m_transporter_facade=
420  new TransporterFacade(m_main_connection->m_impl.m_globalDictCache);
421 
422  // The secondary connection can't use same nodeid, clear the nodeid
423  // in ConfigRetriver to avoid asking for the same nodeid again
424  m_config_retriever->setNodeId(0);
425 
426  }
427 
428  DBUG_VOID_RETURN;
429 }
430 
431 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
432 {
433  DBUG_ENTER("~Ndb_cluster_connection");
434 
435  if (m_first_ndb_object != 0)
436  {
437  g_eventLogger->warning("Deleting Ndb_cluster_connection with Ndb-object"
438  " not deleted");
439  Ndb * p = m_first_ndb_object;
440  printf("this: %p Ndb-object(s): ", (Ndb_cluster_connection*)this);
441  while (p)
442  {
443  printf("%p ", p);
444  p = p->theImpl->m_next_ndb_object;
445  }
446  printf("\n");
447  fflush(stdout);
448  }
449 
450  if (m_transporter_facade != 0)
451  {
452  m_transporter_facade->stop_instance();
453  }
454  if (m_globalDictCache)
455  {
456  delete m_globalDictCache;
457  }
458  if (m_connect_thread)
459  {
460  void *status;
461  m_run_connect_thread= 0;
462  NdbThread_WaitFor(m_connect_thread, &status);
463  NdbThread_Destroy(&m_connect_thread);
464  m_connect_thread= 0;
465  }
466  if (m_transporter_facade != 0)
467  {
468  delete m_transporter_facade;
469  m_transporter_facade = 0;
470  }
471  if (m_config_retriever)
472  {
473  delete m_config_retriever;
474  m_config_retriever= NULL;
475  }
476 #ifdef VM_TRACE
477  if (ndb_print_state_mutex != NULL)
478  {
479  NdbMutex_Destroy(ndb_print_state_mutex);
480  ndb_print_state_mutex= NULL;
481  }
482 #endif
483 
484  NdbMutex_Lock(g_ndb_connection_mutex);
485  if(--g_ndb_connection_count == 0)
486  {
487  NdbColumnImpl::destory_pseudo_columns();
488  }
489  NdbMutex_Unlock(g_ndb_connection_mutex);
490 
491  if (m_event_add_drop_mutex)
492  NdbMutex_Destroy(m_event_add_drop_mutex);
493  m_event_add_drop_mutex = 0;
494 
495  if (m_new_delete_ndb_mutex)
496  NdbMutex_Destroy(m_new_delete_ndb_mutex);
497  m_new_delete_ndb_mutex = 0;
498 
499  DBUG_VOID_RETURN;
500 }
501 
502 void
504 {
505  NdbMutex_Lock(m_impl.m_new_delete_ndb_mutex);
506 }
507 
508 void
510 {
511  NdbMutex_Unlock(m_impl.m_new_delete_ndb_mutex);
512 }
513 
514 const Ndb*
516 {
517  if (p == 0)
518  return m_impl.m_first_ndb_object;
519 
520  return p->theImpl->m_next_ndb_object;
521 }
522 
523 void
524 Ndb_cluster_connection_impl::link_ndb_object(Ndb* p)
525 {
527  if (m_first_ndb_object != 0)
528  {
529  m_first_ndb_object->theImpl->m_prev_ndb_object = p;
530  }
531 
532  p->theImpl->m_next_ndb_object = m_first_ndb_object;
533  m_first_ndb_object = p;
534 
535  p->theFirstTransId += m_max_trans_id;
537 }
538 
539 void
540 Ndb_cluster_connection_impl::unlink_ndb_object(Ndb* p)
541 {
543  Ndb* prev = p->theImpl->m_prev_ndb_object;
544  Ndb* next = p->theImpl->m_next_ndb_object;
545 
546  if (prev == 0)
547  {
548  assert(m_first_ndb_object == p);
549  m_first_ndb_object = next;
550  }
551  else
552  {
553  prev->theImpl->m_next_ndb_object = next;
554  }
555 
556  if (next)
557  {
558  next->theImpl->m_prev_ndb_object = prev;
559  }
560 
561  p->theImpl->m_prev_ndb_object = 0;
562  p->theImpl->m_next_ndb_object = 0;
563 
564  Uint32 transId = (Uint32)p->theFirstTransId;
565  if (transId > m_max_trans_id)
566  {
567  m_max_trans_id = transId;
568  }
569 
570  /* This Ndb is leaving for a better place,
571  * record its contribution to global warming
572  * for posterity
573  */
574  for (Uint32 i=0; i<Ndb::NumClientStatistics; i++)
575  {
576  globalApiStatsBaseline[i] += p->theImpl->clientStats[i];
577  }
578 
580 }
581 
582 void
583 Ndb_cluster_connection_impl::set_name(const char *name)
584 {
585  NdbMgmHandle h= m_config_retriever->get_mgmHandle();
586  ndb_mgm_set_name(h, name);
587 }
588 
589 int
590 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
591  const ndb_mgm_configuration
592  &config)
593 {
594  DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
595  ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
596 
597  for(iter.first(); iter.valid(); iter.next())
598  {
599  Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
600  const char * remoteHostName= 0, * localHostName= 0;
601  if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
602  if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
603 
604  if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
605  remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
606 
607  iter.get(CFG_CONNECTION_GROUP, &group);
608 
609  {
610  const char * host1= 0, * host2= 0;
611  iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
612  iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
613  localHostName = (nodeid == nodeid1 ? host1 : host2);
614  remoteHostName = (nodeid == nodeid1 ? host2 : host1);
615  }
616 
617  Uint32 type = ~0;
618  if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
619 
620  switch(type){
621  case CONNECTION_TYPE_SHM:{
622  break;
623  }
624  case CONNECTION_TYPE_SCI:{
625  break;
626  }
627  case CONNECTION_TYPE_TCP:{
628  // connecting through localhost
629  // check if config_hostname is local
630  if (SocketServer::tryBind(0,remoteHostName))
631  group--; // upgrade group value
632  break;
633  }
634  }
635  if (m_all_nodes.push_back(Node(group,remoteNodeId)))
636  {
637  DBUG_RETURN(-1);
638  }
639  DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
640  for (int i= m_all_nodes.size()-2;
641  i >= 0 && m_all_nodes[i].group > m_all_nodes[i+1].group;
642  i--)
643  {
644  Node tmp= m_all_nodes[i];
645  m_all_nodes[i]= m_all_nodes[i+1];
646  m_all_nodes[i+1]= tmp;
647  }
648  }
649 
650  int i;
651  Uint32 cur_group, i_group= 0;
652  cur_group= ~0;
653  for (i= (int)m_all_nodes.size()-1; i >= 0; i--)
654  {
655  if (m_all_nodes[i].group != cur_group)
656  {
657  cur_group= m_all_nodes[i].group;
658  i_group= i+1;
659  }
660  m_all_nodes[i].next_group= i_group;
661  }
662  cur_group= ~0;
663  for (i= 0; i < (int)m_all_nodes.size(); i++)
664  {
665  if (m_all_nodes[i].group != cur_group)
666  {
667  cur_group= m_all_nodes[i].group;
668  i_group= i;
669  }
670  m_all_nodes[i].this_group= i_group;
671  }
672 #if 0
673  for (i= 0; i < (int)m_all_nodes.size(); i++)
674  {
675  fprintf(stderr, "[%d] %d %d %d %d\n",
676  i,
677  m_all_nodes[i].id,
678  m_all_nodes[i].group,
679  m_all_nodes[i].this_group,
680  m_all_nodes[i].next_group);
681  }
682 
683  do_test();
684 #endif
685  DBUG_RETURN(0);
686 }
687 
688 Uint32
689 Ndb_cluster_connection_impl::get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const
690 {
691  Uint32 cnt = (Uint32)m_all_nodes.size();
692  assert(cnt < MAX_NDB_NODES);
693  const Node *nodes = m_all_nodes.getBase();
694  for (Uint32 i = 0; i<cnt; i++)
695  arr[i] = (Uint8)nodes[i].id;
696  return cnt;
697 }
698 
699 int
700 Ndb_cluster_connection_impl::configure(Uint32 nodeId,
701  const ndb_mgm_configuration &config)
702 {
703  DBUG_ENTER("Ndb_cluster_connection_impl::configure");
704  {
705  ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
706  if(iter.find(CFG_NODE_ID, nodeId))
707  DBUG_RETURN(-1);
708 
709  // Configure scan settings
710  Uint32 scan_batch_size= 0;
711  if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
712  m_config.m_scan_batch_size= scan_batch_size;
713  }
714  Uint32 batch_byte_size= 0;
715  if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
716  m_config.m_batch_byte_size= batch_byte_size;
717  }
718  Uint32 batch_size= 0;
719  if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
720  m_config.m_batch_size= batch_size;
721  }
722 
723  // Configure timeouts
724  Uint32 timeout = 120000;
725  for (iter.first(); iter.valid(); iter.next())
726  {
727  Uint32 tmp1 = 0, tmp2 = 0;
728  iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
729  iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
730  tmp1 += tmp2;
731  if (tmp1 > timeout)
732  timeout = tmp1;
733  }
734  m_config.m_waitfor_timeout = timeout;
735 
736  Uint32 queue = 0;
737  if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
738  {
739  m_config.m_default_queue_option = queue;
740  }
741  }
742  DBUG_RETURN(init_nodes_vector(nodeId, config));
743 }
744 
745 void
746 Ndb_cluster_connection_impl::do_test()
747 {
749  int n= no_db_nodes()+5;
750  Uint32 *nodes= new Uint32[n+1];
751 
752  for (int g= 0; g < n; g++)
753  {
754  for (int h= 0; h < n; h++)
755  {
756  Uint32 id;
758  {
759  for (int j= 0; j < g; j++)
760  {
761  nodes[j]= get_next_node(iter2);
762  }
763  }
764 
765  for (int i= 0; i < n; i++)
766  {
767  init_get_next_node(iter);
768  fprintf(stderr, "%d dead:(", g);
769  id= 0;
770  while (id == 0)
771  {
772  if ((id= get_next_node(iter)) == 0)
773  break;
774  for (int j= 0; j < g; j++)
775  {
776  if (nodes[j] == id)
777  {
778  fprintf(stderr, " %d", id);
779  id= 0;
780  break;
781  }
782  }
783  }
784  fprintf(stderr, ")");
785  if (id == 0)
786  {
787  break;
788  }
789  fprintf(stderr, " %d\n", id);
790  }
791  fprintf(stderr, "\n");
792  }
793  }
794  delete [] nodes;
795 }
796 
797 void Ndb_cluster_connection::set_name(const char *name)
798 {
799  m_impl.set_name(name);
800 }
801 
802 int Ndb_cluster_connection_impl::connect(int no_retries,
803  int retry_delay_in_seconds,
804  int verbose)
805 {
806  DBUG_ENTER("Ndb_cluster_connection::connect");
807  do {
808  if (m_config_retriever == 0)
809  {
810  if (!m_latest_error)
811  {
812  m_latest_error = 1;
813  m_latest_error_msg.assign("Ndb_cluster_connection init "
814  "error: m_config_retriever==0");
815  }
816  DBUG_PRINT("exit", ("no m_config_retriever, ret: -1"));
817  DBUG_RETURN(-1);
818  }
819  if (m_config_retriever->do_connect(no_retries,
820  retry_delay_in_seconds,
821  verbose))
822  {
823  char buf[1024];
824  m_latest_error = 1;
825  m_latest_error_msg.assfmt("Connect using '%s' timed out",
826  get_connectstring(buf, sizeof(buf)));
827  DBUG_PRINT("exit", ("mgmt server not up yet, ret: 1"));
828  DBUG_RETURN(1); // mgmt server not up yet
829  }
830 
831  Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,
832  3/*delay*/);
833  if(nodeId == 0)
834  break;
835  ndb_mgm_configuration * props = m_config_retriever->getConfig(nodeId);
836  if(props == 0)
837  break;
838 
839  if (configure(nodeId, *props))
840  {
841  ndb_mgm_destroy_configuration(props);
842  DBUG_PRINT("exit", ("malloc failure, ret: -1"));
843  DBUG_RETURN(-1);
844  }
845 
846  if (m_transporter_facade->start_instance(nodeId, props) < 0)
847  {
848  ndb_mgm_destroy_configuration(props);
849  DBUG_RETURN(-1);
850  }
851 
852  ndb_mgm_destroy_configuration(props);
853  m_transporter_facade->connected();
854  m_latest_error = 0;
855  m_latest_error_msg.assign("");
856  DBUG_PRINT("exit", ("connect ok, ret: 0"));
857  DBUG_RETURN(0);
858  } while(0);
859 
860  const char* erString = m_config_retriever->getErrorString();
861  if (erString == 0) {
862  erString = "No error specified!";
863  }
864  m_latest_error = 1;
865  m_latest_error_msg.assfmt("Configuration error: %s", erString);
866  ndbout << get_latest_error_msg() << endl;
867  DBUG_PRINT("exit", ("connect failed, '%s' ret: -1", erString));
868  DBUG_RETURN(-1);
869 }
870 
871 
872 int
874  int retry_delay_in_seconds,
875  int verbose)
876 {
877  return m_impl.connect(no_retries, retry_delay_in_seconds, verbose);
878 }
879 
880 
881 void Ndb_cluster_connection_impl::connect_thread()
882 {
883  DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
884  int r;
885  do {
886  NdbSleep_SecSleep(1);
887  if ((r = connect(0,0,0)) == 0)
888  break;
889  if (r == -1) {
890  printf("Ndb_cluster_connection::connect_thread error\n");
891  DBUG_ASSERT(false);
892  m_run_connect_thread= 0;
893  } else {
894  // Wait before making a new connect attempt
895  NdbSleep_SecSleep(1);
896  }
897  } while (m_run_connect_thread);
898  if (m_connect_callback)
899  (*m_connect_callback)();
900  DBUG_VOID_RETURN;
901 }
902 
903 Uint64 *
904 Ndb_cluster_connection::get_latest_trans_gci()
905 {
906  return m_impl.get_latest_trans_gci();
907 }
908 
909 void
910 Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
911 {
912  m_impl.init_get_next_node(iter);
913 }
914 
915 Uint32
916 Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
917 {
918  return m_impl.get_next_node(iter);
919 }
920 
921 unsigned int
922 Ndb_cluster_connection::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
923 {
924  return m_impl.get_next_alive_node(iter);
925 }
926 
927 unsigned
928 Ndb_cluster_connection::get_active_ndb_objects() const
929 {
930  return m_impl.m_transporter_facade->get_active_ndb_objects();
931 }
932 
934 {
935  return ndb_mgm_set_timeout(m_impl.m_config_retriever->get_mgmHandle(),
936  timeout_ms);
937 }
938 
939 int
940 Ndb_cluster_connection::get_auto_reconnect() const
941 {
942  return m_impl.m_transporter_facade->get_auto_reconnect();
943 }
944 
945 void
947 {
948  m_impl.m_transporter_facade->set_auto_reconnect(value);
949 }
950 
951 Uint32
953 {
954  /* We have a global stats baseline which contains all
955  * the stats for Ndb objects which have been and gone.
956  * Start with that, then add in stats for Ndb objects
957  * currently in use.
958  * Note that despite the lock, this is not thread safe
959  * as we are reading data that other threads may be
960  * concurrently writing. The lock just guards against
961  * concurrent changes to the set of active Ndbs while
962  * we are iterating it.
963  */
964  const Uint32 relevant = MIN((Uint32)Ndb::NumClientStatistics, sz);
965  const Ndb* ndb = NULL;
967  {
968  memcpy(statsArr, &m_impl.globalApiStatsBaseline[0], sizeof(Uint64)*relevant);
969 
970  while((ndb = get_next_ndb_object(ndb)) != NULL)
971  {
972  for (Uint32 i=0; i<relevant; i++)
973  {
974  statsArr[i] += ndb->theImpl->clientStats[i];
975  }
976  }
977  }
979 
980  return relevant;
981 }
982 
984