MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
db.cpp
1 /*
2  Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "atrt.hpp"
19 #include <NdbSleep.h>
20 
21 static bool connect_mysqld(atrt_process* proc);
22 static bool populate_db(atrt_config&, atrt_process*);
23 static bool setup_repl(atrt_config&);
24 
25 static
26 bool
27 run_query(atrt_process* proc, const char * query)
28 {
29  MYSQL* mysql = &proc->m_mysql;
30  g_logger.debug("'%s@%s' - Running query '%s'",
31  proc->m_cluster->m_name.c_str(),
32  proc->m_host->m_hostname.c_str(),
33  query);
34 
35  if (mysql_query(mysql, query))
36  {
37  g_logger.error("'%s@%s' - Failed to run query '%s' %d:%s",
38  proc->m_cluster->m_name.c_str(),
39  proc->m_host->m_hostname.c_str(),
40  query,
41  mysql_errno(mysql),
42  mysql_error(mysql));
43  return false;
44  }
45  return true;
46 }
47 
/*
  Statements that create the `atrt` schema, in execution order.
  The list is terminated by a null pointer; setup_db() runs the entries
  one by one until it reaches that sentinel.
*/
static const char* create_sql[] = {
  /* Schema selection */
  "create database atrt",

  "use atrt",

  /* One row per host */
  "create table host ("
  " id int primary key,"
  " name varchar(255),"
  " port int unsigned,"
  " unique(name, port)"
  ") engine = myisam;",

  /* One row per cluster */
  "create table cluster ("
  " id int primary key,"
  " name varchar(255),"
  " unique(name)"
  " ) engine = myisam;",

  /* One row per process, referring to host and cluster by id */
  "create table process ("
  " id int primary key,"
  " host_id int not null,"
  " cluster_id int not null,"
  " node_id int not null,"
  " type enum ('ndbd', 'ndbapi', 'ndb_mgmd', 'mysqld', 'mysql') not null,"
  " state enum ('starting', 'started', 'stopping', 'stopped') not null"
  " ) engine = myisam;",

  /* Name/value options attached to a process */
  "create table options ("
  " id int primary key,"
  " process_id int not null,"
  " name varchar(255) not null,"
  " value varchar(255) not null"
  " ) engine = myisam;",

  /* Replication links between processes */
  "create table repl ("
  " id int auto_increment primary key,"
  " master_id int not null,"
  " slave_id int not null"
  " ) engine = myisam;",

  /* Command queue */
  "create table command ("
  " id int auto_increment primary key,"
  " state enum ('new', 'running', 'done') not null default 'new',"
  " cmd int not null,"
  " process_id int not null,"
  " process_args varchar(255) default NULL"
  " ) engine = myisam;",

  /* End-of-list sentinel */
  0
};
97 
98 bool
99 setup_db(atrt_config& config)
100 {
104  atrt_process* atrt_client = 0;
105  {
106  atrt_cluster* cluster = 0;
107  for (size_t i = 0; i<config.m_clusters.size(); i++)
108  {
109  if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
110  {
111  cluster = config.m_clusters[i];
112 
113  for (size_t i = 0; i<cluster->m_processes.size(); i++)
114  {
115  if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
116  {
117  atrt_client = cluster->m_processes[i];
118  break;
119  }
120  }
121  break;
122  }
123  }
124  }
125 
129 #ifndef _WIN32
130  for (size_t i = 0; i<config.m_processes.size(); i++)
131  {
132  atrt_process * proc = config.m_processes[i];
133  if (proc->m_type == atrt_process::AP_MYSQLD)
134  {
135  if (!connect_mysqld(config.m_processes[i]))
136  return false;
137  }
138  }
139 
140  if (atrt_client)
141  {
142  atrt_process* atrt_mysqld = atrt_client->m_mysqld;
143  assert(atrt_mysqld);
144 
145  // Run the commands to create the db
146  for (int i = 0; create_sql[i]; i++)
147  {
148  const char* query = create_sql[i];
149  if (!run_query(atrt_mysqld, query))
150  return false;
151  }
152 
153  if (!populate_db(config, atrt_mysqld))
154  return false;
155  }
156 
160  if (setup_repl(config) != true)
161  return false;
162  #endif
163 
164  return true;
165 }
166 
167 static
168 const char*
169 find(atrt_process* proc, const char * key)
170 {
171  const char * res = 0;
172  if (proc->m_options.m_loaded.get(key, &res))
173  return res;
174 
175  proc->m_options.m_generated.get(key, &res);
176  return res;
177 }
178 
179 bool
180 connect_mysqld(atrt_process* proc)
181 {
182  if ( !mysql_init(&proc->m_mysql))
183  {
184  g_logger.error("Failed to init mysql");
185  return false;
186  }
187 
188  const char * port = find(proc, "--port=");
189  const char * socket = find(proc, "--socket=");
190  if (port == 0 && socket == 0)
191  {
192  g_logger.error("Neither socket nor port specified...cant connect to mysql");
193  return false;
194  }
195 
196  for (size_t i = 0; i<20; i++)
197  {
198  if (port)
199  {
200  mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
201  mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
202  }
203  if (mysql_real_connect(&proc->m_mysql,
204  proc->m_host->m_hostname.c_str(),
205  "root", "", "test",
206  port ? atoi(port) : 0,
207  socket,
208  0))
209  {
210  return true;
211  }
212  g_logger.info("Retrying connect to %s:%u 3s",
213  proc->m_host->m_hostname.c_str(),atoi(port));
214  NdbSleep_SecSleep(3);
215  }
216 
217  g_logger.error("Failed to connect to mysqld err: >%s< >%s:%u:%s<",
218  mysql_error(&proc->m_mysql),
219  proc->m_host->m_hostname.c_str(), port ? atoi(port) : 0,
220  socket ? socket : "<null>");
221  return false;
222 }
223 
224 void
225 BINDI(MYSQL_BIND& bind, int * i)
226 {
227  bind.buffer_type= MYSQL_TYPE_LONG;
228  bind.buffer= (char*)i;
229  bind.is_unsigned= 0;
230  bind.is_null= 0;
231 }
232 
233 void
234 BINDS(MYSQL_BIND& bind, const char * s, unsigned long * len)
235 {
236  bind.buffer_type= MYSQL_TYPE_STRING;
237  bind.buffer= (char*)s;
238  bind.buffer_length= * len = strlen(s);
239  bind.length= len;
240  bind.is_null= 0;
241 }
242 
243 template <typename T>
244 int
245 find(T* obj, Vector<T*>& arr)
246 {
247  for (size_t i = 0; i<arr.size(); i++)
248  if (arr[i] == obj)
249  return (int)i;
250  abort();
251  return -1;
252 }
253 
254 static
255 bool
256 populate_options(MYSQL* mysql, MYSQL_STMT* stmt, int* option_id,
257  int process_id, Properties* p)
258 {
259  int kk = *option_id;
260  Properties::Iterator it(p);
261  const char * name = it.first();
262  for (; name; name = it.next())
263  {
264  int optid = kk;
265  int proc_id = process_id;
266  unsigned long l0, l1;
267  const char * value;
268  p->get(name, &value);
269  MYSQL_BIND bind2[4];
270  bzero(bind2, sizeof(bind2));
271  BINDI(bind2[0], &optid);
272  BINDI(bind2[1], &proc_id);
273  BINDS(bind2[2], name, &l0);
274  BINDS(bind2[3], value, &l1);
275 
276  if (mysql_stmt_bind_param(stmt, bind2))
277  {
278  g_logger.error("Failed to bind: %s", mysql_error(mysql));
279  return false;
280  }
281 
282  if (mysql_stmt_execute(stmt))
283  {
284  g_logger.error("0 Failed to execute: %s", mysql_error(mysql));
285  return false;
286  }
287  kk++;
288  }
289  *option_id = kk;
290  return true;
291 }
292 
293 static
294 bool
295 populate_db(atrt_config& config, atrt_process* mysqld)
296 {
297  {
298  const char * sql = "INSERT INTO host (id, name, port) values (?, ?, ?)";
299  MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
300  if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
301  {
302  g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
303  return false;
304  }
305 
306  for (size_t i = 0; i<config.m_hosts.size(); i++)
307  {
308  unsigned long l0;
309  MYSQL_BIND bind[3];
310  bzero(bind, sizeof(bind));
311  int id = i;
312  int port = config.m_hosts[i]->m_cpcd->getPort();
313  BINDI(bind[0], &id);
314  BINDS(bind[1], config.m_hosts[i]->m_hostname.c_str(), &l0);
315  BINDI(bind[2], &port);
316  if (mysql_stmt_bind_param(stmt, bind))
317  {
318  g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
319  return false;
320  }
321 
322  if (mysql_stmt_execute(stmt))
323  {
324  g_logger.error("1 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
325  return false;
326  }
327  }
328  mysql_stmt_close(stmt);
329  }
330 
331  {
332  const char * sql = "INSERT INTO cluster (id, name) values (?, ?)";
333  MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
334  if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
335  {
336  g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
337  return false;
338  }
339 
340  for (size_t i = 0; i<config.m_clusters.size(); i++)
341  {
342  unsigned long l0;
343  MYSQL_BIND bind[2];
344  bzero(bind, sizeof(bind));
345  int id = i;
346  BINDI(bind[0], &id);
347  BINDS(bind[1], config.m_clusters[i]->m_name.c_str(), &l0);
348 
349  if (mysql_stmt_bind_param(stmt, bind))
350  {
351  g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
352  return false;
353  }
354 
355  if (mysql_stmt_execute(stmt))
356  {
357  g_logger.error("2 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
358  return false;
359  }
360  }
361  mysql_stmt_close(stmt);
362  }
363 
364  {
365  const char * sql =
366  "INSERT INTO process (id, host_id, cluster_id, type, state, node_id) values (?,?,?,?,?,?)";
367 
368  const char * sqlopt =
369  "INSERT INTO options (id, process_id, name, value) values (?,?,?,?)";
370 
371  MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
372  if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
373  {
374  g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
375  return false;
376  }
377 
378  MYSQL_STMT * stmtopt = mysql_stmt_init(&mysqld->m_mysql);
379  if (mysql_stmt_prepare(stmtopt, sqlopt, strlen(sqlopt)))
380  {
381  g_logger.error("Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
382  return false;
383  }
384 
385  int option_id = 0;
386  for (size_t i = 0; i<config.m_processes.size(); i++)
387  {
388  unsigned long l0, l1;
389  MYSQL_BIND bind[6];
390  bzero(bind, sizeof(bind));
391  int id = i;
392  atrt_process* proc = config.m_processes[i];
393  int host_id = find(proc->m_host, config.m_hosts);
394  int cluster_id = find(proc->m_cluster, config.m_clusters);
395  int node_id= proc->m_nodeid;
396 
397  const char * type = 0;
398  const char * state = "started";
399  switch(proc->m_type){
400  case atrt_process::AP_NDBD: type = "ndbd"; break;
401  case atrt_process::AP_NDB_API: type = "ndbapi"; state = "stopped";break;
402  case atrt_process::AP_NDB_MGMD: type = "ndb_mgmd"; break;
403  case atrt_process::AP_MYSQLD: type = "mysqld"; break;
404  case atrt_process::AP_CLIENT: type = "mysql"; state = "stopped";break;
405  default:
406  abort();
407  }
408 
409  BINDI(bind[0], &id);
410  BINDI(bind[1], &host_id);
411  BINDI(bind[2], &cluster_id);
412  BINDS(bind[3], type, &l0);
413  BINDS(bind[4], state, &l1);
414  BINDI(bind[5], &node_id);
415 
416  if (mysql_stmt_bind_param(stmt, bind))
417  {
418  g_logger.error("Failed to bind: %s", mysql_error(&mysqld->m_mysql));
419  return false;
420  }
421 
422  if (mysql_stmt_execute(stmt))
423  {
424  g_logger.error("3 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
425  return false;
426  }
427 
428  if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
429  &proc->m_options.m_loaded) == false)
430  return false;
431 
432  if (populate_options(&mysqld->m_mysql, stmtopt, &option_id, id,
433  &proc->m_cluster->m_options.m_loaded) == false)
434  return false;
435 
436  }
437  mysql_stmt_close(stmt);
438  mysql_stmt_close(stmtopt);
439  }
440 
441  return true;
442 }
443 
444 static
445 bool
446 setup_repl(atrt_process* dst, atrt_process* src)
447 {
448  if (!run_query(src, "STOP SLAVE"))
449  {
450  g_logger.error("Failed to stop slave: %s",
451  mysql_error(&src->m_mysql));
452  return false;
453  }
454 
455  if (!run_query(src, "RESET SLAVE"))
456  {
457  g_logger.error("Failed to reset slave: %s",
458  mysql_error(&src->m_mysql));
459  return false;
460  }
461 
462  BaseString tmp;
463  tmp.assfmt("CHANGE MASTER TO "
464  " MASTER_HOST='%s', "
465  " MASTER_PORT=%u ",
466  dst->m_host->m_hostname.c_str(),
467  atoi(find(dst, "--port=")));
468 
469  if (!run_query(src, tmp.c_str()))
470  {
471  g_logger.error("Failed to setup repl from %s to %s: %s",
472  src->m_host->m_hostname.c_str(),
473  dst->m_host->m_hostname.c_str(),
474  mysql_error(&src->m_mysql));
475  return false;
476  }
477 
478  if (!run_query(src, "START SLAVE"))
479  {
480  g_logger.error("Failed to start slave: %s",
481  mysql_error(&src->m_mysql));
482  return false;
483  }
484 
485  g_logger.info("Replication from %s(%s) to %s(%s) setup",
486  src->m_host->m_hostname.c_str(),
487  src->m_cluster->m_name.c_str(),
488  dst->m_host->m_hostname.c_str(),
489  dst->m_cluster->m_name.c_str());
490 
491  return true;
492 }
493 
494 bool
495 setup_repl(atrt_config& config)
496 {
497  for (size_t i = 0; i<config.m_processes.size(); i++)
498  {
499  atrt_process * dst = config.m_processes[i];
500  if (dst->m_rep_src)
501  {
502  if (setup_repl(dst->m_rep_src, dst) != true)
503  return false;
504  }
505  }
506  return true;
507 }
508 
509 template int find(atrt_host* obj, Vector<atrt_host*>& arr);
510 template int find(atrt_cluster* obj, Vector<atrt_cluster*>& arr);
511