MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
command.cpp
1 /*
2  Copyright (C) 2008 MySQL AB, 2008-2010 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 
20 #include "atrt.hpp"
21 #include <AtrtClient.hpp>
22 
23 
24 
25 MYSQL* find_atrtdb_client(atrt_config& config)
26 {
27  atrt_cluster* cluster = 0;
28  for (size_t i = 0; i<config.m_clusters.size(); i++)
29  {
30  if (strcmp(config.m_clusters[i]->m_name.c_str(), ".atrt") == 0)
31  {
32  cluster = config.m_clusters[i];
33 
34  for (size_t i = 0; i<cluster->m_processes.size(); i++)
35  {
36  if (cluster->m_processes[i]->m_type == atrt_process::AP_CLIENT)
37  {
38  atrt_process* atrt_client= cluster->m_processes[i];
39  if (!atrt_client)
40  return NULL; /* No atrt db */
41 
42  atrt_process* f_mysqld = atrt_client->m_mysqld;
43  assert(f_mysqld);
44 
45  return &f_mysqld->m_mysql;
46  }
47  }
48  break;
49  }
50  }
51  return NULL;
52 }
53 
54 
55 
56 static bool
57 ack_command(AtrtClient& atrtdb, int command_id, const char* state)
58 {
59  BaseString sql;
60  sql.assfmt("UPDATE command SET state = '%s' WHERE id = %d",
61  state, command_id);
62  return atrtdb.doQuery(sql);
63 }
64 
65 
67 set_env_var(const BaseString& existing,
68  const BaseString& name,
69  const BaseString& value)
70 {
71  /* Split existing on space
72  * (may have issues with env vars with spaces)
73  * Split assignments on =
74  * Where name == name, output new value
75  */
76  BaseString newEnv;
77  Vector<BaseString> assignments;
78  int assignmentCount = existing.split(assignments, BaseString(" "));
79 
80  for (int i=0; i < assignmentCount; i++)
81  {
82  Vector<BaseString> terms;
83  int termCount = assignments[i].split(terms, BaseString("="));
84 
85  if (termCount)
86  {
87  if (strcmp(name.c_str(), terms[0].c_str()) == 0)
88  {
89  /* Found element */
90  newEnv.append(name);
91  newEnv.append('=');
92  newEnv.append(value);
93  }
94  else
95  {
96  newEnv.append(assignments[i]);
97  }
98  }
99  newEnv.append(' ');
100  }
101 
102  return newEnv;
103 }
104 
105 
/* File-scope list of atrt_process values.
 * NOTE(review): none of the functions visible in this file read or write
 * it (they keep saved state in atrt_process::m_save instead) - confirm
 * whether it is still needed elsewhere. */
106 Vector<atrt_process> g_saved_procs;
107 
108 static
109 bool
110 do_change_version(atrt_config& config, SqlResultSet& command,
111  AtrtClient& atrtdb){
115  uint process_id= command.columnAsInt("process_id");
116  const char* process_args= command.column("process_args");
117 
118  g_logger.info("Change version for process: %d, args: %s",
119  process_id, process_args);
120 
121  // Get the process
122  if (process_id > config.m_processes.size()){
123  g_logger.critical("Invalid process id %d", process_id);
124  return false;
125  }
126  atrt_process& proc= *config.m_processes[process_id];
127 
128  const char* new_prefix= g_prefix1 ? g_prefix1 : g_prefix;
129  const char* old_prefix= g_prefix;
130  const char *start= strstr(proc.m_proc.m_path.c_str(), old_prefix);
131  if (!start){
132  /* Process path does not contain old prefix.
133  * Perhaps it contains the new prefix - e.g. is already
134  * upgraded?
135  */
136  if (strstr(proc.m_proc.m_path.c_str(), new_prefix))
137  {
138  /* Process is already upgraded, *assume* that this
139  * is ok
140  * Alternatives could be - error, or downgrade.
141  */
142  g_logger.info("Process already upgraded");
143  return true;
144  }
145 
146  g_logger.critical("Could not find '%s' in '%s'",
147  old_prefix, proc.m_proc.m_path.c_str());
148  return false;
149  }
150 
151  // Save current proc state
152  if (proc.m_save.m_saved == false)
153  {
154  proc.m_save.m_proc= proc.m_proc;
155  proc.m_save.m_saved= true;
156  }
157 
158  g_logger.info("stopping process...");
159  if (!stop_process(proc))
160  return false;
161  BaseString newEnv = set_env_var(proc.m_proc.m_env,
162  BaseString("MYSQL_BASE_DIR"),
163  BaseString(new_prefix));
164  proc.m_proc.m_env.assign(newEnv);
165  BaseString suffix(proc.m_proc.m_path.substr(strlen(old_prefix)));
166  proc.m_proc.m_path.assign(new_prefix).append(suffix);
167  if (process_args && strlen(process_args))
168  {
169  /* Beware too long args */
170  proc.m_proc.m_args.append(" ");
171  proc.m_proc.m_args.append(process_args);
172  }
173 
174  ndbout << proc << endl;
175 
176  g_logger.info("starting process...");
177  if (!start_process(proc))
178  return false;
179  return true;
180 }
181 
182 
183 static
184 bool
185 do_reset_proc(atrt_config& config, SqlResultSet& command,
186  AtrtClient& atrtdb){
187  uint process_id= command.columnAsInt("process_id");
188  g_logger.info("Reset process: %d", process_id);
189 
190  // Get the process
191  if (process_id > config.m_processes.size()){
192  g_logger.critical("Invalid process id %d", process_id);
193  return false;
194  }
195  atrt_process& proc= *config.m_processes[process_id];
196 
197  g_logger.info("stopping process...");
198  if (!stop_process(proc))
199  return false;
200 
201  if (proc.m_save.m_saved)
202  {
203  ndbout << "before: " << proc << endl;
204 
205  proc.m_proc= proc.m_save.m_proc;
206  proc.m_save.m_saved= false;
207  proc.m_proc.m_id= -1;
208 
209  ndbout << "after: " << proc << endl;
210 
211  }
212  else
213  {
214  ndbout << "process has not changed" << endl;
215  }
216 
217  g_logger.info("starting process...");
218  if (!start_process(proc))
219  return false;
220  return true;
221 }
222 
223 
224 bool
225 do_command(atrt_config& config){
226 
227 #ifdef _WIN32
228  return true;
229 #endif
230 
231  MYSQL* mysql= find_atrtdb_client(config);
232  if (!mysql)
233  return true;
234 
235  AtrtClient atrtdb(mysql);
236  SqlResultSet command;
237  if (!atrtdb.doQuery("SELECT * FROM command " \
238  "WHERE state = 'new' ORDER BY id LIMIT 1", command)){
239  g_logger.critical("query failed");
240  return false;
241  }
242 
243  if (command.numRows() == 0)
244  return true;
245 
246  uint id= command.columnAsInt("id");
247  uint cmd= command.columnAsInt("cmd");
248  g_logger.info("Got command, id: %d, cmd: %d", id, cmd);
249  // command.print();
250 
251  // Set state of command to running
252  if (!ack_command(atrtdb, id, "running"))
253  return false;
254 
255  switch (cmd){
256  case AtrtClient::ATCT_CHANGE_VERSION:
257  if (!do_change_version(config, command, atrtdb))
258  return false;
259  break;
260 
261  case AtrtClient::ATCT_RESET_PROC:
262  if (!do_reset_proc(config, command, atrtdb))
263  return false;
264  break;
265 
266  default:
267  command.print();
268  g_logger.error("got unknown command: %d", cmd);
269  return false;
270  }
271 
272  // Set state of command to done
273  if (!ack_command(atrtdb, id, "done"))
274  return false;
275 
276  g_logger.info("done!");
277 
278  return true;
279 }
280 
281 
/* Explicit instantiation of the Vector template for atrt_process. */
282 template class Vector<atrt_process>;