MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_rli.cc
1 /* Copyright (c) 2006, 2013 Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software Foundation,
14  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
15 
16 #include "sql_priv.h"
17 #include "unireg.h" // HAVE_*
18 #include "rpl_mi.h"
19 #include "rpl_rli.h"
20 #include "sql_base.h" // close_thread_tables
21 #include <my_dir.h> // For MY_STAT
22 #include "log_event.h" // Format_description_log_event, Log_event,
23  // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
24  // PREFIX_SQL_LOAD
25 #include "rpl_slave.h"
26 #include "rpl_utility.h"
27 #include "transaction.h"
28 #include "sql_parse.h" // end_trans, ROLLBACK
29 #include "rpl_slave.h"
30 #include "rpl_rli_pdb.h"
31 #include "rpl_info_factory.h"
32 #include <mysql/plugin.h>
33 #include <mysql/service_thd_wait.h>
34 
35 using std::min;
36 using std::max;
37 
38 /*
39  Please every time you add a new field to the relay log info, update
40  what follows. For now, this is just used to get the number of
41  fields.
42 */
43 const char* info_rli_fields[]=
44 {
45  "number_of_lines",
46  "group_relay_log_name",
47  "group_relay_log_pos",
48  "group_master_log_name",
49  "group_master_log_pos",
50  "sql_delay",
51  "number_of_workers",
52  "id"
53 };
54 
55 Relay_log_info::Relay_log_info(bool is_slave_recovery
56 #ifdef HAVE_PSI_INTERFACE
57  ,PSI_mutex_key *param_key_info_run_lock,
58  PSI_mutex_key *param_key_info_data_lock,
59  PSI_mutex_key *param_key_info_sleep_lock,
60  PSI_mutex_key *param_key_info_data_cond,
61  PSI_mutex_key *param_key_info_start_cond,
62  PSI_mutex_key *param_key_info_stop_cond,
63  PSI_mutex_key *param_key_info_sleep_cond
64 #endif
65  , uint param_id
66  )
67  :Rpl_info("SQL"
68 #ifdef HAVE_PSI_INTERFACE
69  ,param_key_info_run_lock, param_key_info_data_lock,
70  param_key_info_sleep_lock,
71  param_key_info_data_cond, param_key_info_start_cond,
72  param_key_info_stop_cond, param_key_info_sleep_cond
73 #endif
74  , param_id
75  ),
76  replicate_same_server_id(::replicate_same_server_id),
77  cur_log_fd(-1), relay_log(&sync_relaylog_period),
78  is_relay_log_recovery(is_slave_recovery),
79  save_temporary_tables(0),
80  cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
81  group_master_log_pos(0),
82  gtid_set(global_sid_map, global_sid_lock),
83  log_space_total(0), ignore_log_space_limit(0),
84  sql_force_rotate_relay(false),
85  last_master_timestamp(0), slave_skip_counter(0),
86  abort_pos_wait(0), until_condition(UNTIL_NONE),
87  until_log_pos(0),
88  until_sql_gtids(global_sid_map),
89  until_sql_gtids_first_event(true),
90  retried_trans(0),
91  tables_to_lock(0), tables_to_lock_count(0),
92  rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
93  slave_parallel_workers(0),
94  recovery_parallel_workers(0), checkpoint_seqno(0),
95  checkpoint_group(opt_mts_checkpoint_group),
96  recovery_groups_inited(false), mts_recovery_group_cnt(0),
97  mts_recovery_index(0), mts_recovery_group_seen_begin(0),
98  mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
99  rli_description_event(NULL),
100  sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
101  long_find_row_note_printed(false), error_on_rli_init_info(false)
102 {
103  DBUG_ENTER("Relay_log_info::Relay_log_info");
104 
105 #ifdef HAVE_PSI_INTERFACE
106  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
107  key_RELAYLOG_LOCK_commit,
108  key_RELAYLOG_LOCK_commit_queue,
109  key_RELAYLOG_LOCK_done,
110  key_RELAYLOG_LOCK_flush_queue,
111  key_RELAYLOG_LOCK_log,
112  key_RELAYLOG_LOCK_sync,
113  key_RELAYLOG_LOCK_sync_queue,
114  key_RELAYLOG_LOCK_xids,
115  key_RELAYLOG_COND_done,
116  key_RELAYLOG_update_cond,
117  key_RELAYLOG_prep_xids_cond,
118  key_file_relaylog,
119  key_file_relaylog_index);
120 #endif
121 
122  group_relay_log_name[0]= event_relay_log_name[0]=
123  group_master_log_name[0]= 0;
124  until_log_name[0]= ign_master_log_name_end[0]= 0;
125  set_timespec_nsec(last_clock, 0);
126  memset(&cache_buf, 0, sizeof(cache_buf));
127  cached_charset_invalidate();
128 
129  mysql_mutex_init(key_relay_log_info_log_space_lock,
130  &log_space_lock, MY_MUTEX_INIT_FAST);
131  mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
132  mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
133  MY_MUTEX_INIT_FAST);
134  mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
135  my_atomic_rwlock_init(&slave_open_temp_tables_lock);
136 
137  relay_log.init_pthread_objects();
138  do_server_version_split(::server_version, slave_version_split);
139 
140  DBUG_VOID_RETURN;
141 }
142 
146 void Relay_log_info::init_workers(ulong n_workers)
147 {
148  /*
149  Parallel slave parameters initialization is done regardless
150  whether the feature is or going to be active or not.
151  */
152  mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
153  mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
154  mts_last_online_stat= 0;
155  my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
156 }
157 
162 {
163  delete_dynamic(&workers);
164 }
165 
166 Relay_log_info::~Relay_log_info()
167 {
168  DBUG_ENTER("Relay_log_info::~Relay_log_info");
169 
170  if (recovery_groups_inited)
171  bitmap_free(&recovery_groups);
172  mysql_mutex_destroy(&log_space_lock);
173  mysql_cond_destroy(&log_space_cond);
174  mysql_mutex_destroy(&pending_jobs_lock);
175  mysql_cond_destroy(&pending_jobs_cond);
176  my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
177  relay_log.cleanup();
178  set_rli_description_event(NULL);
179 
180  DBUG_VOID_RETURN;
181 }
182 
193 {
194  if (!is_parallel_exec())
195  return;
196  for (uint i= 0; i < workers.elements; i++)
197  {
198  Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
199  w->relay_log_change_notified= FALSE;
200  }
201 }
202 
220 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
221  bool need_data_lock)
222 {
223  /*
224  If this is not a parallel execution we return immediately.
225  */
226  if (!is_parallel_exec())
227  return;
228 
229  for (uint i= 0; i < workers.elements; i++)
230  {
231  Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
232  /*
233  Reseting the notification information in order to force workers to
234  assign jobs with the new updated information.
235  Notice that the bitmap_shifted is accumulated to indicate how many
236  consecutive jobs were successfully processed.
237 
238  The worker when assigning a new job will set the value back to
239  zero.
240  */
241  w->checkpoint_notified= FALSE;
242  w->bitmap_shifted= w->bitmap_shifted + shift;
243  /*
244  Zero shift indicates the caller rotates the master binlog.
245  The new name will be passed to W through the group descriptor
246  during the first post-rotation time scheduling.
247  */
248  if (shift == 0)
249  w->master_log_change_notified= false;
250 
251  DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
252  "worker->bitmap_shifted --> %lu, worker --> %u.",
253  shift, w->bitmap_shifted, i));
254  }
255  /*
256  There should not be a call where (shift == 0 && checkpoint_seqno != 0).
257 
258  Then the new checkpoint sequence is updated by subtracting the number
259  of consecutive jobs that were successfully processed.
260  */
261  DBUG_ASSERT(!(shift == 0 && checkpoint_seqno != 0));
262  checkpoint_seqno= checkpoint_seqno - shift;
263  DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
264  "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
265 
266  if (new_ts)
267  {
268  if (need_data_lock)
269  mysql_mutex_lock(&data_lock);
270  else
271  mysql_mutex_assert_owner(&data_lock);
272  last_master_timestamp= new_ts;
273  if (need_data_lock)
274  mysql_mutex_unlock(&data_lock);
275  }
276 }
277 
285 {
286  bool ret= false;
287  uint i;
288  uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
289 
290  DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
291 
292  for (i= 0; !ret && i < workers.elements; i++)
293  {
294  Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
295  ret= w->reset_recovery_info();
296  DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
297  }
298  /*
299  The loop is traversed in the worker index descending order due
300  to specifics of the Worker table repository that does not like
301  even temporary holes. Therefore stale records are deleted
302  from the tail.
303  */
304  for (i= recovery_parallel_workers; i > workers.elements && !ret; i--)
305  {
306  Slave_worker *w=
307  Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
308  ret= w->remove_info();
309  delete w;
310  }
311  recovery_parallel_workers= slave_parallel_workers;
312 
313  DBUG_RETURN(ret);
314 }
315 
316 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
317 {
318  MY_STAT s;
319  DBUG_ENTER("add_relay_log");
320  if (!mysql_file_stat(key_file_relaylog,
321  linfo->log_file_name, &s, MYF(0)))
322  {
323  sql_print_error("log %s listed in the index, but failed to stat.",
324  linfo->log_file_name);
325  DBUG_RETURN(1);
326  }
327  rli->log_space_total += s.st_size;
328 #ifndef DBUG_OFF
329  char buf[22];
330  DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
331 #endif
332  DBUG_RETURN(0);
333 }
334 
335 int Relay_log_info::count_relay_log_space()
336 {
337  LOG_INFO flinfo;
338  DBUG_ENTER("Relay_log_info::count_relay_log_space");
339  log_space_total= 0;
340  if (relay_log.find_log_pos(&flinfo, NullS, 1))
341  {
342  sql_print_error("Could not find first log while counting relay log space.");
343  DBUG_RETURN(1);
344  }
345  do
346  {
347  if (add_relay_log(this, &flinfo))
348  DBUG_RETURN(1);
349  } while (!relay_log.find_next_log(&flinfo, 1));
350  /*
351  As we have counted everything, including what may have written in a
352  preceding write, we must reset bytes_written, or we may count some space
353  twice.
354  */
355  relay_log.reset_bytes_written();
356  DBUG_RETURN(0);
357 }
358 
364 {
365  DBUG_ENTER("clear_until_condition");
366 
367  until_condition= Relay_log_info::UNTIL_NONE;
368  until_log_name[0]= 0;
369  until_log_pos= 0;
370  until_sql_gtids.clear();
371  until_sql_gtids_first_event= true;
372  DBUG_VOID_RETURN;
373 }
374 
407  ulonglong pos, bool need_data_lock,
408  const char** errmsg,
409  bool look_for_description_event)
410 {
411  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
412  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
413 
414  *errmsg=0;
415  const char* errmsg_fmt= 0;
416  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
417  mysql_mutex_t *log_lock= relay_log.get_log_lock();
418 
419  if (need_data_lock)
420  mysql_mutex_lock(&data_lock);
421  else
422  mysql_mutex_assert_owner(&data_lock);
423 
424  /*
425  By default the relay log is in binlog format 3 (4.0).
426  Even if format is 4, this will work enough to read the first event
427  (Format_desc) (remember that format 4 is just lenghtened compared to format
428  3; format 3 is a prefix of format 4).
429  */
430  set_rli_description_event(new Format_description_log_event(3));
431 
432  mysql_mutex_lock(log_lock);
433 
434  /* Close log file and free buffers if it's already open */
435  if (cur_log_fd >= 0)
436  {
437  end_io_cache(&cache_buf);
438  mysql_file_close(cur_log_fd, MYF(MY_WME));
439  cur_log_fd = -1;
440  }
441 
442  group_relay_log_pos= event_relay_log_pos= pos;
443 
444  /*
445  Test to see if the previous run was with the skip of purging
446  If yes, we do not purge when we restart
447  */
448  if (relay_log.find_log_pos(&linfo, NullS, 1))
449  {
450  *errmsg="Could not find first log during relay log initialization";
451  goto err;
452  }
453 
454  if (log && relay_log.find_log_pos(&linfo, log, 1))
455  {
456  errmsg_fmt= "Could not find target log file mentioned in "
457  "relay log info in the index file '%s' during "
458  "relay log initialization";
459  sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
460  *errmsg= errmsg_buff;
461  goto err;
462  }
463 
464  strmake(group_relay_log_name, linfo.log_file_name,
465  sizeof(group_relay_log_name) - 1);
466  strmake(event_relay_log_name, linfo.log_file_name,
467  sizeof(event_relay_log_name) - 1);
468 
469  if (relay_log.is_active(linfo.log_file_name))
470  {
471  /*
472  The IO thread is using this log file.
473  In this case, we will use the same IO_CACHE pointer to
474  read data as the IO thread is using to write data.
475  */
476  my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
477  if (check_binlog_magic(cur_log, errmsg))
478  goto err;
479  cur_log_old_open_count=relay_log.get_open_count();
480  }
481  else
482  {
483  /*
484  Open the relay log and set cur_log to point at this one
485  */
486  if ((cur_log_fd=open_binlog_file(&cache_buf,
487  linfo.log_file_name,errmsg)) < 0)
488  goto err;
489  cur_log = &cache_buf;
490  }
491  /*
492  In all cases, check_binlog_magic() has been called so we're at offset 4 for
493  sure.
494  */
495  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
496  {
497  Log_event* ev;
498  while (look_for_description_event)
499  {
500  /*
501  Read the possible Format_description_log_event; if position
502  was 4, no need, it will be read naturally.
503  */
504  DBUG_PRINT("info",("looking for a Format_description_log_event"));
505 
506  if (my_b_tell(cur_log) >= pos)
507  break;
508 
509  /*
510  Because of we have data_lock and log_lock, we can safely read an
511  event
512  */
513  if (!(ev= Log_event::read_log_event(cur_log, 0,
514  rli_description_event,
515  opt_slave_sql_verify_checksum)))
516  {
517  DBUG_PRINT("info",("could not read event, cur_log->error=%d",
518  cur_log->error));
519  if (cur_log->error) /* not EOF */
520  {
521  *errmsg= "I/O error reading event at position 4";
522  goto err;
523  }
524  break;
525  }
526  else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
527  {
528  DBUG_PRINT("info",("found Format_description_log_event"));
529  set_rli_description_event((Format_description_log_event *)ev);
530  /*
531  As ev was returned by read_log_event, it has passed is_valid(), so
532  my_malloc() in ctor worked, no need to check again.
533  */
534  /*
535  Ok, we found a Format_description event. But it is not sure that this
536  describes the whole relay log; indeed, one can have this sequence
537  (starting from position 4):
538  Format_desc (of slave)
539  Rotate (of master)
540  Format_desc (of master)
541  So the Format_desc which really describes the rest of the relay log
542  is the 3rd event (it can't be further than that, because we rotate
543  the relay log when we queue a Rotate event from the master).
544  But what describes the Rotate is the first Format_desc.
545  So what we do is:
546  go on searching for Format_description events, until you exceed the
547  position (argument 'pos') or until you find another event than Rotate
548  or Format_desc.
549  */
550  }
551  else
552  {
553  DBUG_PRINT("info",("found event of another type=%d",
554  ev->get_type_code()));
555  look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
556  delete ev;
557  }
558  }
559  my_b_seek(cur_log,(off_t)pos);
560 #ifndef DBUG_OFF
561  {
562  char llbuf1[22], llbuf2[22];
563  DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
564  llstr(my_b_tell(cur_log),llbuf1),
565  llstr(get_event_relay_log_pos(),llbuf2)));
566  }
567 #endif
568 
569  }
570 
571 err:
572  /*
573  If we don't purge, we can't honour relay_log_space_limit ;
574  silently discard it
575  */
576  if (!relay_log_purge)
577  {
578  log_space_limit= 0; // todo: consider to throw a warning at least
579  }
580  mysql_cond_broadcast(&data_cond);
581 
582  mysql_mutex_unlock(log_lock);
583 
584  if (need_data_lock)
585  mysql_mutex_unlock(&data_lock);
586  if (!rli_description_event->is_valid() && !*errmsg)
587  *errmsg= "Invalid Format_description log event; could be out of memory";
588 
589  DBUG_RETURN ((*errmsg) ? 1 : 0);
590 }
591 
613 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
614  longlong log_pos,
615  longlong timeout)
616 {
617  int event_count = 0;
618  ulong init_abort_pos_wait;
619  int error=0;
620  struct timespec abstime; // for timeout checking
621  PSI_stage_info old_stage;
622  DBUG_ENTER("Relay_log_info::wait_for_pos");
623 
624  if (!inited)
625  DBUG_RETURN(-2);
626 
627  DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
628  log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
629 
630  set_timespec(abstime,timeout);
631  mysql_mutex_lock(&data_lock);
632  thd->ENTER_COND(&data_cond, &data_lock,
633  &stage_waiting_for_the_slave_thread_to_advance_position,
634  &old_stage);
635  /*
636  This function will abort when it notices that some CHANGE MASTER or
637  RESET MASTER has changed the master info.
638  To catch this, these commands modify abort_pos_wait ; We just monitor
639  abort_pos_wait and see if it has changed.
640  Why do we have this mechanism instead of simply monitoring slave_running
641  in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
642  the SQL thread be stopped?
643  This is becasue if someones does:
644  STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
645  the change may happen very quickly and we may not notice that
646  slave_running briefly switches between 1/0/1.
647  */
648  init_abort_pos_wait= abort_pos_wait;
649 
650  /*
651  We'll need to
652  handle all possible log names comparisons (e.g. 999 vs 1000).
653  We use ulong for string->number conversion ; this is no
654  stronger limitation than in find_uniq_filename in sql/log.cc
655  */
656  ulong log_name_extension;
657  char log_name_tmp[FN_REFLEN]; //make a char[] from String
658 
659  strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
660 
661  char *p= fn_ext(log_name_tmp);
662  char *p_end;
663  if (!*p || log_pos<0)
664  {
665  error= -2; //means improper arguments
666  goto err;
667  }
668  // Convert 0-3 to 4
669  log_pos= max<ulong>(log_pos, BIN_LOG_HEADER_SIZE);
670  /* p points to '.' */
671  log_name_extension= strtoul(++p, &p_end, 10);
672  /*
673  p_end points to the first invalid character.
674  If it equals to p, no digits were found, error.
675  If it contains '\0' it means conversion went ok.
676  */
677  if (p_end==p || *p_end)
678  {
679  error= -2;
680  goto err;
681  }
682 
683  /* The "compare and wait" main loop */
684  while (!thd->killed &&
685  init_abort_pos_wait == abort_pos_wait &&
686  slave_running)
687  {
688  bool pos_reached;
689  int cmp_result= 0;
690 
691  DBUG_PRINT("info",
692  ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
693  init_abort_pos_wait, abort_pos_wait));
694  DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
695  group_master_log_name, (ulong) group_master_log_pos));
696 
697  /*
698  group_master_log_name can be "", if we are just after a fresh
699  replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
700  (before we have executed one Rotate event from the master) or
701  (rare) if the user is doing a weird slave setup (see next
702  paragraph). If group_master_log_name is "", we assume we don't
703  have enough info to do the comparison yet, so we just wait until
704  more data. In this case master_log_pos is always 0 except if
705  somebody (wrongly) sets this slave to be a slave of itself
706  without using --replicate-same-server-id (an unsupported
707  configuration which does nothing), then group_master_log_pos
708  will grow and group_master_log_name will stay "".
709  */
710  if (*group_master_log_name)
711  {
712  char *basename= (group_master_log_name +
713  dirname_length(group_master_log_name));
714  /*
715  First compare the parts before the extension.
716  Find the dot in the master's log basename,
717  and protect against user's input error :
718  if the names do not match up to '.' included, return error
719  */
720  char *q= (char*)(fn_ext(basename)+1);
721  if (strncmp(basename, log_name_tmp, (int)(q-basename)))
722  {
723  error= -2;
724  break;
725  }
726  // Now compare extensions.
727  char *q_end;
728  ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
729  if (group_master_log_name_extension < log_name_extension)
730  cmp_result= -1 ;
731  else
732  cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
733 
734  pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
735  cmp_result > 0);
736  if (pos_reached || thd->killed)
737  break;
738  }
739 
740  //wait for master update, with optional timeout.
741 
742  DBUG_PRINT("info",("Waiting for master update"));
743  /*
744  We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
745  will wake us up.
746  */
747  thd_wait_begin(thd, THD_WAIT_BINLOG);
748  if (timeout > 0)
749  {
750  /*
751  Note that mysql_cond_timedwait checks for the timeout
752  before for the condition ; i.e. it returns ETIMEDOUT
753  if the system time equals or exceeds the time specified by abstime
754  before the condition variable is signaled or broadcast, _or_ if
755  the absolute time specified by abstime has already passed at the time
756  of the call.
757  For that reason, mysql_cond_timedwait will do the "timeoutting" job
758  even if its condition is always immediately signaled (case of a loaded
759  master).
760  */
761  error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
762  }
763  else
764  mysql_cond_wait(&data_cond, &data_lock);
765  thd_wait_end(thd);
766  DBUG_PRINT("info",("Got signal of master update or timed out"));
767  if (error == ETIMEDOUT || error == ETIME)
768  {
769 #ifndef DBUG_OFF
770  /*
771  Doing this to generate a stack trace and make debugging
772  easier.
773  */
774  if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
775  DBUG_ASSERT(0);
776 #endif
777  error= -1;
778  break;
779  }
780  error=0;
781  event_count++;
782  DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
783  }
784 
785 err:
786  thd->EXIT_COND(&old_stage);
787  DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
788 improper_arguments: %d timed_out: %d",
789  thd->killed_errno(),
790  (int) (init_abort_pos_wait != abort_pos_wait),
791  (int) slave_running,
792  (int) (error == -2),
793  (int) (error == -1)));
794  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
795  !slave_running)
796  {
797  error= -2;
798  }
799  DBUG_RETURN( error ? error : event_count );
800 }
801 
802 /*
803  TODO: This is a duplicated code that needs to be simplified.
804  This will be done while developing all possible sync options.
805  See WL#3584's specification.
806 
807  /Alfranio
808 */
809 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
810  longlong timeout)
811 {
812  int event_count = 0;
813  ulong init_abort_pos_wait;
814  int error=0;
815  struct timespec abstime; // for timeout checking
816  PSI_stage_info old_stage;
817  DBUG_ENTER("Relay_log_info::wait_for_gtid_set");
818 
819  if (!inited)
820  DBUG_RETURN(-2);
821 
822  DBUG_PRINT("info", ("Waiting for %s timeout %lld", gtid->c_ptr_safe(),
823  timeout));
824 
825  set_timespec(abstime, timeout);
826  mysql_mutex_lock(&data_lock);
827  thd->ENTER_COND(&data_cond, &data_lock,
828  &stage_waiting_for_the_slave_thread_to_advance_position,
829  &old_stage);
830  /*
831  This function will abort when it notices that some CHANGE MASTER or
832  RESET MASTER has changed the master info.
833  To catch this, these commands modify abort_pos_wait ; We just monitor
834  abort_pos_wait and see if it has changed.
835  Why do we have this mechanism instead of simply monitoring slave_running
836  in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
837  the SQL thread be stopped?
838  This is becasue if someones does:
839  STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
840  the change may happen very quickly and we may not notice that
841  slave_running briefly switches between 1/0/1.
842  */
843  init_abort_pos_wait= abort_pos_wait;
844  Gtid_set wait_gtid_set(global_sid_map);
845  global_sid_lock->rdlock();
846  if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
847  {
848  global_sid_lock->unlock();
849  goto err;
850  }
851  global_sid_lock->unlock();
852 
853  /* The "compare and wait" main loop */
854  while (!thd->killed &&
855  init_abort_pos_wait == abort_pos_wait &&
856  slave_running)
857  {
858  DBUG_PRINT("info",
859  ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
860  init_abort_pos_wait, abort_pos_wait));
861 
862  //wait for master update, with optional timeout.
863 
864  global_sid_lock->wrlock();
865  const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
866  const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
867 
868  DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
869  "!is_intersection_nonempty: %d",
870  gtid->c_ptr_safe(), wait_gtid_set.is_subset(logged_gtids),
871  !owned_gtids->is_intersection_nonempty(&wait_gtid_set)));
872  logged_gtids->dbug_print("gtid_executed:");
873  owned_gtids->dbug_print("owned_gtids:");
874 
875  /*
876  Since commit is performed after log to binary log, we must also
877  check if any GTID of wait_gtid_set is not yet committed.
878  */
879  if (wait_gtid_set.is_subset(logged_gtids) &&
880  !owned_gtids->is_intersection_nonempty(&wait_gtid_set))
881  {
882  global_sid_lock->unlock();
883  break;
884  }
885  global_sid_lock->unlock();
886 
887  DBUG_PRINT("info",("Waiting for master update"));
888 
889  /*
890  We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
891  will wake us up.
892  */
893  thd_wait_begin(thd, THD_WAIT_BINLOG);
894  if (timeout > 0)
895  {
896  /*
897  Note that mysql_cond_timedwait checks for the timeout
898  before for the condition ; i.e. it returns ETIMEDOUT
899  if the system time equals or exceeds the time specified by abstime
900  before the condition variable is signaled or broadcast, _or_ if
901  the absolute time specified by abstime has already passed at the time
902  of the call.
903  For that reason, mysql_cond_timedwait will do the "timeoutting" job
904  even if its condition is always immediately signaled (case of a loaded
905  master).
906  */
907  error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
908  }
909  else
910  mysql_cond_wait(&data_cond, &data_lock);
911  thd_wait_end(thd);
912  DBUG_PRINT("info",("Got signal of master update or timed out"));
913  if (error == ETIMEDOUT || error == ETIME)
914  {
915 #ifndef DBUG_OFF
916  /*
917  Doing this to generate a stack trace and make debugging
918  easier.
919  */
920  if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
921  DBUG_ASSERT(0);
922 #endif
923  error= -1;
924  break;
925  }
926  error=0;
927  event_count++;
928  DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
929  }
930 
931 err:
932  thd->EXIT_COND(&old_stage);
933  DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
934 improper_arguments: %d timed_out: %d",
935  thd->killed_errno(),
936  (int) (init_abort_pos_wait != abort_pos_wait),
937  (int) slave_running,
938  (int) (error == -2),
939  (int) (error == -1)));
940  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
941  !slave_running)
942  {
943  error= -2;
944  }
945  DBUG_RETURN( error ? error : event_count );
946 }
947 
948 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
949  bool need_data_lock)
950 {
951  int error= 0;
952  DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
953 
954  if (need_data_lock)
955  mysql_mutex_lock(&data_lock);
956  else
957  mysql_mutex_assert_owner(&data_lock);
958 
959  inc_event_relay_log_pos();
960  group_relay_log_pos= event_relay_log_pos;
961  strmake(group_relay_log_name,event_relay_log_name,
962  sizeof(group_relay_log_name)-1);
963 
964  notify_group_relay_log_name_update();
965 
966  /*
967  In 4.x we used the event's len to compute the positions here. This is
968  wrong if the event was 3.23/4.0 and has been converted to 5.0, because
969  then the event's len is not what is was in the master's binlog, so this
970  will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
971  replication: Exec_master_log_pos is wrong). Only way to solve this is to
972  have the original offset of the end of the event the relay log. This is
973  what we do in 5.0: log_pos has become "end_log_pos" (because the real use
974  of log_pos in 4.0 was to compute the end_log_pos; so better to store
975  end_log_pos instead of begin_log_pos.
976  If we had not done this fix here, the problem would also have appeared
977  when the slave and master are 5.0 but with different event length (for
978  example the slave is more recent than the master and features the event
979  UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
980  SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
981  value which would lead to badly broken replication.
982  Even the relay_log_pos will be corrupted in this case, because the len is
983  the relay log is not "val".
984  With the end_log_pos solution, we avoid computations involving lengthes.
985  */
986  DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
987  (long) log_pos, (long) group_master_log_pos));
988 
989  if (log_pos > 0) // 3.23 binlogs don't have log_posx
990  group_master_log_pos= log_pos;
991 
992  /*
993  In MTS mode FD or Rotate event commit their solitary group to
994  Coordinator's info table. Callers make sure that Workers have been
995  executed all assignements.
996  Broadcast to master_pos_wait() waiters should be done after
997  the table is updated.
998  */
999  DBUG_ASSERT(!is_parallel_exec() ||
1000  mts_group_status != Relay_log_info::MTS_IN_GROUP);
1001  /*
1002  We do not force synchronization at this point, note the
1003  parameter false, because a non-transactional change is
1004  being committed.
1005 
1006  For that reason, the synchronization here is subjected to
1007  the option sync_relay_log_info.
1008 
1009  See sql/rpl_rli.h for further information on this behavior.
1010  */
1011  error= flush_info(FALSE);
1012 
1013  mysql_cond_broadcast(&data_cond);
1014  if (need_data_lock)
1015  mysql_mutex_unlock(&data_lock);
1016  DBUG_RETURN(error);
1017 }
1018 
1019 
1020 void Relay_log_info::close_temporary_tables()
1021 {
1022  TABLE *table,*next;
1023  DBUG_ENTER("Relay_log_info::close_temporary_tables");
1024 
1025  for (table=save_temporary_tables ; table ; table=next)
1026  {
1027  next=table->next;
1028  /*
1029  Don't ask for disk deletion. For now, anyway they will be deleted when
1030  slave restarts, but it is a better intention to not delete them.
1031  */
1032  DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1033  close_temporary(table, 1, 0);
1034  }
1035  save_temporary_tables= 0;
1036  slave_open_temp_tables= 0;
1037  DBUG_VOID_RETURN;
1038 }
1039 
1053 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1054  const char** errmsg)
1055 {
1056  int error=0;
1057  DBUG_ENTER("Relay_log_info::purge_relay_logs");
1058 
1059  /*
1060  Even if inited==0, we still try to empty master_log_* variables. Indeed,
1061  inited==0 does not imply that they already are empty.
1062 
1063  It could be that slave's info initialization partly succeeded: for example
1064  if relay-log.info existed but *relay-bin*.* have been manually removed,
1065  init_info reads the old relay-log.info and fills rli->master_log_*, then
1066  init_info checks for the existence of the relay log, this fails and
1067  init_info leaves inited to 0.
1068  In that pathological case, master_log_pos* will be properly reinited at
1069  the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1070  purge_relay_logs, will delete bogus *.info files or replace them with
1071  correct files), however if the user does SHOW SLAVE STATUS before START
1072  SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1073  master_log_* for SHOW SLAVE STATUS to display fine in any case.
1074  */
1075  group_master_log_name[0]= 0;
1076  group_master_log_pos= 0;
1077 
1078  if (!inited)
1079  {
1080  DBUG_PRINT("info", ("inited == 0"));
1081  DBUG_RETURN(0);
1082  }
1083 
1084  DBUG_ASSERT(slave_running == 0);
1085  DBUG_ASSERT(mi->slave_running == 0);
1086 
1087  slave_skip_counter= 0;
1088  mysql_mutex_lock(&data_lock);
1089 
1090  /*
1091  we close the relay log fd possibly left open by the slave SQL thread,
1092  to be able to delete it; the relay log fd possibly left open by the slave
1093  I/O thread will be closed naturally in reset_logs() by the
1094  close(LOG_CLOSE_TO_BE_OPENED) call
1095  */
1096  if (cur_log_fd >= 0)
1097  {
1098  end_io_cache(&cache_buf);
1099  my_close(cur_log_fd, MYF(MY_WME));
1100  cur_log_fd= -1;
1101  }
1102 
1103  if (relay_log.reset_logs(thd))
1104  {
1105  *errmsg = "Failed during log reset";
1106  error=1;
1107  goto err;
1108  }
1109  /* Save name of used relay log file */
1110  strmake(group_relay_log_name, relay_log.get_log_fname(),
1111  sizeof(group_relay_log_name)-1);
1112  strmake(event_relay_log_name, relay_log.get_log_fname(),
1113  sizeof(event_relay_log_name)-1);
1114  group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1115  if (count_relay_log_space())
1116  {
1117  *errmsg= "Error counting relay log space";
1118  error= 1;
1119  goto err;
1120  }
1121  if (!just_reset)
1122  error= init_relay_log_pos(group_relay_log_name,
1123  group_relay_log_pos,
1124  false/*need_data_lock=false*/, errmsg, 0);
1125 
1126 err:
1127 #ifndef DBUG_OFF
1128  char buf[22];
1129 #endif
1130  DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1131  mysql_mutex_unlock(&data_lock);
1132  DBUG_RETURN(error);
1133 }
1134 
1135 
1167 {
1168  char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1169  "condition is bad.";
1170  DBUG_ENTER("Relay_log_info::is_until_satisfied");
1171 
1172  switch (until_condition)
1173  {
1174  case UNTIL_MASTER_POS:
1175  case UNTIL_RELAY_POS:
1176  {
1177  const char *log_name= NULL;
1178  ulonglong log_pos= 0;
1179 
1180  if (until_condition == UNTIL_MASTER_POS)
1181  {
1182  if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1183  DBUG_RETURN(false);
1184  log_name= group_master_log_name;
1185  log_pos= (!ev)? group_master_log_pos :
1186  ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
1187  group_master_log_pos : ev->log_pos - ev->data_written);
1188  }
1189  else
1190  { /* until_condition == UNTIL_RELAY_POS */
1191  log_name= group_relay_log_name;
1192  log_pos= group_relay_log_pos;
1193  }
1194 
1195 #ifndef DBUG_OFF
1196  {
1197  char buf[32];
1198  DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1199  group_master_log_name, llstr(group_master_log_pos, buf)));
1200  DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1201  group_relay_log_name, llstr(group_relay_log_pos, buf)));
1202  DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1203  until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1204  log_name, llstr(log_pos, buf)));
1205  DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1206  until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1207  until_log_name, llstr(until_log_pos, buf)));
1208  }
1209 #endif
1210 
1211  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1212  {
1213  /*
1214  We have no cached comparison results so we should compare log names
1215  and cache result.
1216  If we are after RESET SLAVE, and the SQL slave thread has not processed
1217  any event yet, it could be that group_master_log_name is "". In that case,
1218  just wait for more events (as there is no sensible comparison to do).
1219  */
1220 
1221  if (*log_name)
1222  {
1223  const char *basename= log_name + dirname_length(log_name);
1224 
1225  const char *q= (const char*)(fn_ext(basename)+1);
1226  if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1227  {
1228  /* Now compare extensions. */
1229  char *q_end;
1230  ulong log_name_extension= strtoul(q, &q_end, 10);
1231  if (log_name_extension < until_log_name_extension)
1232  until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1233  else
1234  until_log_names_cmp_result=
1235  (log_name_extension > until_log_name_extension) ?
1236  UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1237  }
1238  else
1239  {
1240  /* Base names do not match, so we abort */
1241  sql_print_error("%s", error_msg);
1242  DBUG_RETURN(true);
1243  }
1244  }
1245  else
1246  DBUG_RETURN(until_log_pos == 0);
1247  }
1248 
1249  if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1250  log_pos >= until_log_pos) ||
1251  until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1252  {
1253  char buf[22];
1254  sql_print_information("Slave SQL thread stopped because it reached its"
1255  " UNTIL position %s", llstr(until_pos(), buf));
1256  DBUG_RETURN(true);
1257  }
1258  DBUG_RETURN(false);
1259  }
1260 
1261  case UNTIL_SQL_BEFORE_GTIDS:
1262  // We only need to check once if logged_gtids set contains any of the until_sql_gtids.
1263  if (until_sql_gtids_first_event)
1264  {
1265  until_sql_gtids_first_event= false;
1266  global_sid_lock->wrlock();
1267  /* Check if until GTIDs were already applied. */
1268  const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1269  if (until_sql_gtids.is_intersection_nonempty(logged_gtids))
1270  {
1271  char *buffer= until_sql_gtids.to_string();
1272  global_sid_lock->unlock();
1273  sql_print_information("Slave SQL thread stopped because "
1274  "UNTIL SQL_BEFORE_GTIDS %s is already "
1275  "applied", buffer);
1276  my_free(buffer);
1277  DBUG_RETURN(true);
1278  }
1279  global_sid_lock->unlock();
1280  }
1281  if (ev != NULL && ev->get_type_code() == GTID_LOG_EVENT)
1282  {
1283  Gtid_log_event *gev= (Gtid_log_event *)ev;
1284  global_sid_lock->rdlock();
1285  if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1286  {
1287  char *buffer= until_sql_gtids.to_string();
1288  global_sid_lock->unlock();
1289  sql_print_information("Slave SQL thread stopped because it reached "
1290  "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1291  my_free(buffer);
1292  DBUG_RETURN(true);
1293  }
1294  global_sid_lock->unlock();
1295  }
1296  DBUG_RETURN(false);
1297  break;
1298 
1299  case UNTIL_SQL_AFTER_GTIDS:
1300  {
1301  global_sid_lock->wrlock();
1302  const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1303  if (until_sql_gtids.is_subset(logged_gtids))
1304  {
1305  char *buffer= until_sql_gtids.to_string();
1306  global_sid_lock->unlock();
1307  sql_print_information("Slave SQL thread stopped because it reached "
1308  "UNTIL SQL_AFTER_GTIDS %s", buffer);
1309  my_free(buffer);
1310  DBUG_RETURN(true);
1311  }
1312  global_sid_lock->unlock();
1313  DBUG_RETURN(false);
1314  }
1315  break;
1316 
1317  case UNTIL_SQL_AFTER_MTS_GAPS:
1318 #ifndef DBUG_OFF
1319  case UNTIL_DONE:
1320 #endif
1321  /*
1322  TODO: this condition is actually post-execution or post-scheduling
1323  so the proper place to check it before SQL thread goes
1324  into next_event() where it can wait while the condition
1325  has been satisfied already.
1326  It's deployed here temporarily to be fixed along the regular UNTIL
1327  support for MTS is provided.
1328  */
1329  if (mts_recovery_group_cnt == 0)
1330  {
1331  sql_print_information("Slave SQL thread stopped according to "
1332  "UNTIL SQL_AFTER_MTS_GAPS as it has "
1333  "processed all gap transactions left from "
1334  "the previous slave session.");
1335 #ifndef DBUG_OFF
1336  until_condition= UNTIL_DONE;
1337 #endif
1338  DBUG_RETURN(true);
1339  }
1340  else
1341  {
1342  DBUG_RETURN(false);
1343  }
1344  break;
1345 
1346  case UNTIL_NONE:
1347  DBUG_ASSERT(0);
1348  break;
1349  }
1350 
1351  DBUG_ASSERT(0);
1352  DBUG_RETURN(false);
1353 }
1354 
1356 {
1357  DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1358 
1359  /* Full of zeroes means uninitialized. */
1360  memset(cached_charset, 0, sizeof(cached_charset));
1361  DBUG_VOID_RETURN;
1362 }
1363 
1364 
1365 bool Relay_log_info::cached_charset_compare(char *charset) const
1366 {
1367  DBUG_ENTER("Relay_log_info::cached_charset_compare");
1368 
1369  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1370  {
1371  memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1372  DBUG_RETURN(1);
1373  }
1374  DBUG_RETURN(0);
1375 }
1376 
1377 
1378 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1379 {
1380  int error= 0;
1381 
1382  clear_flag(IN_STMT);
1383 
1384  DBUG_ASSERT(!belongs_to_client());
1385  /* Worker does not execute binlog update position logics */
1386  DBUG_ASSERT(!is_mts_worker(info_thd));
1387 
1388  /*
1389  Replication keeps event and group positions to specify the
1390  set of events that were executed.
1391  Event positions are incremented after processing each event
1392  whereas group positions are incremented when an event or a
1393  set of events is processed such as in a transaction and are
1394  committed or rolled back.
1395 
1396  A transaction can be ended with a Query Event, i.e. either
1397  commit or rollback, or by a Xid Log Event. Query Event is
1398  used to terminate pseudo-transactions that are executed
1399  against non-transactional engines such as MyIsam. Xid Log
1400  Event denotes though that a set of changes executed
1401  against a transactional engine is about to commit.
1402 
1403  Events' positions are incremented at stmt_done(). However,
1404  transactions that are ended with Xid Log Event have their
1405  group position incremented in the do_apply_event() and in
1406  the do_apply_event_work().
1407 
1408  Notice that the type of the engine, i.e. where data and
1409  positions are stored, against what events are being applied
1410  are not considered in this logic.
1411 
1412  Regarding the code that follows, notice that the executed
1413  group coordinates don't change if the current event is internal
1414  to the group. The same applies to MTS Coordinator when it
1415  handles a Format Descriptor event that appears in the middle
1416  of a group that is about to be assigned.
1417  */
1418  if ((!is_parallel_exec() && is_in_group()) ||
1419  mts_group_status != MTS_NOT_IN_GROUP)
1420  {
1421  inc_event_relay_log_pos();
1422  }
1423  else
1424  {
1425  if (is_parallel_exec())
1426  {
1427 
1428  DBUG_ASSERT(!is_mts_worker(info_thd));
1429 
1430  /*
1431  Format Description events only can drive MTS execution to this
1432  point. It is a special event group that is handled with
1433  synchronization. For that reason, the checkpoint routine is
1434  called here.
1435  */
1436  error= mts_checkpoint_routine(this, 0, false,
1437  true/*need_data_lock=true*/);
1438  }
1439  if (!error)
1440  error= inc_group_relay_log_pos(event_master_log_pos,
1441  true/*need_data_lock=true*/);
1442  }
1443 
1444  return error;
1445 }
1446 
1447 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1448 void Relay_log_info::cleanup_context(THD *thd, bool error)
1449 {
1450  DBUG_ENTER("Relay_log_info::cleanup_context");
1451 
1452  DBUG_ASSERT(info_thd == thd);
1453  /*
1454  1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1455  may have opened tables, which we cannot be sure have been closed (because
1456  maybe the Rows_log_event have not been found or will not be, because slave
1457  SQL thread is stopping, or relay log has a missing tail etc). So we close
1458  all thread's tables. And so the table mappings have to be cancelled.
1459  2) Rows_log_event::do_apply_event() may even have started statements or
1460  transactions on them, which we need to rollback in case of error.
1461  3) If finding a Format_description_log_event after a BEGIN, we also need
1462  to rollback before continuing with the next events.
1463  4) so we need this "context cleanup" function.
1464  */
1465  if (error)
1466  {
1467  trans_rollback_stmt(thd); // if a "statement transaction"
1468  trans_rollback(thd); // if a "real transaction"
1469  }
1470  if (rows_query_ev)
1471  {
1472  delete rows_query_ev;
1473  rows_query_ev= NULL;
1474  info_thd->set_query(NULL, 0);
1475  }
1476  m_table_map.clear_tables();
1477  slave_close_thread_tables(thd);
1478  if (error)
1479  thd->mdl_context.release_transactional_locks();
1480  clear_flag(IN_STMT);
1481  /*
1482  Cleanup for the flags that have been set at do_apply_event.
1483  */
1484  thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1485  thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1486 
1487  /*
1488  Reset state related to long_find_row notes in the error log:
1489  - timestamp
1490  - flag that decides whether the slave prints or not
1491  */
1492  reset_row_stmt_start_timestamp();
1493  unset_long_find_row_note_printed();
1494 
1495  DBUG_VOID_RETURN;
1496 }
1497 
1498 void Relay_log_info::clear_tables_to_lock()
1499 {
1500  DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1501 #ifndef DBUG_OFF
1502 
1512  uint i=0;
1513  for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1514  DBUG_ASSERT(i == tables_to_lock_count);
1515 #endif
1516 
1517  while (tables_to_lock)
1518  {
1519  uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1520  if (tables_to_lock->m_tabledef_valid)
1521  {
1522  tables_to_lock->m_tabledef.table_def::~table_def();
1523  tables_to_lock->m_tabledef_valid= FALSE;
1524  }
1525 
1526  /*
1527  If blob fields were used during conversion of field values
1528  from the master table into the slave table, then we need to
1529  free the memory used temporarily to store their values before
1530  copying into the slave's table.
1531  */
1532  if (tables_to_lock->m_conv_table)
1533  free_blobs(tables_to_lock->m_conv_table);
1534 
1535  tables_to_lock=
1536  static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1537  tables_to_lock_count--;
1538  my_free(to_free);
1539  }
1540  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
1541  DBUG_VOID_RETURN;
1542 }
1543 
1544 void Relay_log_info::slave_close_thread_tables(THD *thd)
1545 {
1546  thd->get_stmt_da()->set_overwrite_status(true);
1547  DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1548  thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1549  thd->get_stmt_da()->set_overwrite_status(false);
1550 
1551  close_thread_tables(thd);
1552  /*
1553  - If transaction rollback was requested due to deadlock
1554  perform it and release metadata locks.
1555  - If inside a multi-statement transaction,
1556  defer the release of metadata locks until the current
1557  transaction is either committed or rolled back. This prevents
1558  other statements from modifying the table for the entire
1559  duration of this transaction. This provides commit ordering
1560  and guarantees serializability across multiple transactions.
1561  - If in autocommit mode, or outside a transactional context,
1562  automatically release metadata locks of the current statement.
1563  */
1564  if (thd->transaction_rollback_request)
1565  {
1566  trans_rollback_implicit(thd);
1567  thd->mdl_context.release_transactional_locks();
1568  }
1569  else if (! thd->in_multi_stmt_transaction_mode())
1570  thd->mdl_context.release_transactional_locks();
1571  else
1572  thd->mdl_context.release_statement_locks();
1573 
1574  clear_tables_to_lock();
1575  DBUG_VOID_RETURN;
1576 }
1586 bool mysql_show_relaylog_events(THD* thd)
1587 {
1588  Protocol *protocol= thd->protocol;
1589  List<Item> field_list;
1590  DBUG_ENTER("mysql_show_relaylog_events");
1591 
1592  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1593 
1594  Log_event::init_show_field_list(&field_list);
1595  if (protocol->send_result_set_metadata(&field_list,
1596  Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1597  DBUG_RETURN(TRUE);
1598 
1599  if (active_mi == NULL)
1600  {
1601  my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1602  DBUG_RETURN(true);
1603  }
1604 
1605  DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
1606 }
1607 
1608 #endif
1609 
1610 int Relay_log_info::rli_init_info()
1611 {
1612  int error= 0;
1613  enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
1614  const char *msg= NULL;
1615 
1616  DBUG_ENTER("Relay_log_info::rli_init_info");
1617 
1618  mysql_mutex_assert_owner(&data_lock);
1619 
1620  /*
1621  If Relay_log_info is issued again after a failed init_info(), for
1622  instance because of missing relay log files, it will generate new
1623  files and ignore the previous failure, to avoid that we set
1624  error_on_rli_init_info as true.
1625  This a consequence of the behaviour change, in the past server was
1626  stopped when there were replication initialization errors, now it is
1627  not and so init_info() must be aware of previous failures.
1628  */
1629  if (error_on_rli_init_info)
1630  goto err;
1631 
1632  if (inited)
1633  {
1634  /*
1635  We have to reset read position of relay-log-bin as we may have
1636  already been reading from 'hotlog' when the slave was stopped
1637  last time. If this case pos_in_file would be set and we would
1638  get a crash when trying to read the signature for the binary
1639  relay log.
1640 
1641  We only rewind the read position if we are starting the SQL
1642  thread. The handle_slave_sql thread assumes that the read
1643  position is at the beginning of the file, and will read the
1644  "signature" and then fast-forward to the last position read.
1645  */
1646  bool hot_log= FALSE;
1647  /*
1648  my_b_seek does an implicit flush_io_cache, so we need to:
1649 
1650  1. check if this log is active (hot)
1651  2. if it is we keep log_lock until the seek ends, otherwise
1652  release it right away.
1653 
1654  If we did not take log_lock, SQL thread might race with IO
1655  thread for the IO_CACHE mutex.
1656 
1657  */
1658  mysql_mutex_t *log_lock= relay_log.get_log_lock();
1659  mysql_mutex_lock(log_lock);
1660  hot_log= relay_log.is_active(linfo.log_file_name);
1661 
1662  if (!hot_log)
1663  mysql_mutex_unlock(log_lock);
1664 
1665  my_b_seek(cur_log, (my_off_t) 0);
1666 
1667  if (hot_log)
1668  mysql_mutex_unlock(log_lock);
1669  DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
1670  }
1671 
1672  cur_log_fd = -1;
1673  slave_skip_counter= 0;
1674  abort_pos_wait= 0;
1675  log_space_limit= relay_log_space_limit;
1676  log_space_total= 0;
1677  tables_to_lock= 0;
1678  tables_to_lock_count= 0;
1679 
1680  char pattern[FN_REFLEN];
1681  (void) my_realpath(pattern, slave_load_tmpdir, 0);
1682  if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
1683  MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
1684  {
1685  sql_print_error("Unable to use slave's temporary directory '%s'.",
1686  slave_load_tmpdir);
1687  DBUG_RETURN(1);
1688  }
1689  unpack_filename(slave_patternload_file, pattern);
1690  slave_patternload_file_size= strlen(slave_patternload_file);
1691 
1692  /*
1693  The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
1694  Note that the I/O thread flushes it to disk after writing every
1695  event, in flush_info within the master info.
1696  */
1697  /*
1698  For the maximum log size, we choose max_relay_log_size if it is
1699  non-zero, max_binlog_size otherwise. If later the user does SET
1700  GLOBAL on one of these variables, fix_max_binlog_size and
1701  fix_max_relay_log_size will reconsider the choice (for example
1702  if the user changes max_relay_log_size to zero, we have to
1703  switch to using max_binlog_size for the relay log) and update
1704  relay_log.max_size (and mysql_bin_log.max_size).
1705  */
1706  {
1707  /* Reports an error and returns, if the --relay-log's path
1708  is a directory.*/
1709  if (opt_relay_logname &&
1710  opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
1711  {
1712  sql_print_error("Path '%s' is a directory name, please specify \
1713 a file name for --relay-log option.", opt_relay_logname);
1714  DBUG_RETURN(1);
1715  }
1716 
1717  /* Reports an error and returns, if the --relay-log-index's path
1718  is a directory.*/
1719  if (opt_relaylog_index_name &&
1720  opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
1721  == FN_LIBCHAR)
1722  {
1723  sql_print_error("Path '%s' is a directory name, please specify \
1724 a file name for --relay-log-index option.", opt_relaylog_index_name);
1725  DBUG_RETURN(1);
1726  }
1727 
1728  char buf[FN_REFLEN];
1729  const char *ln;
1730  static bool name_warning_sent= 0;
1731  ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1732  1, buf);
1733  /* We send the warning only at startup, not after every RESET SLAVE */
1734  if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
1735  {
1736  /*
1737  User didn't give us info to name the relay log index file.
1738  Picking `hostname`-relay-bin.index like we do, causes replication to
1739  fail if this slave's hostname is changed later. So, we would like to
1740  instead require a name. But as we don't want to break many existing
1741  setups, we only give warning, not error.
1742  */
1743  sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
1744  " so replication "
1745  "may break when this MySQL server acts as a "
1746  "slave and has his hostname changed!! Please "
1747  "use '--relay-log=%s' to avoid this problem.", ln);
1748  name_warning_sent= 1;
1749  }
1750 
1751  relay_log.is_relay_log= TRUE;
1752 
1753  if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1754  {
1755  sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
1756  DBUG_RETURN(1);
1757  }
1758 #ifndef DBUG_OFF
1759  global_sid_lock->wrlock();
1760  gtid_set.dbug_print("set of GTIDs in relay log before initialization");
1761  global_sid_lock->unlock();
1762 #endif
1763  if (!current_thd &&
1764  relay_log.init_gtid_sets(&gtid_set, NULL,
1765  opt_slave_sql_verify_checksum,
1766  true/*true=need lock*/))
1767  {
1768  sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
1769  DBUG_RETURN(1);
1770  }
1771 #ifndef DBUG_OFF
1772  global_sid_lock->wrlock();
1773  gtid_set.dbug_print("set of GTIDs in relay log after initialization");
1774  global_sid_lock->unlock();
1775 #endif
1776  /*
1777  Configures what object is used by the current log to store processed
1778  gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
1779  corretly compute the set of previous gtids.
1780  */
1781  relay_log.set_previous_gtid_set(&gtid_set);
1782  /*
1783  note, that if open() fails, we'll still have index file open
1784  but a destructor will take care of that
1785  */
1786  if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1787  (max_relay_log_size ? max_relay_log_size :
1788  max_binlog_size), true,
1789  true/*need_lock_index=true*/,
1790  true/*need_sid_lock=true*/,
1791  mi->get_mi_description_event()))
1792  {
1793  sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
1794  DBUG_RETURN(1);
1795  }
1796  }
1797 
1798  /*
1799  This checks if the repository was created before and thus there
1800  will be values to be read. Please, do not move this call after
1801  the handler->init_info().
1802  */
1803  if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
1804  {
1805  msg= "Error checking relay log repository";
1806  error= 1;
1807  goto err;
1808  }
1809 
1810  if (handler->init_info())
1811  {
1812  msg= "Error reading relay log configuration";
1813  error= 1;
1814  goto err;
1815  }
1816 
1817  if (check_return == REPOSITORY_DOES_NOT_EXIST)
1818  {
1819  /* Init relay log with first entry in the relay index file */
1820  if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
1821  false/*need_data_lock=false (lock should be held
1822  prior to invoking this function)*/,
1823  &msg, 0))
1824  {
1825  error= 1;
1826  goto err;
1827  }
1828  group_master_log_name[0]= 0;
1829  group_master_log_pos= 0;
1830  }
1831  else
1832  {
1833  if (read_info(handler))
1834  {
1835  msg= "Error reading relay log configuration";
1836  error= 1;
1837  goto err;
1838  }
1839 
1840  if (is_relay_log_recovery && init_recovery(mi, &msg))
1841  {
1842  error= 1;
1843  goto err;
1844  }
1845 
1846  if (init_relay_log_pos(group_relay_log_name,
1847  group_relay_log_pos,
1848  false/*need_data_lock=false (lock should be held
1849  prior to invoking this function)*/,
1850  &msg, 0))
1851  {
1852  char llbuf[22];
1853  sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
1854  group_relay_log_name,
1855  llstr(group_relay_log_pos, llbuf));
1856  error= 1;
1857  goto err;
1858  }
1859 
1860 #ifndef DBUG_OFF
1861  {
1862  char llbuf1[22], llbuf2[22];
1863  DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
1864  llstr(my_b_tell(cur_log),llbuf1),
1865  llstr(event_relay_log_pos,llbuf2)));
1866  DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
1867  DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
1868  }
1869 #endif
1870  }
1871 
1872  inited= 1;
1873  error_on_rli_init_info= false;
1874  if (flush_info(TRUE))
1875  {
1876  msg= "Error reading relay log configuration";
1877  error= 1;
1878  goto err;
1879  }
1880 
1881  if (count_relay_log_space())
1882  {
1883  msg= "Error counting relay log space";
1884  error= 1;
1885  goto err;
1886  }
1887 
1888  is_relay_log_recovery= FALSE;
1889  DBUG_RETURN(error);
1890 
1891 err:
1892  handler->end_info();
1893  inited= 0;
1894  error_on_rli_init_info= true;
1895  if (msg)
1896  sql_print_error("%s.", msg);
1897  relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1898  DBUG_RETURN(error);
1899 }
1900 
1901 void Relay_log_info::end_info()
1902 {
1903  DBUG_ENTER("Relay_log_info::end_info");
1904 
1905  error_on_rli_init_info= false;
1906  if (!inited)
1907  DBUG_VOID_RETURN;
1908 
1909  handler->end_info();
1910 
1911  if (cur_log_fd >= 0)
1912  {
1913  end_io_cache(&cache_buf);
1914  (void)my_close(cur_log_fd, MYF(MY_WME));
1915  cur_log_fd= -1;
1916  }
1917  inited = 0;
1918  relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1919  relay_log.harvest_bytes_written(&log_space_total);
1920  /*
1921  Delete the slave's temporary tables from memory.
1922  In the future there will be other actions than this, to ensure persistance
1923  of slave's temp tables after shutdown.
1924  */
1925  close_temporary_tables();
1926 
1927  DBUG_VOID_RETURN;
1928 }
1929 
1930 int Relay_log_info::flush_current_log()
1931 {
1932  DBUG_ENTER("Relay_log_info::flush_current_log");
1933  /*
1934  When we come to this place in code, relay log may or not be initialized;
1935  the caller is responsible for setting 'flush_relay_log_cache' accordingly.
1936  */
1937  IO_CACHE *log_file= relay_log.get_log_file();
1938  if (flush_io_cache(log_file))
1939  DBUG_RETURN(2);
1940 
1941  DBUG_RETURN(0);
1942 }
1943 
1944 void Relay_log_info::set_master_info(Master_info* info)
1945 {
1946  mi= info;
1947 }
1948 
2002 int Relay_log_info::flush_info(const bool force)
2003 {
2004  DBUG_ENTER("Relay_log_info::flush_info");
2005 
2006  if (!inited)
2007  DBUG_RETURN(0);
2008 
2009  /*
2010  We update the sync_period at this point because only here we
2011  now that we are handling a relay log info. This needs to be
2012  update every time we call flush because the option maybe
2013  dinamically set.
2014  */
2015  handler->set_sync_period(sync_relayloginfo_period);
2016 
2017  if (write_info(handler))
2018  goto err;
2019 
2020  if (handler->flush_info(force))
2021  goto err;
2022 
2023  DBUG_RETURN(0);
2024 
2025 err:
2026  sql_print_error("Error writing relay log configuration.");
2027  DBUG_RETURN(1);
2028 }
2029 
2030 size_t Relay_log_info::get_number_info_rli_fields()
2031 {
2032  return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2033 }
2034 
2035 bool Relay_log_info::read_info(Rpl_info_handler *from)
2036 {
2037  int lines= 0;
2038  char *first_non_digit= NULL;
2039  ulong temp_group_relay_log_pos= 0;
2040  ulong temp_group_master_log_pos= 0;
2041  int temp_sql_delay= 0;
2042  int temp_internal_id= 0;
2043 
2044  DBUG_ENTER("Relay_log_info::read_info");
2045 
2046  /*
2047  Should not read RLI from file in client threads. Client threads
2048  only use RLI to execute BINLOG statements.
2049 
2050  @todo Uncomment the following assertion. Currently,
2051  Relay_log_info::init() is called from init_master_info() before
2052  the THD object Relay_log_info::sql_thd is created. That means we
2053  cannot call belongs_to_client() since belongs_to_client()
2054  dereferences Relay_log_info::sql_thd. So we need to refactor
2055  slightly: the THD object should be created by Relay_log_info
2056  constructor (or passed to it), so that we are guaranteed that it
2057  exists at this point. /Sven
2058  */
2059  //DBUG_ASSERT(!belongs_to_client());
2060 
2061  /*
2062  Starting from 5.1.x, relay-log.info has a new format. Now, its
2063  first line contains the number of lines in the file. By reading
2064  this number we can determine which version our master.info comes
2065  from. We can't simply count the lines in the file, since
2066  versions before 5.1.x could generate files with more lines than
2067  needed. If first line doesn't contain a number, or if it
2068  contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2069  then the file is treated like a file from pre-5.1.x version.
2070  There is no ambiguity when reading an old master.info: before
2071  5.1.x, the first line contained the binlog's name, which is
2072  either empty or has an extension (contains a '.'), so can't be
2073  confused with an integer.
2074 
2075  So we're just reading first line and trying to figure which
2076  version is this.
2077  */
2078 
2079  /*
2080  The first row is temporarily stored in mi->master_log_name, if
2081  it is line count and not binlog name (new format) it will be
2082  overwritten by the second row later.
2083  */
2084  if (from->prepare_info_for_read() ||
2085  from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2086  (char *) ""))
2087  DBUG_RETURN(TRUE);
2088 
2089  lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2090 
2091  if (group_relay_log_name[0]!='\0' &&
2092  *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2093  {
2094  /* Seems to be new format => read group relay log name */
2095  if (from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2096  (char *) ""))
2097  DBUG_RETURN(TRUE);
2098  }
2099  else
2100  DBUG_PRINT("info", ("relay_log_info file is in old format."));
2101 
2102  if (from->get_info((ulong *) &temp_group_relay_log_pos,
2103  (ulong) BIN_LOG_HEADER_SIZE) ||
2104  from->get_info(group_master_log_name,
2105  (size_t) sizeof(group_relay_log_name),
2106  (char *) "") ||
2107  from->get_info((ulong *) &temp_group_master_log_pos,
2108  (ulong) 0))
2109  DBUG_RETURN(TRUE);
2110 
2111  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2112  {
2113  if (from->get_info((int *) &temp_sql_delay, (int) 0))
2114  DBUG_RETURN(TRUE);
2115  }
2116 
2117  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2118  {
2119  if (from->get_info(&recovery_parallel_workers,(ulong) 0))
2120  DBUG_RETURN(TRUE);
2121  }
2122 
2123  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2124  {
2125  if (from->get_info(&temp_internal_id, (int) 1))
2126  DBUG_RETURN(TRUE);
2127  }
2128 
2129  group_relay_log_pos= temp_group_relay_log_pos;
2130  group_master_log_pos= temp_group_master_log_pos;
2131  sql_delay= (int32) temp_sql_delay;
2132  internal_id= (uint) temp_internal_id;
2133 
2134  DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2135  (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2136  DBUG_RETURN(FALSE);
2137 }
2138 
2139 bool Relay_log_info::write_info(Rpl_info_handler *to)
2140 {
2141  DBUG_ENTER("Relay_log_info::write_info");
2142 
2143  /*
2144  @todo Uncomment the following assertion. See todo in
2145  Relay_log_info::read_info() for details. /Sven
2146  */
2147  //DBUG_ASSERT(!belongs_to_client());
2148 
2149  if (to->prepare_info_for_write() ||
2150  to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2151  to->set_info(group_relay_log_name) ||
2152  to->set_info((ulong) group_relay_log_pos) ||
2153  to->set_info(group_master_log_name) ||
2154  to->set_info((ulong) group_master_log_pos) ||
2155  to->set_info((int) sql_delay) ||
2156  to->set_info(recovery_parallel_workers) ||
2157  to->set_info((int) internal_id))
2158  DBUG_RETURN(TRUE);
2159 
2160  DBUG_RETURN(FALSE);
2161 }
2162 
2179 {
2180  DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2181 
2182  if (fe)
2183  {
2184  adapt_to_master_version(fe);
2185  if (info_thd && is_parallel_exec())
2186  {
2187  for (uint i= 0; i < workers.elements; i++)
2188  {
2189  Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
2190  mysql_mutex_lock(&w->jobs_lock);
2191  if (w->running_status == Slave_worker::RUNNING)
2192  w->set_rli_description_event(fe);
2193  mysql_mutex_unlock(&w->jobs_lock);
2194  }
2195  }
2196  }
2197  delete rli_description_event;
2198  rli_description_event= fe;
2199 }
2200 
2202 {
2203  /*
2204  The enum must be in the version non-descending top-down order,
2205  the last item formally corresponds to highest possible server
2206  version (never reached, thereby no adapting actions here);
2207  enumeration starts from zero.
2208  */
2209  enum
2210  {
2211  WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2212  _END_OF_LIST // always last
2213  } item;
2214  /*
2215  Version where the feature is introduced.
2216  */
2217  uchar version_split[3];
2218  /*
2219  Action to perform when according to FormatDescriptor event Master
2220  is found to be feature-aware while previously it has *not* been.
2221  */
2222  void (*upgrade) (THD*);
2223  /*
2224  Action to perform when according to FormatDescriptor event Master
2225  is found to be feature-*un*aware while previously it has been.
2226  */
2227  void (*downgrade) (THD*);
2228 };
2229 
2230 void wl6292_upgrade_func(THD *thd)
2231 {
2232  thd->variables.explicit_defaults_for_timestamp= false;
2233  if (global_system_variables.explicit_defaults_for_timestamp)
2234  thd->variables.explicit_defaults_for_timestamp= true;
2235 
2236  return;
2237 }
2238 
2239 void wl6292_downgrade_func(THD *thd)
2240 {
2241  if (global_system_variables.explicit_defaults_for_timestamp)
2242  thd->variables.explicit_defaults_for_timestamp= false;
2243 
2244  return;
2245 }
2246 
2251 static st_feature_version s_features[]=
2252 {
2253  // order is the same as in the enum
2254  { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2255  {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2256  { st_feature_version::_END_OF_LIST,
2257  {255, 255, 255}, NULL, NULL }
2258 };
2259 
2288 {
2289  THD *thd=info_thd;
2290  ulong master_version, current_version;
2291  int changed= !fdle || ! rli_description_event ? 0 :
2292  (master_version= fdle->get_version_product()) -
2293  (current_version= rli_description_event->get_version_product());
2294 
2295  /* When the last version is not changed nothing to adapt for */
2296  if (!changed)
2297  return;
2298 
2299  /*
2300  find item starting from and ending at for which adaptive actions run
2301  for downgrade or upgrade branches.
2302  (todo: convert into bsearch when number of features will grow significantly)
2303  */
2304  bool downgrade= changed < 0;
2305  long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2306 
2307  for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2308  {
2309  ulong ver_f= version_product(s_features[i].version_split);
2310 
2311  if ((downgrade ? master_version : current_version) < ver_f &&
2312  i_first == st_feature_version::_END_OF_LIST)
2313  i_first= i;
2314  if ((downgrade ? current_version : master_version) < ver_f)
2315  {
2316  i_last= i;
2317  DBUG_ASSERT(i_last >= i_first);
2318  break;
2319  }
2320  }
2321 
2322  /*
2323  actions, executed in version non-descending st_feature_version order
2324  */
2325  for (i= i_first; i < i_last; i++)
2326  {
2327  /* Run time check of the st_feature_version items ordering */
2328  DBUG_ASSERT(!i ||
2329  version_product(s_features[i - 1].version_split) <=
2330  version_product(s_features[i].version_split));
2331 
2332  DBUG_ASSERT((downgrade ? master_version : current_version) <
2333  version_product(s_features[i].version_split) &&
2334  (downgrade ? current_version : master_version >=
2335  version_product(s_features[i].version_split)));
2336 
2337  if (downgrade && s_features[i].downgrade)
2338  {
2339  s_features[i].downgrade(thd);
2340  }
2341  else if (s_features[i].upgrade)
2342  {
2343  s_features[i].upgrade(thd);
2344  }
2345  }
2346 }