MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_gtid_execution.cc
1 /* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or
4  modify it under the terms of the GNU General Public License as
5  published by the Free Software Foundation; version 2 of the
6  License.
7 
8  This program is distributed in the hope that it will be useful, but
9  WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
16  02110-1301 USA */
17 
18 #include "rpl_gtid.h"
19 
20 #include "sql_class.h"
21 #include "binlog.h"
22 #include "transaction.h"
23 #include "rpl_slave.h"
24 #include "rpl_mi.h"
25 #include "sql_parse.h"
26 
27 
41 int gtid_acquire_ownership_single(THD *thd)
42 {
43  DBUG_ENTER("gtid_acquire_ownership_single");
44  int ret= 0;
45  const Gtid gtid_next= thd->variables.gtid_next.gtid;
46  while (true)
47  {
48  global_sid_lock->rdlock();
49  // acquire lock before checking conditions
50  gtid_state->lock_sidno(gtid_next.sidno);
51 
52  // GTID already logged
53  if (gtid_state->is_logged(gtid_next))
54  {
55  /*
56  Don't skip the statement here, skip it in
57  gtid_pre_statement_checks.
58  */
59  break;
60  }
61  my_thread_id owner= gtid_state->get_owner(gtid_next);
62  // GTID not owned by anyone: acquire ownership
63  if (owner == 0)
64  {
65  if (gtid_state->acquire_ownership(thd, gtid_next) != RETURN_STATUS_OK)
66  ret= 1;
67  thd->owned_gtid= gtid_next;
68  break;
69  }
70  // GTID owned by someone (other thread)
71  else
72  {
73  DBUG_ASSERT(owner != thd->id);
74  // The call below releases the read lock on global_sid_lock and
75  // the mutex lock on SIDNO.
76  gtid_state->wait_for_gtid(thd, gtid_next);
77 
78  // global_sid_lock and mutex are now released
79 
80  // Check if thread was killed.
81  if (thd->killed || abort_loop)
82  DBUG_RETURN(1);
83 #ifdef HAVE_REPLICATION
84  // If this thread is a slave SQL thread or slave SQL worker
85  // thread, we need this additional condition to determine if it
86  // has been stopped by STOP SLAVE [SQL_THREAD].
87  if ((thd->system_thread &
88  (SYSTEM_THREAD_SLAVE_SQL | SYSTEM_THREAD_SLAVE_WORKER)) != 0)
89  {
90  // TODO: error is *not* reported on cancel
91  DBUG_ASSERT(active_mi != NULL && active_mi->rli != NULL);
92  if (active_mi->rli->abort_slave)
93  DBUG_RETURN(1);
94  }
95 #endif // HAVE_REPLICATION
96  }
97  }
98  gtid_state->unlock_sidno(gtid_next.sidno);
99  global_sid_lock->unlock();
100  DBUG_RETURN(ret);
101 }
102 
103 
#ifdef HAVE_GTID_NEXT_LIST
/**
  Acquire ownership of every not-yet-logged GTID in the session's
  GTID_NEXT_LIST.

  Phase 1 repeatedly scans the list while locking the SIDNOs it visits.
  If some GTID that is not logged is owned by another thread, all locks
  are dropped, the function waits for that GTID and the scan restarts.
  Phase 1 ends when a full scan finds no such GTID; at that point the
  global read lock and every SIDNO lock of the list are held.

  Phase 2 acquires ownership of each GTID that is not logged and records
  it in thd->owned_gtid_set, then releases all locks.

  @param thd  Session whose GTID_NEXT_LIST is to be acquired.

  @retval 0  Ownership of all required GTIDs was acquired (this includes
             the case where there was nothing to acquire).
  @retval 1  The wait was interrupted (thd->killed, abort_loop, or
             abort_slave for slave SQL/worker threads), or acquiring or
             recording ownership of some GTID failed.
*/
int gtid_acquire_ownership_multiple(THD *thd)
{
  const Gtid_set *gtid_next_list= thd->get_gtid_next_list_const();
  rpl_sidno greatest_sidno= 0;
  DBUG_ENTER("gtid_acquire_ownership_multiple");
  // first check if we need to wait for any group
  while (true)
  {
    Gtid_set::Gtid_iterator git(gtid_next_list);
    Gtid g= git.get();
    my_thread_id owner= 0;
    rpl_sidno last_sidno= 0;
    global_sid_lock->rdlock();
    while (g.sidno != 0)
    {
      // lock all SIDNOs in order
      if (g.sidno != last_sidno)
        gtid_state->lock_sidno(g.sidno);
      if (!gtid_state->is_logged(g))
      {
        owner= gtid_state->get_owner(g);
        // leave the scan and wait for the GTID to be released
        if (owner != 0)
        {
          /*
            'owner' is a thread id (my_thread_id), so it has to be
            compared with the thread id of this session.
          */
          DBUG_ASSERT(owner != thd->thread_id);
          break;
        }
      }
      last_sidno= g.sidno;
      greatest_sidno= g.sidno;
      git.next();
      g= git.get();
    }

    // we don't need to wait for any groups, and all SIDNOs in the
    // set are locked
    if (g.sidno == 0)
      break;

    // unlock all previous sidnos to avoid blocking them
    // while waiting. keep lock on g.sidno
    for (rpl_sidno sidno= 1; sidno < g.sidno; sidno++)
      if (gtid_next_list->contains_sidno(sidno))
        gtid_state->unlock_sidno(sidno);

    // wait. this call releases the read lock on global_sid_lock and
    // the mutex lock on SIDNO
    gtid_state->wait_for_gtid(thd, g);

    // global_sid_lock and mutex are now released

    // at this point, we don't hold any locks. If we do not return
    // below, the next iteration takes the global read lock again.
    if (thd->killed || abort_loop)
      DBUG_RETURN(1);
#ifdef HAVE_REPLICATION
    // If this thread is a slave SQL thread or slave SQL worker
    // thread, we need this additional condition to determine if it
    // has been stopped by STOP SLAVE [SQL_THREAD].
    if ((thd->system_thread &
         (SYSTEM_THREAD_SLAVE_SQL | SYSTEM_THREAD_SLAVE_WORKER)) != 0)
    {
      DBUG_ASSERT(active_mi != NULL && active_mi->rli != NULL);
      if (active_mi->rli->abort_slave)
        DBUG_RETURN(1);
    }
#endif // HAVE_REPLICATION
  }

  // global_sid_lock is now held
  thd->owned_gtid_set.ensure_sidno(greatest_sidno);

  /*
    Now the following hold:
     - None of the GTIDs in GTID_NEXT_LIST is owned by any thread.
     - We hold a lock on global_sid_lock.
     - We hold a lock on all SIDNOs in GTID_NEXT_LIST.
    So we acquire ownership of all groups that we need.
  */
  int ret= 0;
  Gtid_set::Gtid_iterator git(gtid_next_list);
  Gtid g= git.get();
  /*
    Test the iterator before the first use: for an empty list the
    iterator starts at the end marker (sidno == 0), which must not be
    passed to is_logged()/acquire_ownership().
  */
  while (g.sidno != 0)
  {
    if (!gtid_state->is_logged(g))
    {
      if (gtid_state->acquire_ownership(thd, g) != RETURN_STATUS_OK ||
          thd->owned_gtid_set._add_gtid(g))
      {
        ret= 1;
        break;
      }
    }
    git.next();
    g= git.get();
  }

  // unlock all sidnos
  rpl_sidno max_sidno= gtid_next_list->get_max_sidno();
  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
    if (gtid_next_list->contains_sidno(sidno))
      gtid_state->unlock_sidno(sidno);

  global_sid_lock->unlock();

  DBUG_RETURN(ret);
}
#endif
218 
219 
229 static inline bool is_already_logged_transaction(const THD *thd)
230 {
231  DBUG_ENTER("is_already_logged_transaction");
232 
233  const Gtid_specification *gtid_next= &thd->variables.gtid_next;
234  const Gtid_set *gtid_next_list= thd->get_gtid_next_list_const();
235 
236  if (gtid_next_list == NULL)
237  {
238  if (gtid_next->type == GTID_GROUP)
239  {
240  if (thd->owned_gtid.sidno == 0)
241  DBUG_RETURN(true);
242  else
243  DBUG_ASSERT(thd->owned_gtid.equals(gtid_next->gtid));
244  }
245  else
246  DBUG_ASSERT(thd->owned_gtid.sidno == 0);
247  }
248  else
249  {
250 #ifdef HAVE_GTID_NEXT_LIST
251  if (gtid_next->type == GTID_GROUP)
252  {
253  DBUG_ASSERT(gtid_next_list->contains_gtid(gtid_next->gtid));
254  if (!thd->owned_gtid_set.contains_gtid(gtid_next->gtid))
255  DBUG_RETURN(true);
256  }
257 #else
258  DBUG_ASSERT(0);/*NOTREACHED*/
259 #endif
260  }
261 
262  DBUG_RETURN(false);
263 }
264 
265 
274 static inline enum_gtid_statement_status skip_statement(const THD *thd)
275 {
276  DBUG_ENTER("skip_statement");
277 
278  DBUG_PRINT("info", ("skipping statement '%s'. "
279  "gtid_next->type=%d sql_command=%d "
280  "thd->thread_id=%lu",
281  thd->query(),
282  thd->variables.gtid_next.type,
283  thd->lex->sql_command,
284  thd->thread_id));
285 
286 #ifndef DBUG_OFF
287  const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
288  global_sid_lock->rdlock();
289  DBUG_ASSERT(logged_gtids->contains_gtid(thd->variables.gtid_next.gtid));
290  global_sid_lock->unlock();
291 #endif
292 
293  DBUG_RETURN(GTID_STATEMENT_SKIP);
294 }
295 
296 
297 enum_gtid_statement_status gtid_pre_statement_checks(const THD *thd)
298 {
299  DBUG_ENTER("gtid_pre_statement_checks");
300 
301  if (enforce_gtid_consistency && !thd->is_ddl_gtid_compatible())
302  {
303  // error message has been generated by thd->is_ddl_gtid_compatible()
304  DBUG_RETURN(GTID_STATEMENT_CANCEL);
305  }
306 
307  const Gtid_specification *gtid_next= &thd->variables.gtid_next;
308  if (stmt_causes_implicit_commit(thd, CF_IMPLICIT_COMMIT_BEGIN) &&
309  thd->in_active_multi_stmt_transaction() &&
310  gtid_next->type != AUTOMATIC_GROUP)
311  {
312  my_error(ER_CANT_DO_IMPLICIT_COMMIT_IN_TRX_WHEN_GTID_NEXT_IS_SET, MYF(0));
313  DBUG_RETURN(GTID_STATEMENT_CANCEL);
314  }
315 
316  /*
317  never skip BEGIN/COMMIT.
318 
319  @todo: add flag to sql_command_flags to detect if statement
320  controls transactions instead of listing the commands in the
321  condition below
322 
323  @todo: figure out how to handle SQLCOM_XA_*
324  */
325  enum_sql_command sql_command= thd->lex->sql_command;
326  if (sql_command == SQLCOM_COMMIT || sql_command == SQLCOM_BEGIN ||
327  sql_command == SQLCOM_ROLLBACK ||
328  ((sql_command == SQLCOM_SELECT ||
329  (sql_command == SQLCOM_SET_OPTION && !thd->lex->is_set_password_sql)) &&
330  !thd->lex->uses_stored_routines()))
331  DBUG_RETURN(GTID_STATEMENT_EXECUTE);
332 
333  /*
334  If a transaction updates both non-transactional and transactional
335  or more then one non-transactional tables it must be stopped, this
336  is the case when on master all updated tables are transactional but
337  on slave at least one is non-transactional, e.g.:
338 
339  On master, tables are transactional:
340  CREATE TABLE t1 (a INT) Engine=InnoDB;
341  CREATE TABLE t2 (a INT) Engine=InnoDB;
342  On slave, one table is non-transactional:
343  CREATE TABLE t1 (a INT) Engine=MyISAM;
344  CREATE TABLE t2 (a INT) Engine=InnoDB;
345  On master, user executes:
346  BEGIN;
347  INSERT INTO t1 VALUES (1);
348  INSERT INTO t2 VALUES (1);
349  COMMIT;
350  On slave, the second statement must error due to a second statement
351  being executed after a statement that updated a non-transactional
352  table.
353  */
354  if (UNDEFINED_GROUP == gtid_next->type)
355  {
356  char buf[Gtid::MAX_TEXT_LENGTH + 1];
357  global_sid_lock->rdlock();
358  gtid_next->to_string(global_sid_map, buf);
359  global_sid_lock->unlock();
360  my_error(ER_GTID_NEXT_TYPE_UNDEFINED_GROUP, MYF(0), buf);
361  DBUG_RETURN(GTID_STATEMENT_CANCEL);
362  }
363 
364  const Gtid_set *gtid_next_list= thd->get_gtid_next_list_const();
365 
366  DBUG_PRINT("info", ("gtid_next_list=%p gtid_next->type=%d "
367  "thd->owned_gtid.gtid.{sidno,gno}={%d,%lld} "
368  "thd->thread_id=%lu",
369  gtid_next_list, gtid_next->type,
370  thd->owned_gtid.sidno,
371  thd->owned_gtid.gno,
372  (ulong)thd->thread_id));
373 
374  const bool skip_transaction= is_already_logged_transaction(thd);
375  if (gtid_next_list == NULL)
376  {
377  if (skip_transaction)
378  DBUG_RETURN(skip_statement(thd));
379  DBUG_RETURN(GTID_STATEMENT_EXECUTE);
380  }
381  else
382  {
383 #ifdef HAVE_GTID_NEXT_LIST
384  switch (gtid_next->type)
385  {
386  case AUTOMATIC_GROUP:
387  my_error(ER_GTID_NEXT_CANT_BE_AUTOMATIC_IF_GTID_NEXT_LIST_IS_NON_NULL,
388  MYF(0));
389  DBUG_RETURN(GTID_STATEMENT_CANCEL);
390  case GTID_GROUP:
391  if (skip_transaction)
392  DBUG_RETURN(skip_statement(thd));
393  /*FALLTHROUGH*/
394  case ANONYMOUS_GROUP:
395  DBUG_RETURN(GTID_STATEMENT_EXECUTE);
396  case INVALID_GROUP:
397  DBUG_ASSERT(0);/*NOTREACHED*/
398  }
399 #else
400  DBUG_ASSERT(0);/*NOTREACHED*/
401 #endif
402  }
403  DBUG_ASSERT(0);/*NOTREACHED*/
404  DBUG_RETURN(GTID_STATEMENT_CANCEL);
405 }
406 
407 
408 void gtid_post_statement_checks(THD *thd)
409 {
410  DBUG_ENTER("gtid_post_statement_checks");
411  const enum_sql_command sql_command= thd->lex->sql_command;
412 
413  /*
414  If transaction is terminated we set GTID_NEXT type to
415  UNDEFINED_GROUP, to prevent that the same GTID is used for another
416  transaction (same GTID here means that user only set
417  GTID_NEXT= GTID_GROUP once for two transactions).
418 
419  If the current statement:
420  implict commits
421  OR
422  is SQLCOM_SET_OPTION AND is SET PASSWORD
423  OR
424  is commit
425  OR
426  is rollback
427  that means the transaction is terminated and we set GTID_NEXT type
428  to UNDEFINED_GROUP.
429 
430  SET AUTOCOMMIT=1 statement is handled on Gtid_state::update_on_flush().
431  */
432  if (thd->variables.gtid_next.type == GTID_GROUP &&
433  thd->get_command() != COM_STMT_PREPARE &&
434  (stmt_causes_implicit_commit(thd, CF_IMPLICIT_COMMIT_BEGIN) ||
435  (sql_command == SQLCOM_SET_OPTION && thd->lex->is_set_password_sql) ||
436  sql_command == SQLCOM_COMMIT ||
437  sql_command == SQLCOM_ROLLBACK))
438  thd->variables.gtid_next.set_undefined();
439 
440  DBUG_VOID_RETURN;
441 }
442 
443 
444 int gtid_rollback(THD *thd)
445 {
446  DBUG_ENTER("gtid_rollback");
447 
448  global_sid_lock->rdlock();
449  gtid_state->update_on_rollback(thd);
450  global_sid_lock->unlock();
451 
452  DBUG_RETURN(0);
453 }