MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ha_ndbcluster_connection.cc
1 /*
2  Copyright (C) 2000-2003 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "ha_ndbcluster_glue.h"
20 
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include <ndbapi/NdbApi.hpp>
23 #include <portlib/NdbTick.h>
24 #include "ha_ndbcluster_connection.h"
25 
26 Ndb* g_ndb= NULL;
27 Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
28 static Ndb_cluster_connection **g_pool= NULL;
29 static uint g_pool_alloc= 0;
30 static uint g_pool_pos= 0;
31 static pthread_mutex_t g_pool_mutex;
32 
33 /*
34  Global flag in ndbapi to specify if api should wait to connect
35  until dict cache is clean.
36 
37  Set to 1 below to not wait, as ndb handler makes sure that no
38  old ndb objects are used.
39 */
40 extern int global_flag_skip_waiting_for_clean_cache;
41 
42 int
43 ndbcluster_connect(int (*connect_callback)(void),
44  ulong wait_connected,
45  uint connection_pool_size,
46  bool optimized_node_select,
47  const char* connect_string,
48  uint force_nodeid)
49 {
50  NDB_TICKS end_time;
51 
52 #ifndef EMBEDDED_LIBRARY
53  const char mysqld_name[]= "mysqld";
54 #else
55  const char mysqld_name[]= "libmysqld";
56 #endif
57  int res;
58  DBUG_ENTER("ndbcluster_connect");
59  DBUG_PRINT("enter", ("connect_string: %s, force_nodeid: %d",
60  connect_string, force_nodeid));
61 
62  global_flag_skip_waiting_for_clean_cache= 1;
63 
64  g_ndb_cluster_connection=
65  new Ndb_cluster_connection(connect_string, force_nodeid);
66  if (!g_ndb_cluster_connection)
67  {
68  sql_print_error("NDB: failed to allocate global ndb cluster connection");
69  DBUG_PRINT("error", ("Ndb_cluster_connection(%s)", connect_string));
70  my_errno= HA_ERR_OUT_OF_MEM;
71  DBUG_RETURN(-1);
72  }
73  {
74  char buf[128];
75  my_snprintf(buf, sizeof(buf), "%s --server-id=%lu",
76  mysqld_name, server_id);
77  g_ndb_cluster_connection->set_name(buf);
78  }
79  g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
80 
81  // Create a Ndb object to open the connection to NDB
82  if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
83  {
84  sql_print_error("NDB: failed to allocate global ndb object");
85  DBUG_PRINT("error", ("failed to create global ndb object"));
86  my_errno= HA_ERR_OUT_OF_MEM;
87  DBUG_RETURN(-1);
88  }
89  if (g_ndb->init() != 0)
90  {
91  DBUG_PRINT("error", ("%d message: %s",
92  g_ndb->getNdbError().code,
93  g_ndb->getNdbError().message));
94  DBUG_RETURN(-1);
95  }
96 
97  /* Connect to management server */
98 
99  end_time= NdbTick_CurrentMillisecond();
100  end_time+= 1000 * wait_connected;
101 
102  while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
103  {
104  if (NdbTick_CurrentMillisecond() > end_time)
105  break;
106  do_retry_sleep(100);
107  if (abort_loop)
108  DBUG_RETURN(-1);
109  }
110 
111  {
112  g_pool_alloc= connection_pool_size;
113  g_pool= (Ndb_cluster_connection**)
114  my_malloc(g_pool_alloc * sizeof(Ndb_cluster_connection*),
115  MYF(MY_WME | MY_ZEROFILL));
116  pthread_mutex_init(&g_pool_mutex,
117  MY_MUTEX_INIT_FAST);
118  g_pool[0]= g_ndb_cluster_connection;
119  for (uint i= 1; i < g_pool_alloc; i++)
120  {
121  if ((g_pool[i]=
122  new Ndb_cluster_connection(connect_string,
123  g_ndb_cluster_connection)) == 0)
124  {
125  sql_print_error("NDB[%u]: failed to allocate cluster connect object",
126  i);
127  DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
128  i, connect_string));
129  DBUG_RETURN(-1);
130  }
131  {
132  char buf[128];
133  my_snprintf(buf, sizeof(buf), "%s --server-id=%lu (connection %u)",
134  mysqld_name, server_id, i+1);
135  g_pool[i]->set_name(buf);
136  }
137  g_pool[i]->set_optimized_node_selection(optimized_node_select);
138  }
139  }
140 
141  if (res == 0)
142  {
143  connect_callback();
144  for (uint i= 0; i < g_pool_alloc; i++)
145  {
146  int node_id= g_pool[i]->node_id();
147  if (node_id == 0)
148  {
149  // not connected to mgmd yet, try again
150  g_pool[i]->connect(0,0,0);
151  if (g_pool[i]->node_id() == 0)
152  {
153  sql_print_warning("NDB[%u]: starting connect thread", i);
154  g_pool[i]->start_connect_thread();
155  continue;
156  }
157  node_id= g_pool[i]->node_id();
158  }
159  DBUG_PRINT("info",
160  ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
161  g_pool[i]->get_connected_host(),
162  g_pool[i]->get_connected_port()));
163 
164  NDB_TICKS now_time;
165  do
166  {
167  res= g_pool[i]->wait_until_ready(1, 1);
168  now_time= NdbTick_CurrentMillisecond();
169  } while (res != 0 && now_time < end_time);
170 
171  const char *msg= 0;
172  if (res == 0)
173  {
174  msg= "all storage nodes connected";
175  }
176  else if (res > 0)
177  {
178  msg= "some storage nodes connected";
179  }
180  else if (res < 0)
181  {
182  msg= "no storage nodes connected (timed out)";
183  }
184  sql_print_information("NDB[%u]: NodeID: %d, %s",
185  i, node_id, msg);
186  }
187  }
188  else if (res == 1)
189  {
190  for (uint i= 0; i < g_pool_alloc; i++)
191  {
192  if (g_pool[i]->
193  start_connect_thread(i == 0 ? connect_callback : NULL))
194  {
195  sql_print_error("NDB[%u]: failed to start connect thread", i);
196  DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
197  DBUG_RETURN(-1);
198  }
199  }
200 #ifndef DBUG_OFF
201  {
202  char buf[1024];
203  DBUG_PRINT("info",
204  ("NDBCLUSTER storage engine not started, "
205  "will connect using %s",
206  g_ndb_cluster_connection->
207  get_connectstring(buf,sizeof(buf))));
208  }
209 #endif
210  }
211  else
212  {
213  DBUG_ASSERT(res == -1);
214  DBUG_PRINT("error", ("permanent error"));
215  sql_print_error("NDB: error (%u) %s",
216  g_ndb_cluster_connection->get_latest_error(),
217  g_ndb_cluster_connection->get_latest_error_msg());
218  DBUG_RETURN(-1);
219  }
220  DBUG_RETURN(0);
221 }
222 
223 void ndbcluster_disconnect(void)
224 {
225  DBUG_ENTER("ndbcluster_disconnect");
226  if (g_ndb)
227  delete g_ndb;
228  g_ndb= NULL;
229  {
230  if (g_pool)
231  {
232  /* first in pool is the main one, wait with release */
233  for (uint i= 1; i < g_pool_alloc; i++)
234  {
235  if (g_pool[i])
236  delete g_pool[i];
237  }
238  my_free((uchar*) g_pool, MYF(MY_ALLOW_ZERO_PTR));
239  pthread_mutex_destroy(&g_pool_mutex);
240  g_pool= 0;
241  }
242  g_pool_alloc= 0;
243  g_pool_pos= 0;
244  }
245  if (g_ndb_cluster_connection)
246  delete g_ndb_cluster_connection;
247  g_ndb_cluster_connection= NULL;
248  DBUG_VOID_RETURN;
249 }
250 
251 Ndb_cluster_connection *ndb_get_cluster_connection()
252 {
253  pthread_mutex_lock(&g_pool_mutex);
254  Ndb_cluster_connection *connection= g_pool[g_pool_pos];
255  g_pool_pos++;
256  if (g_pool_pos == g_pool_alloc)
257  g_pool_pos= 0;
258  pthread_mutex_unlock(&g_pool_mutex);
259  return connection;
260 }
261 
262 ulonglong ndb_get_latest_trans_gci()
263 {
264  ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
265  for (uint i= 1; i < g_pool_alloc; i++)
266  {
267  ulonglong tmp= *g_pool[i]->get_latest_trans_gci();
268  if (tmp > val)
269  val= tmp;
270  }
271  return val;
272 }
273 
274 void ndb_set_latest_trans_gci(ulonglong val)
275 {
276  for (uint i= 0; i < g_pool_alloc; i++)
277  {
278  *g_pool[i]->get_latest_trans_gci()= val;
279  }
280 }
281 
282 int ndb_has_node_id(uint id)
283 {
284  for (uint i= 0; i < g_pool_alloc; i++)
285  {
286  if (id == g_pool[i]->node_id())
287  return 1;
288  }
289  return 0;
290 }
291 
292 void ndb_get_connection_stats(Uint64* statsArr)
293 {
294  Uint64 connectionStats[ Ndb::NumClientStatistics ];
295  memset(statsArr, 0, sizeof(connectionStats));
296 
297  for (uint i=0; i < g_pool_alloc; i++)
298  {
299  g_pool[i]->collect_client_stats(connectionStats, Ndb::NumClientStatistics);
300 
301  for (Uint32 s=0; s < Ndb::NumClientStatistics; s++)
302  statsArr[s]+= connectionStats[s];
303  }
304 }
305 
306 #endif /* WITH_NDBCLUSTER_STORAGE_ENGINE */