MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
binlog.cc
1 /* Copyright (c) 2009, 2013 Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software Foundation,
14  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
15 
16 
17 #include "my_global.h"
18 #include "log.h"
19 #include "binlog.h"
20 #include "log_event.h"
21 #include "rpl_filter.h"
22 #include "rpl_rli.h"
23 #include "sql_plugin.h"
24 #include "rpl_handler.h"
25 #include "rpl_info_factory.h"
26 #include "rpl_utility.h"
27 #include "debug_sync.h"
28 #include "global_threads.h"
29 #include "sql_show.h"
30 #include "sql_parse.h"
31 #include "rpl_mi.h"
32 #include <list>
33 #include <string>
34 #include <my_stacktrace.h>
35 
36 using std::max;
37 using std::min;
38 using std::string;
39 using std::list;
40 
/*
  FLAGSTR(V,F): expands to the name of flag F followed by a space when bit F
  is set in V, otherwise to "". Used to render option bits in DBUG_PRINT
  traces.
*/
41 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
42 
/*
  Sentinel my_off_t (all bits set) meaning "no position recorded", e.g. for
  the saved statement start position of the transaction cache.
*/
48 #define MY_OFF_T_UNDEF (~(my_off_t)0UL)
49 
50 /*
51  Constants required for the limit unsafe warnings suppression
52  */
53 //seconds after which the limit unsafe warnings suppression will be activated
54 #define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 50
55 //number of limit unsafe warnings after which the suppression will be activated
56 #define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 50
/*
  Number of attempts Thread_excursion::try_to_attach_to() makes before it
  terminates the server.
*/
57 #define MAX_SESSION_ATTACH_TRIES 10
58 
/*
  State for the limit-unsafe warning suppression configured by the constants
  above. It is not used in the code visible in this excerpt.
*/
59 static ulonglong limit_unsafe_suppression_start_time= 0;
60 static bool unsafe_warning_suppression_is_activated= false;
61 static int limit_unsafe_warning_count= 0;
62 
/* The binlog handlerton; assigned by binlog_init(). */
63 static handlerton *binlog_hton;
64 bool opt_binlog_order_commits= true;
65 
66 const char *log_bin_index= 0;
67 const char *log_bin_basename= 0;
68 
69 MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
70 
/*
  Forward declarations. Every function below except binlog_init() and
  binlog_start_trans_and_stmt() is installed as a handlerton callback by
  binlog_init().
*/
71 static int binlog_init(void *p);
72 static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event);
73 static int binlog_close_connection(handlerton *hton, THD *thd);
74 static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
75 static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
76 static int binlog_commit(handlerton *hton, THD *thd, bool all);
77 static int binlog_rollback(handlerton *hton, THD *thd, bool all);
78 static int binlog_prepare(handlerton *hton, THD *thd, bool all);
79 
80 
/*
  Mutex_sentry: scoped lock guard.
  - If constructed with a non-NULL mutex, it locks that mutex at construction
    and unlocks it at destruction.
  - A NULL mutex makes the object a no-op.
  - It can be neither copied nor assigned.
  NOTE(review): the class-head line and the constructor's signature (original
  lines 89 and 92) are missing from this listing. The initializer below belongs
  to a constructor that takes a mysql_mutex_t * named `mutex`.
*/
90 {
91 public:
93  : m_mutex(mutex)
94  {
95  if (m_mutex)
96  mysql_mutex_lock(mutex);
97  }
98 
  /* Releases the mutex taken by the constructor, if any. */
99  ~Mutex_sentry()
100  {
101  if (m_mutex)
102  mysql_mutex_unlock(m_mutex);
  /* Debug builds clear the pointer; presumably to make use after destruction easier to spot. */
103 #ifndef DBUG_OFF
104  m_mutex= 0;
105 #endif
106  }
107 
108 private:
  /* Guarded mutex, or NULL when the sentry does nothing. */
109  mysql_mutex_t *m_mutex;
110 
111  // It's not allowed to copy this object in any way
112  Mutex_sentry(Mutex_sentry const&);
113  void operator=(Mutex_sentry const&);
114 };
115 
116 
121 static void print_system_time()
122 {
123 #ifdef __WIN__
124  SYSTEMTIME utc_time;
125  GetSystemTime(&utc_time);
126  const long hrs= utc_time.wHour;
127  const long mins= utc_time.wMinute;
128  const long secs= utc_time.wSecond;
129 #else
130  /* Using time() instead of my_time() to avoid looping */
131  const time_t curr_time= time(NULL);
132  /* Calculate time of day */
133  const long tmins = curr_time / 60;
134  const long thrs = tmins / 60;
135  const long hrs = thrs % 24;
136  const long mins = tmins % 60;
137  const long secs = curr_time % 60;
138 #endif
139  char hrs_buf[3]= "00";
140  char mins_buf[3]= "00";
141  char secs_buf[3]= "00";
142  int base= 10;
143  my_safe_itoa(base, hrs, &hrs_buf[2]);
144  my_safe_itoa(base, mins, &mins_buf[2]);
145  my_safe_itoa(base, secs, &secs_buf[2]);
146 
147  my_safe_printf_stderr("---------- %s:%s:%s UTC - ",
148  hrs_buf, mins_buf, secs_buf);
149 }
150 
151 
177 {
178 public:
179  Thread_excursion(THD *thd)
180  : m_original_thd(thd)
181 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
182  , m_saved_psi(PSI_server ? PSI_server->get_thread() : NULL)
183 #endif
184  {
185  }
186 
187  ~Thread_excursion() {
188 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
189  if (PSI_server)
190  PSI_server->set_thread(m_saved_psi);
191 #endif
192 #ifndef EMBEDDED_LIBRARY
193  if (unlikely(setup_thread_globals(m_original_thd)))
194  DBUG_ASSERT(0); // Out of memory?!
195 #endif
196  }
197 
207  void try_to_attach_to(THD *thd)
208  {
209  int i= 0;
210  /*
211  Attach the POSIX thread to a session in MAX_SESSION_ATTACH_TRIES
212  tries when encountering 'out of memory' error.
213  */
214  while (i < MAX_SESSION_ATTACH_TRIES)
215  {
216  /*
217  Currently attach_to(...) returns ER_OUTOFMEMORY or 0. So
218  we continue to attach the POSIX thread when encountering
219  the ER_OUTOFMEMORY error. Please take care other error
220  returned from attach_to(...) in future.
221  */
222  if (!attach_to(thd))
223  {
224  if (i > 0)
225  sql_print_warning("Server overcomes the temporary 'out of memory' "
226  "in '%d' tries while attaching to session thread "
227  "during the group commit phase.\n", i + 1);
228  break;
229  }
230  i++;
231  }
232  /*
233  Terminate the server after failed to attach the POSIX thread
234  to a session in MAX_SESSION_ATTACH_TRIES tries.
235  */
236  if (MAX_SESSION_ATTACH_TRIES == i)
237  {
238  print_system_time();
239  my_safe_printf_stderr("%s", "[Fatal] Out of memory while attaching to "
240  "session thread during the group commit phase. "
241  "Data consistency between master and slave can "
242  "be guaranteed after server restarts.\n");
243  _exit(EXIT_FAILURE);
244  }
245  }
246 
247 private:
248 
252  int attach_to(THD *thd)
253  {
254 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
255  if (PSI_server)
256  PSI_server->set_thread(thd_get_psi(thd));
257 #endif
258 #ifndef EMBEDDED_LIBRARY
259  if (DBUG_EVALUATE_IF("simulate_session_attach_error", 1, 0)
260  || unlikely(setup_thread_globals(thd)))
261  {
262 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
263  if (PSI_server)
264  PSI_server->set_thread(m_saved_psi);
265 #endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */
266  /*
267  Indirectly uses pthread_setspecific, which can only return
268  ENOMEM or EINVAL. Since store_globals are using correct keys,
269  the only alternative is out of memory.
270  */
271  return ER_OUTOFMEMORY;
272  }
273 #endif /* EMBEDDED_LIBRARY */
274  return 0;
275  }
276 
277  int setup_thread_globals(THD *thd) const {
278  int error= 0;
279  THD *original_thd= my_pthread_getspecific(THD*, THR_THD);
280  MEM_ROOT* original_mem_root= my_pthread_getspecific(MEM_ROOT*, THR_MALLOC);
281  if ((error= my_pthread_setspecific_ptr(THR_THD, thd)))
282  goto exit0;
283  if ((error= my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root)))
284  goto exit1;
285  if ((error= set_mysys_var(thd->mysys_var)))
286  goto exit2;
287  goto exit0;
288 exit2:
289  error= my_pthread_setspecific_ptr(THR_MALLOC, original_mem_root);
290 exit1:
291  error= my_pthread_setspecific_ptr(THR_THD, original_thd);
292 exit0:
293  return error;
294  }
295 
  /* Session that was current when the excursion began; the destructor makes it current again. */
296  THD *m_original_thd;
  /*
    PSI thread saved at construction. It is assigned and read only when
    WITH_PERFSCHEMA_STORAGE_ENGINE is defined.
  */
297  PSI_thread *m_saved_psi;
298 };
299 
300 
310 {
311 public:
312 
  /*
    Construct a binlog cache.

    @param trx_cache_arg                  true for the transactional cache
    @param max_binlog_cache_size_arg      size limit copied into
                                          cache_log.end_of_file
    @param ptr_binlog_cache_use_arg       status counter for cache uses
    @param ptr_binlog_cache_disk_use_arg  status counter for disk uses

    NOTE(review): reset() inspects and re-initializes cache_log
    (my_b_tell(), reinit_io_cache()). cache_log must therefore already be an
    initialized write cache when this constructor runs. Presumably the
    caller opens it before constructing the object. TODO confirm.
    reset() is virtual, but during base construction the
    binlog_cache_data version runs. It does not touch flags.transactional,
    which is set afterwards.
  */
313  binlog_cache_data(bool trx_cache_arg,
314  my_off_t max_binlog_cache_size_arg,
315  ulong *ptr_binlog_cache_use_arg,
316  ulong *ptr_binlog_cache_disk_use_arg)
317  : m_pending(0), saved_max_binlog_cache_size(max_binlog_cache_size_arg),
318  ptr_binlog_cache_use(ptr_binlog_cache_use_arg),
319  ptr_binlog_cache_disk_use(ptr_binlog_cache_disk_use_arg)
320  {
321  reset();
322  flags.transactional= trx_cache_arg;
323  cache_log.end_of_file= saved_max_binlog_cache_size;
324  }
325 
  /* Defined out of class: finalize() and flush() further below, write_event() right after binlog_close_connection(). */
326  int finalize(THD *thd, Log_event *end_event);
327  int flush(THD *thd, my_off_t *bytes, bool *wrote_xid);
328  int write_event(THD *thd, Log_event *event);
329 
  /* The cache is expected to be empty at destruction; its cache file is closed. */
330  virtual ~binlog_cache_data()
331  {
332  DBUG_ASSERT(is_binlog_empty());
333  close_cached_file(&cache_log);
334  }
335 
336  bool is_binlog_empty() const
337  {
338  my_off_t pos= my_b_tell(&cache_log);
339  DBUG_PRINT("debug", ("%s_cache - pending: 0x%llx, bytes: %llu",
340  (flags.transactional ? "trx" : "stmt"),
341  (ulonglong) pending(), (ulonglong) pos));
342  return pending() == NULL && pos == 0;
343  }
344 
345  bool is_group_cache_empty() const
346  {
347  return group_cache.is_empty();
348  }
349 
350 #ifndef DBUG_OFF
351  bool dbug_is_finalized() const {
352  return flags.finalized;
353  }
354 #endif
355 
356  Rows_log_event *pending() const
357  {
358  return m_pending;
359  }
360 
361  void set_pending(Rows_log_event *const pending)
362  {
363  m_pending= pending;
364  }
365 
366  void set_incident(void)
367  {
368  flags.incident= true;
369  }
370 
371  bool has_incident(void) const
372  {
373  return flags.incident;
374  }
375 
376  bool has_xid() const {
377  // There should only be an XID event if we are transactional
378  DBUG_ASSERT((flags.transactional && flags.with_xid) || !flags.with_xid);
379  return flags.with_xid;
380  }
381 
382  bool is_trx_cache() const
383  {
384  return flags.transactional;
385  }
386 
387  my_off_t get_byte_position() const
388  {
389  return my_b_tell(&cache_log);
390  }
391 
  /*
    Return the cache to its empty initial state. Statement order matters:
    - compute_statistics() must see the contents (and disk_writes) before
      truncate(0) discards them.
    - disk_writes is cleared only after truncate(0); see the comment below.
    flags.transactional is deliberately left untouched.
  */
392  virtual void reset()
393  {
394  compute_statistics();
395  truncate(0);
396  flags.incident= false;
397  flags.with_xid= false;
398  flags.immediate= false;
399  flags.finalized= false;
400  /*
401  The truncate function calls reinit_io_cache that calls my_b_flush_io_cache
402  which may increase disk_writes. This breaks the disk_writes use by the
403  binary log which aims to compute the ratio between in-memory cache usage
404  and disk cache usage. To avoid this undesirable behavior, we reset the
405  variable after truncating the cache.
406  */
407  cache_log.disk_writes= 0;
408  group_cache.clear();
409  DBUG_ASSERT(is_binlog_empty());
410  }
411 
412  /*
413  Sets the write position to point at the position given. If the
414  cache has swapped to a file, it reinitializes it, so that the
415  proper data is added to the IO_CACHE buffer. Otherwise, it just
416  does a my_b_seek.
417 
418  my_b_seek will not work if the cache has swapped, that's why
419  we do this workaround.
420 
421  @param[IN] pos the new write position.
422  @param[IN] use_reinit if the position should be reset resorting
423  to reset_io_cache (which may issue a flush_io_cache
424  inside)
425 
426  @return The previous write position.
427  */
428  my_off_t reset_write_pos(my_off_t pos, bool use_reinit)
429  {
430  DBUG_ENTER("reset_write_pos");
431  DBUG_ASSERT(cache_log.type == WRITE_CACHE);
432 
433  my_off_t oldpos= get_byte_position();
434 
435  if (use_reinit)
436  reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
437  else
438  my_b_seek(&cache_log, pos);
439 
440  DBUG_RETURN(oldpos);
441  }
442 
443  /*
444  Cache to store data before copying it to the binary log.
445  */
  /* Public: binlog_cache_mngr::get_binlog_cache_log() and gtid_before_write_cache() access it directly. */
446  IO_CACHE cache_log;
447 
452 
453 protected:
454  /*
455  It truncates the cache to a certain position. This includes deleting the
456  pending event.
457  */
  /*
    NOTE(review): original line 461 is missing from this listing. No visible
    statement deletes the pending event, although the comment above says
    truncation does. Presumably the missing line does this. TODO confirm.
    end_of_file is restored to the configured size limit after
    reinit_io_cache(), as in the constructor.
  */
458  void truncate(my_off_t pos)
459  {
  /* NOTE(review): the (ulong) cast may narrow a 64-bit my_off_t on LLP64 platforms; debug output only. */
460  DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
462  reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
463  cache_log.end_of_file= saved_max_binlog_cache_size;
464  }
465 
469  int flush_pending_event(THD *thd) {
470  if (m_pending)
471  {
472  m_pending->set_flags(Rows_log_event::STMT_END_F);
473  if (int error= write_event(thd, m_pending))
474  return error;
475  thd->clear_binlog_table_maps();
476  }
477  return 0;
478  }
479 
  /*
    NOTE(review): the head of this member function (original lines 480-483)
    is missing from the listing. The visible body frees the pending rows
    event, clears m_pending and returns 0.
  */
484  delete m_pending;
485  m_pending= NULL;
486  return 0;
487  }
  /*
    Per-cache state bits. The constructor sets transactional once. reset()
    clears incident, with_xid, immediate and finalized.
  */
488  struct Flags {
489  /*
490  Defines if this is either a trx-cache or stmt-cache, respectively, a
491  transactional or non-transactional cache.
492  */
493  bool transactional:1;
494 
495  /*
496  This indicates that some events did not get into the cache and most likely
497  it is corrupted.
498  */
499  bool incident:1;
500 
501  /*
502  This indicates that the cache should be written without BEGIN/END.
503  */
504  bool immediate:1;
505 
506  /*
507  This flag indicates that the buffer was finalized and has to be
508  flushed to disk.
509  */
510  bool finalized:1;
511 
512  /*
513  This indicates that the cache contains an XID event.
514  */
515  bool with_xid:1;
516  } flags;
517 
518 private:
519  /*
520  Pending binrows event. This event is the event where the rows are currently
521  written.
522  */
  /* NULL when no rows event is pending; the constructor initializes it to 0. */
523  Rows_log_event *m_pending;
524 
528  void compute_statistics()
529  {
530  if (!is_binlog_empty())
531  {
532  statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
533  if (cache_log.disk_writes != 0)
534  statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
535  }
536  }
537 
  /* Copied into cache_log.end_of_file by the constructor and by truncate(). */
538  /*
539  Stores the values of maximum size of the cache allowed when this cache
540  is configured. This corresponds to either
541  . max_binlog_cache_size or max_binlog_stmt_cache_size.
542  */
543  my_off_t saved_max_binlog_cache_size;
544 
  /* Incremented by compute_statistics() for every non-empty cache that is reset. */
545  /*
546  Stores a pointer to the status variable that keeps track of the in-memory
547  cache usage. This corresponds to either
548  . binlog_cache_use or binlog_stmt_cache_use.
549  */
550  ulong *ptr_binlog_cache_use;
551 
  /* Incremented by compute_statistics() when the reset cache had written to disk. */
552  /*
553  Stores a pointer to the status variable that keeps track of the disk
554  cache usage. This corresponds to either
555  . binlog_cache_disk_use or binlog_stmt_cache_disk_use.
556  */
557  ulong *ptr_binlog_cache_disk_use;
558 
  /* Private copy assignment, presumably left undefined to forbid copying. TODO confirm. */
559  binlog_cache_data& operator=(const binlog_cache_data& info);
561 };
562 
563 
/*
  Statement (non-transactional) binlog cache. It adds a one-argument
  finalize(THD*), defined below, that chooses the end event.
  NOTE(review): the class-head line (original line 564) and line 579 are
  missing from this listing. The derived finalize(THD*) calls the base
  finalize(THD*, Log_event*) unqualified. That suggests line 579 is a
  using-declaration that un-hides it. TODO confirm.
*/
565  : public binlog_cache_data
566 {
567 public:
568  binlog_stmt_cache_data(bool trx_cache_arg,
569  my_off_t max_binlog_cache_size_arg,
570  ulong *ptr_binlog_cache_use_arg,
571  ulong *ptr_binlog_cache_disk_use_arg)
572  : binlog_cache_data(trx_cache_arg,
573  max_binlog_cache_size_arg,
574  ptr_binlog_cache_use_arg,
575  ptr_binlog_cache_disk_use_arg)
576  {
577  }
578 
580 
581  int finalize(THD *thd);
582 };
583 
584 
/*
  Finalize the statement cache.
  - If an event in the cache used immediate logging (flags.immediate), the
    cache is finalized without an end event.
  - Otherwise a "COMMIT" query event is appended as the end event.
  @return 0 on success, otherwise the error from
          binlog_cache_data::finalize().
  NOTE(review): original line 595, the type part of the end_evt declaration,
  is missing from this listing. The argument list matches the
  Query_log_event built in gtid_empty_group_log_and_cleanup(). TODO confirm.
*/
585 int
586 binlog_stmt_cache_data::finalize(THD *thd)
587 {
588  if (flags.immediate)
589  {
590  if (int error= finalize(thd, NULL))
591  return error;
592  }
593  else
594  {
596  end_evt(thd, STRING_WITH_LEN("COMMIT"), false, false, true, 0, true);
597  if (int error= finalize(thd, &end_evt))
598  return error;
599  }
600  return 0;
601 }
602 
603 
605 {
606 public:
  /*
    Construct the transactional cache. No statement start position is saved
    yet (MY_OFF_T_UNDEF), and the cache is not marked "cannot roll back".
    NOTE(review): the class-head line (original line 604) is missing from this
    listing.
  */
607  binlog_trx_cache_data(bool trx_cache_arg,
608  my_off_t max_binlog_cache_size_arg,
609  ulong *ptr_binlog_cache_use_arg,
610  ulong *ptr_binlog_cache_disk_use_arg)
611  : binlog_cache_data(trx_cache_arg,
612  max_binlog_cache_size_arg,
613  ptr_binlog_cache_use_arg,
614  ptr_binlog_cache_disk_use_arg),
615  m_cannot_rollback(FALSE), before_stmt_pos(MY_OFF_T_UNDEF)
616  { }
617 
618  void reset()
619  {
620  DBUG_ENTER("reset");
621  DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
622  m_cannot_rollback= FALSE;
623  before_stmt_pos= MY_OFF_T_UNDEF;
624  binlog_cache_data::reset();
625  DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
626  DBUG_VOID_RETURN;
627  }
628 
629  bool cannot_rollback() const
630  {
631  return m_cannot_rollback;
632  }
633 
634  void set_cannot_rollback()
635  {
636  m_cannot_rollback= TRUE;
637  }
638 
639  my_off_t get_prev_position() const
640  {
641  return before_stmt_pos;
642  }
643 
644  void set_prev_position(my_off_t pos)
645  {
646  DBUG_ENTER("set_prev_position");
647  DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
648  before_stmt_pos= pos;
649  DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
650  DBUG_VOID_RETURN;
651  }
652 
653  void restore_prev_position()
654  {
655  DBUG_ENTER("restore_prev_position");
656  DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
657  binlog_cache_data::truncate(before_stmt_pos);
658  before_stmt_pos= MY_OFF_T_UNDEF;
659  DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
660  DBUG_VOID_RETURN;
661  }
662 
  /*
    Roll the transactional cache back to savepoint position @c pos.
    - If the savepoint is at or before the saved statement start, that start
      position is discarded (set to MY_OFF_T_UNDEF).
    - MY_OFF_T_UNDEF is the largest my_off_t, so the comparison is also true
      when no position is saved.
    NOTE(review): original line 667 is missing from this listing. As shown,
    the function never truncates the cache to @c pos. Presumably the missing
    line does. TODO confirm.
  */
663  void restore_savepoint(my_off_t pos)
664  {
665  DBUG_ENTER("restore_savepoint");
666  DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
668  if (pos <= before_stmt_pos)
669  before_stmt_pos= MY_OFF_T_UNDEF;
670  DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
671  DBUG_VOID_RETURN;
672  }
673 
675 
  /* Statement/transaction rollback of the cache; defined out of class further below. */
676  int truncate(THD *thd, bool all);
677 
678 private:
679  /*
680  It will be set TRUE if any statement which cannot be rolled back safely
681  is put in trx_cache.
682  */
683  bool m_cannot_rollback;
684 
685  /*
686  Binlog position before the start of the current statement.
687  */
  /* MY_OFF_T_UNDEF when no position is saved. */
688  my_off_t before_stmt_pos;
689 
  /* Private copy assignment, presumably left undefined to forbid copying. TODO confirm. */
690  binlog_trx_cache_data& operator=(const binlog_trx_cache_data& info);
692 };
693 
/*
  binlog_cache_mngr: holds a session's statement cache (non-transactional)
  and transaction cache, each with its own size limit and status counters.
  NOTE(review): the class-head line (original line 694) is missing from this
  listing.
*/
695 public:
696  binlog_cache_mngr(my_off_t max_binlog_stmt_cache_size_arg,
697  ulong *ptr_binlog_stmt_cache_use_arg,
698  ulong *ptr_binlog_stmt_cache_disk_use_arg,
699  my_off_t max_binlog_cache_size_arg,
700  ulong *ptr_binlog_cache_use_arg,
701  ulong *ptr_binlog_cache_disk_use_arg)
702  : stmt_cache(FALSE, max_binlog_stmt_cache_size_arg,
703  ptr_binlog_stmt_cache_use_arg,
704  ptr_binlog_stmt_cache_disk_use_arg),
705  trx_cache(TRUE, max_binlog_cache_size_arg,
706  ptr_binlog_cache_use_arg,
707  ptr_binlog_cache_disk_use_arg)
708  { }
709 
710  binlog_cache_data* get_binlog_cache_data(bool is_transactional)
711  {
712  if (is_transactional)
713  return &trx_cache;
714  else
715  return &stmt_cache;
716  }
717 
718  IO_CACHE* get_binlog_cache_log(bool is_transactional)
719  {
720  return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
721  }
722 
726  bool is_binlog_empty() const {
727  return stmt_cache.is_binlog_empty() && trx_cache.is_binlog_empty();
728  }
729 
730 #ifndef DBUG_OFF
731  bool dbug_any_finalized() const {
732  return stmt_cache.dbug_is_finalized() || trx_cache.dbug_is_finalized();
733  }
734 #endif
735 
736  /*
737  Convenience method to flush both caches to the binary log.
738 
739  @param bytes_written Pointer to variable that will be set to the
740  number of bytes written for the flush.
741  @param wrote_xid Pointer to variable that will be set to @c
742  true if any XID event was written to the
743  binary log. Otherwise, the variable will not
744  be touched.
745  @return Error code on error, zero if no error.
746  */
747  int flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
748  {
749  my_off_t stmt_bytes= 0;
750  my_off_t trx_bytes= 0;
751  DBUG_ASSERT(stmt_cache.has_xid() == 0 && trx_cache.has_xid() <= 1);
752  if (int error= stmt_cache.flush(thd, &stmt_bytes, wrote_xid))
753  return error;
754  if (int error= trx_cache.flush(thd, &trx_bytes, wrote_xid))
755  return error;
756  *bytes_written= stmt_bytes + trx_bytes;
757  return 0;
758  }
759 
  /* Non-transactional (statement) cache and transactional cache; public for direct use by the binlog code. */
760  binlog_stmt_cache_data stmt_cache;
761  binlog_trx_cache_data trx_cache;
762 
763 private:
764 
  /* Private copy assignment, presumably left undefined to forbid copying. TODO confirm. */
765  binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
767 };
768 
769 
770 static binlog_cache_mngr *thd_get_cache_mngr(const THD *thd)
771 {
772  /*
773  If opt_bin_log is not set, binlog_hton->slot == -1 and hence
774  thd_get_ha_data(thd, hton) segfaults.
775  */
776  DBUG_ASSERT(opt_bin_log);
777  return (binlog_cache_mngr *)thd_get_ha_data(thd, binlog_hton);
778 }
779 
780 
/*
  Clamp binlog_cache_size to max_binlog_cache_size. On clamping, warn the
  session when a thd is given, otherwise write the warning to the error log.
  NOTE(review): the function head (original lines 781-785) is missing from
  this listing; the body uses a parameter named `thd`. The (ulong) casts may
  narrow the values on platforms where ulong is 32 bits; message text only.
*/
786 {
787  if (binlog_cache_size > max_binlog_cache_size)
788  {
789  if (thd)
790  {
791  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
792  ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX,
793  ER(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
794  (ulong) binlog_cache_size,
795  (ulong) max_binlog_cache_size);
796  }
797  else
798  {
799  sql_print_warning(ER_DEFAULT(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
800  (ulong) binlog_cache_size,
801  (ulong) max_binlog_cache_size);
802  }
803  binlog_cache_size= max_binlog_cache_size;
804  }
805 }
806 
/*
  Clamp binlog_stmt_cache_size to max_binlog_stmt_cache_size. On clamping,
  warn the session when a thd is given, otherwise write the warning to the
  error log.
  NOTE(review): the function head (original lines 807-811) is missing from
  this listing; the body uses a parameter named `thd`.
*/
812 {
813  if (binlog_stmt_cache_size > max_binlog_stmt_cache_size)
814  {
815  if (thd)
816  {
817  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
818  ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX,
819  ER(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
820  (ulong) binlog_stmt_cache_size,
821  (ulong) max_binlog_stmt_cache_size);
822  }
823  else
824  {
825  sql_print_warning(ER_DEFAULT(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
826  (ulong) binlog_stmt_cache_size,
827  (ulong) max_binlog_stmt_cache_size);
828  }
829  binlog_stmt_cache_size= max_binlog_stmt_cache_size;
830  }
831 }
832 
/*
  True once the binlog handlerton has been registered and has been assigned
  a ha_data slot. NOTE(review): the function head (original lines 833-836)
  is missing from this listing.
*/
837 {
838  return(binlog_hton && binlog_hton->slot != HA_SLOT_UNDEF);
839 }
840 
841  /*
842  Save position of binary log transaction cache.
843 
844  SYNPOSIS
845  binlog_trans_log_savepos()
846 
847  thd The thread to take the binlog data from
848  pos Pointer to variable where the position will be stored
849 
850  DESCRIPTION
851 
852  Save the current position in the binary log transaction cache into
853  the variable pointed to by 'pos'
854  */
855 
856 static void
857 binlog_trans_log_savepos(THD *thd, my_off_t *pos)
858 {
859  DBUG_ENTER("binlog_trans_log_savepos");
860  DBUG_ASSERT(pos != NULL);
861  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
862  DBUG_ASSERT(mysql_bin_log.is_open());
863  *pos= cache_mngr->trx_cache.get_byte_position();
864  DBUG_PRINT("return", ("position: %lu", (ulong) *pos));
865  DBUG_VOID_RETURN;
866 }
867 
868 
869 /*
870  this function is mostly a placeholder.
871  conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
872  should be moved here.
873 */
874 
875 static int binlog_init(void *p)
876 {
877  binlog_hton= (handlerton *)p;
878  binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
879  binlog_hton->db_type=DB_TYPE_BINLOG;
880  binlog_hton->savepoint_offset= sizeof(my_off_t);
881  binlog_hton->close_connection= binlog_close_connection;
882  binlog_hton->savepoint_set= binlog_savepoint_set;
883  binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
884  binlog_hton->commit= binlog_commit;
885  binlog_hton->rollback= binlog_rollback;
886  binlog_hton->prepare= binlog_prepare;
887  binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
888  return 0;
889 }
890 
/*
  close_connection hook of the binlog handlerton. It tears down the session's
  binlog cache manager, whose caches must already be empty (asserted).
  - The ha_data slot is cleared first.
  - The manager is then destroyed in place and its memory released with
    my_free(). This suggests it was placement-constructed in my_malloc()ed
    memory. TODO confirm.
  The `hton` parameter is not used; the global binlog_hton is used instead.
  @return Always 0.
*/
891 static int binlog_close_connection(handlerton *hton, THD *thd)
892 {
893  DBUG_ENTER("binlog_close_connection");
894  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
895  DBUG_ASSERT(cache_mngr->is_binlog_empty());
896  DBUG_ASSERT(cache_mngr->trx_cache.is_group_cache_empty() &&
897  cache_mngr->stmt_cache.is_group_cache_empty());
898  DBUG_PRINT("debug", ("Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) NULL));
899  thd_set_ha_data(thd, binlog_hton, NULL);
  /* Explicit destructor call + raw free: the destructor closes the cache files. */
900  cache_mngr->~binlog_cache_mngr();
901  my_free(cache_mngr);
902  DBUG_RETURN(0);
903 }
904 
/*
  Append @c ev to this cache.

  When gtid_mode > 0, the event's position is first recorded in group_cache.
  If that starts a new group (APPEND_NEW_GROUP), a Gtid_log_event is written
  ahead of the event. @c ev may be NULL; then only that GTID bookkeeping is
  done. After a successful write, flags.with_xid is set for XID events, and
  flags.immediate is set for events that use immediate logging.

  @return 0 on success, 1 if group bookkeeping or any write fails.

  NOTE(review): original line 911, the declaration of `status` (the result
  of add_logged_group()), is missing from this listing.
*/
905 int binlog_cache_data::write_event(THD *thd, Log_event *ev)
906 {
907  DBUG_ENTER("binlog_cache_data::write_event");
908 
909  if (gtid_mode > 0)
910  {
912  group_cache.add_logged_group(thd, get_byte_position());
913  if (status == Group_cache::ERROR)
914  DBUG_RETURN(1);
915  else if (status == Group_cache::APPEND_NEW_GROUP)
916  {
917  Gtid_log_event gtid_ev(thd, is_trx_cache());
918  if (gtid_ev.write(&cache_log) != 0)
919  DBUG_RETURN(1);
920  }
921  }
922 
923  if (ev != NULL)
924  {
925  DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
926  {DBUG_SET("+d,simulate_file_write_error");});
927  if (ev->write(&cache_log) != 0)
928  {
929  DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
930  {
931  DBUG_SET("-d,simulate_file_write_error");
932  DBUG_SET("-d,simulate_disk_full_at_flush_pending");
933  /*
934  after +d,simulate_file_write_error the local cache
935  is in unsane state. Since -d,simulate_file_write_error
936  revokes the first simulation do_write_cache()
937  can't be run without facing an assert.
938  So it's blocked with the following 2nd simulation:
939  */
940  DBUG_SET("+d,simulate_do_write_cache_failure");
941  });
942  DBUG_RETURN(1);
943  }
944  if (ev->get_type_code() == XID_EVENT)
945  flags.with_xid= true;
946  if (ev->is_using_immediate_logging())
947  flags.immediate= true;
948  }
949  DBUG_RETURN(0);
950 }
951 
952 
963 static int write_one_empty_group_to_cache(THD *thd,
964  binlog_cache_data *cache_data,
965  Gtid gtid)
966 {
967  DBUG_ENTER("write_one_empty_group_to_cache");
968  Group_cache *group_cache= &cache_data->group_cache;
969  if (group_cache->contains_gtid(gtid))
970  DBUG_RETURN(0);
971  /*
972  Apparently this code is not being called. We need to
973  investigate if this is a bug or this code is not
974  necessary. /Alfranio
975 
976  Empty groups are currently being handled in the function
977  gtid_empty_group_log_and_cleanup().
978  */
979  DBUG_ASSERT(0); /*NOTREACHED*/
980 #ifdef NON_ERROR_GTID
981  IO_CACHE *cache= &cache_data->cache_log;
982  Group_cache::enum_add_group_status status= group_cache->add_empty_group(gtid);
983  if (status == Group_cache::ERROR)
984  DBUG_RETURN(1);
985  DBUG_ASSERT(status == Group_cache::APPEND_NEW_GROUP);
986  Gtid_specification spec= { GTID_GROUP, gtid };
987  Gtid_log_event gtid_ev(thd, cache_data->is_trx_cache(), &spec);
988  if (gtid_ev.write(cache) != 0)
989  DBUG_RETURN(1);
990 #endif
991  DBUG_RETURN(0);
992 }
993 
1003 static int write_empty_groups_to_cache(THD *thd, binlog_cache_data *cache_data)
1004 {
1005  DBUG_ENTER("write_empty_groups_to_cache");
1006  if (thd->owned_gtid.sidno == -1)
1007  {
1008 #ifdef HAVE_GTID_NEXT_LIST
1009  Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
1010  Gtid gtid= git.get();
1011  while (gtid.sidno != 0)
1012  {
1013  if (write_one_empty_group_to_cache(thd, cache_data, gtid) != 0)
1014  DBUG_RETURN(1);
1015  git.next();
1016  gtid= git.get();
1017  }
1018 #else
1019  DBUG_ASSERT(0);
1020 #endif
1021  }
1022  else if (thd->owned_gtid.sidno > 0)
1023  if (write_one_empty_group_to_cache(thd, cache_data, thd->owned_gtid) != 0)
1024  DBUG_RETURN(1);
1025  DBUG_RETURN(0);
1026 }
1027 
1028 
1033 static int
1034 gtid_before_write_cache(THD* thd, binlog_cache_data* cache_data)
1035 {
1036  DBUG_ENTER("gtid_before_write_cache");
1037  int error= 0;
1038 
1039  DBUG_ASSERT(thd->variables.gtid_next.type != UNDEFINED_GROUP);
1040 
1041  if (gtid_mode == 0)
1042  DBUG_RETURN(0);
1043 
1044  Group_cache* group_cache= &cache_data->group_cache;
1045 
1046  global_sid_lock->rdlock();
1047 
1048  if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
1049  {
1050  if (group_cache->generate_automatic_gno(thd) !=
1051  RETURN_STATUS_OK)
1052  {
1053  global_sid_lock->unlock();
1054  DBUG_RETURN(1);
1055  }
1056  }
1057  if (write_empty_groups_to_cache(thd, cache_data) != 0)
1058  {
1059  global_sid_lock->unlock();
1060  DBUG_RETURN(1);
1061  }
1062 
1063  global_sid_lock->unlock();
1064 
1065  /*
1066  If an automatic group number was generated, change the first event
1067  into a "real" one.
1068  */
1069  if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
1070  {
1071  DBUG_ASSERT(group_cache->get_n_groups() == 1);
1072  Cached_group *cached_group= group_cache->get_unsafe_pointer(0);
1073  DBUG_ASSERT(cached_group->spec.type != AUTOMATIC_GROUP);
1074  Gtid_log_event gtid_ev(thd, cache_data->is_trx_cache(),
1075  &cached_group->spec);
1076  bool using_file= cache_data->cache_log.pos_in_file > 0;
1077  my_off_t saved_position= cache_data->reset_write_pos(0, using_file);
1078  error= gtid_ev.write(&cache_data->cache_log);
1079  cache_data->reset_write_pos(saved_position, using_file);
1080  }
1081 
1082  DBUG_RETURN(error);
1083 }
1084 
/*
  Log an empty transaction for the session:
  - make sure a cache manager exists (binlog_setup_trx_data());
  - write a "BEGIN" query event to the transaction cache;
  - prepare its GTID state with gtid_before_write_cache();
  - commit through mysql_bin_log.commit().
  @return 1 if setup, the BEGIN write or the GTID preparation fails;
          otherwise the result of mysql_bin_log.commit().
  NOTE(review): the function head (before original line 1099) is missing
  from this listing; DBUG_ENTER names it gtid_empty_group_log_and_cleanup.
*/
1099 {
1100  int ret= 1;
1101  binlog_cache_data* cache_data= NULL;
1102 
1103  DBUG_ENTER("gtid_empty_group_log_and_cleanup");
1104 
1105  Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE,
1106  FALSE, TRUE, 0, TRUE);
1107  DBUG_ASSERT(!qinfo.is_using_immediate_logging());
1108 
1109  /*
1110  thd->cache_mngr is uninitialized on the first empty transaction.
1111  */
1112  if (thd->binlog_setup_trx_data())
1113  DBUG_RETURN(1);
1114  cache_data= &thd_get_cache_mngr(thd)->trx_cache;
1115  DBUG_PRINT("debug", ("Writing to trx_cache"));
1116  if (cache_data->write_event(thd, &qinfo) ||
1117  gtid_before_write_cache(thd, cache_data))
1118  goto err;
1119 
1120  ret= mysql_bin_log.commit(thd, true);
1121 
1122 err:
1123  DBUG_RETURN(ret);
1124 }
1125 
/*
  Finalize a non-empty cache. The pending rows event is flushed, @c end_event
  is appended, and the cache is marked finalized so that flush() will write
  it. @c end_event may be NULL (see binlog_stmt_cache_data::finalize());
  write_event() then only does its GTID bookkeeping. An empty cache is left
  unfinalized.
  @return 0 on success, otherwise the first error from flush_pending_event()
          or write_event().
  NOTE(review): the function head (original line 1143) is missing from this
  listing. Per DBUG_ENTER and the in-class declaration it is
  binlog_cache_data::finalize(THD *thd, Log_event *end_event).
*/
1142 int
1144 {
1145  DBUG_ENTER("binlog_cache_data::finalize");
1146  if (!is_binlog_empty())
1147  {
1148  DBUG_ASSERT(!flags.finalized);
1149  if (int error= flush_pending_event(thd))
1150  DBUG_RETURN(error);
1151  if (int error= write_event(thd, end_event))
1152  DBUG_RETURN(error);
1153  flags.finalized= true;
1154  DBUG_PRINT("debug", ("flags.finalized: %s", YESNO(flags.finalized)));
1155  }
1156  DBUG_RETURN(0);
1157 }
1158 
1177 int
1178 binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
1179 {
1180  /*
1181  Doing a commit or a rollback including non-transactional tables,
1182  i.e., ending a transaction where we might write the transaction
1183  cache to the binary log.
1184 
1185  We can always end the statement when ending a transaction since
1186  transactions are not allowed inside stored functions. If they
1187  were, we would have to ensure that we're not ending a statement
1188  inside a stored function.
1189  */
1190  DBUG_ENTER("binlog_cache_data::flush");
1191  DBUG_PRINT("debug", ("flags.finalized: %s", YESNO(flags.finalized)));
1192  int error= 0;
1193  if (flags.finalized)
1194  {
1195  my_off_t bytes_in_cache= my_b_tell(&cache_log);
1196  DBUG_PRINT("debug", ("bytes_in_cache: %llu", bytes_in_cache));
1197  /*
1198  The cache is always reset since subsequent rollbacks of the
1199  transactions might trigger attempts to write to the binary log
1200  if the cache is not reset.
1201  */
1202  if (!(error= gtid_before_write_cache(thd, this)))
1203  error= mysql_bin_log.write_cache(thd, this);
1204 
1205  if (flags.with_xid && error == 0)
1206  *wrote_xid= true;
1207 
1208  /*
1209  Reset have to be after the if above, since it clears the
1210  with_xid flag
1211  */
1212  reset();
1213  if (bytes_written)
1214  *bytes_written= bytes_in_cache;
1215  }
1216  DBUG_ASSERT(!flags.finalized);
1217  DBUG_RETURN(error);
1218 }
1219 
/*
  Roll back the transactional cache for the statement or, when @c all is
  true, the whole transaction.
  - If ending_trans(thd, all) is true, the whole cache is reset. An incident
    event is written first if the cache has an incident.
  - Otherwise, if a statement start position is saved, the cache is
    truncated back to it. The group cache is also cleared if that leaves the
    cache empty.
  In both cases the session's binlog table maps are cleared.
  @return 0, or the result of mysql_bin_log.write_incident().
  NOTE(review): original lines 1233 (the function head) and 1243 are missing
  from this listing. Per DBUG_ENTER and the in-class declaration, the head is
  binlog_trx_cache_data::truncate(THD *thd, bool all). What line 1243 did
  cannot be seen here. TODO confirm.
*/
1232 int
1234 {
1235  DBUG_ENTER("binlog_trx_cache_data::truncate");
1236  int error=0;
1237 
1238  DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s",
1239  FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
1240  FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
1241  all ? "all" : "stmt"));
1242 
1244 
1245  /*
1246  If rolling back an entire transaction or a single statement not
1247  inside a transaction, we reset the transaction cache.
1248  */
1249  if (ending_trans(thd, all))
1250  {
1251  if (has_incident())
1252  error= mysql_bin_log.write_incident(thd, true/*need_lock_log=true*/);
1253  reset();
1254  }
1255  /*
1256  If rolling back a statement in a transaction, we truncate the
1257  transaction cache to remove the statement.
1258  */
1259  else if (get_prev_position() != MY_OFF_T_UNDEF)
1260  {
1261  restore_prev_position();
1262  if (is_binlog_empty())
1263  {
1264  /*
1265  After restoring the previous position, we need to check if
1266  the cache is empty. In such case, the group cache needs to
1267  be cleaned up too because the GTID is removed too from the
1268  cache.
1269 
1270  So if any change happens again, the GTID must be rewritten
1271  and this will not happen if the group cache is not cleaned
1272  up.
1273 
1274  After integrating this with NDB, we need to check if the
1275  current approach is enough or the group cache needs to
1276  explicitly support rollback to savepoints.
1277  */
1278  group_cache.clear();
1279  }
1280  }
1281 
1282  thd->clear_binlog_table_maps();
1283 
1284  DBUG_RETURN(error);
1285 }
1286 
1287 static int binlog_prepare(handlerton *hton, THD *thd, bool all)
1288 {
1289  /*
1290  do nothing.
1291  just pretend we can do 2pc, so that MySQL won't
1292  switch to 1pc.
1293  real work will be done in MYSQL_BIN_LOG::commit()
1294  */
1295  return 0;
1296 }
1297 
1314 static int binlog_commit(handlerton *hton, THD *thd, bool all)
1315 {
1316  DBUG_ENTER("binlog_commit");
1317  /*
1318  Nothing to do (any more) on commit.
1319  */
1320  DBUG_RETURN(0);
1321 }
1322 
1342 static int binlog_rollback(handlerton *hton, THD *thd, bool all)
1343 {
1344  DBUG_ENTER("binlog_rollback");
1345  int error= 0;
1346  if (thd->lex->sql_command == SQLCOM_ROLLBACK_TO_SAVEPOINT)
1347  error= mysql_bin_log.rollback(thd, all);
1348  DBUG_RETURN(error);
1349 }
1350 
1351 
/*
  Append the chain of THDs starting at `first` (linked through
  next_to_commit) to the tail of this queue, under the queue lock.

  NOTE(review): the declarator line (listing line 1353) is missing from
  this rendering; the DBUG_ENTER tag and the use of `first` suggest
  Stage_manager::Mutex_queue::append(THD *first) -- confirm.

  @return true if the queue was empty before the append (enroll_for()
          treats this as "the caller becomes the leader"), false otherwise.
*/
1352 bool
1354 {
1355  DBUG_ENTER("Stage_manager::Mutex_queue::append");
1356  lock();
1357  DBUG_PRINT("enter", ("first: 0x%llx", (ulonglong) first));
1358  DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1359  (ulonglong) m_first, (ulonglong) &m_first,
1360  (ulonglong) m_last));
/* Must be sampled before linking, since the link may fill m_first. */
1361  bool empty= (m_first == NULL);
/* m_last points at the tail's next_to_commit slot (or at m_first when empty). */
1362  *m_last= first;
1363  DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1364  (ulonglong) m_first, (ulonglong) &m_first,
1365  (ulonglong) m_last));
1366  /*
1367  Go to the last THD instance of the list. We expect lists to be
1368  moderately short. If they are not, we need to track the end of
1369  the queue as well.
1370  */
1371  while (first->next_to_commit)
1372  first= first->next_to_commit;
1373  m_last= &first->next_to_commit;
1374  DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1375  (ulonglong) m_first, (ulonglong) &m_first,
1376  (ulonglong) m_last));
1377  DBUG_ASSERT(m_first || m_last == &m_first);
1378  DBUG_PRINT("return", ("empty: %s", YESNO(empty)));
1379  unlock();
1380  DBUG_RETURN(empty);
1381 }
1382 
1383 
1384 std::pair<bool, THD*>
1385 Stage_manager::Mutex_queue::pop_front()
1386 {
1387  DBUG_ENTER("Stage_manager::Mutex_queue::pop_front");
1388  lock();
1389  THD *result= m_first;
1390  bool more= true;
1391  /*
1392  We do not set next_to_commit to NULL here since this is only used
1393  in the flush stage. We will have to call fetch_queue last here,
1394  and will then "cut" the linked list by setting the end of that
1395  queue to NULL.
1396  */
1397  if (result)
1398  m_first= result->next_to_commit;
1399  if (m_first == NULL)
1400  {
1401  more= false;
1402  m_last = &m_first;
1403  }
1404  DBUG_ASSERT(m_first || m_last == &m_first);
1405  unlock();
1406  DBUG_PRINT("return", ("result: 0x%llx, more: %s",
1407  (ulonglong) result, YESNO(more)));
1408  DBUG_RETURN(std::make_pair(more, result));
1409 }
1410 
1411 
/*
  Enqueue `thd` on the queue of commit stage `stage`.

  The result of the queue's append() decides the role:
  - If the queue was empty, `thd` is the leader for this batch and the
    function returns at once.
  - Otherwise `thd` is a follower. It blocks on m_cond_done (under
    m_lock_done) until thd->transaction.flags.pending is cleared.

  @param stage        Stage whose queue `thd` joins.
  @param thd          Session to enqueue.
  @param stage_mutex  Mutex held by the caller (presumably the previous
                      stage's lock -- confirm with callers). It is released
                      right after enqueueing and may be NULL for the first
                      stage.

  @return true if `thd` is the leader. false if it was a follower, which by
          then has already waited for the leader.
*/
1412 bool
1413 Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
1414 {
1415  // If the queue was empty: we're the leader for this batch
1416  DBUG_PRINT("debug", ("Enqueue 0x%llx to queue for stage %d",
1417  (ulonglong) thd, stage));
1418  bool leader= m_queue[stage].append(thd);
1419 
1420  /*
1421  The stage mutex can be NULL if we are enrolling for the first
1422  stage.
1423  */
1424  if (stage_mutex)
1425  mysql_mutex_unlock(stage_mutex);
1426 
1427  /*
1428  If the queue was not empty, we're a follower and wait for the
1429  leader to process the queue. The stage mutex, if any, has already
1430  been released above, so it is not held while sleeping.
1431  */
1432  if (!leader)
1433  {
1434  mysql_mutex_lock(&m_lock_done);
1435 #ifndef DBUG_OFF
1436  /*
1437  Leader can be awaiting all-clear to preempt follower's execution.
1438  With setting the status the follower ensures it won't execute anything
1439  including thread-specific code.
1440  */
1441  thd->transaction.flags.ready_preempt= 1;
1442  if (leader_await_preempt_status)
1443  mysql_cond_signal(&m_cond_preempt);
1444 #endif
/* Loop guards against spurious wakeups; the leader clears `pending`. */
1445  while (thd->transaction.flags.pending)
1446  mysql_cond_wait(&m_cond_done, &m_lock_done);
1447  mysql_mutex_unlock(&m_lock_done);
1448  }
1449  return leader;
1450 }
1451 
1452 
/*
  Detach the whole queue under the queue lock and return its head.
  Afterwards the queue is empty: m_first is NULL and m_last points back at
  m_first.

  NOTE(review): the declarator line (listing line 1453) is missing from
  this rendering; the DBUG_ENTER tag and the returned `THD *result` suggest
  THD *Stage_manager::Mutex_queue::fetch_and_empty() -- confirm.

  @return The former head of the queue (NULL if it was already empty). The
          detached THDs stay linked through next_to_commit.
*/
1454 {
1455  DBUG_ENTER("Stage_manager::Mutex_queue::fetch_and_empty");
1456  lock();
1457  DBUG_PRINT("enter", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1458  (ulonglong) m_first, (ulonglong) &m_first,
1459  (ulonglong) m_last));
1460  THD *result= m_first;
1461  m_first= NULL;
1462  m_last= &m_first;
1463  DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1464  (ulonglong) m_first, (ulonglong) &m_first,
1465  (ulonglong) m_last));
1466  DBUG_ASSERT(m_first || m_last == &m_first);
1467  DBUG_PRINT("return", ("result: 0x%llx", (ulonglong) result));
1468  unlock();
1469  DBUG_RETURN(result);
1470 }
1471 
1472 #ifndef DBUG_OFF
/*
  Debug-only: block the caller until `head` has set
  transaction.flags.ready_preempt. A follower sets that flag in
  enroll_for() and signals m_cond_preempt when leader_await_preempt_status
  is set.

  NOTE(review): the declarator line (listing line 1473) is missing from
  this rendering; presumably Stage_manager::clear_preempt_status(THD *head)
  -- confirm against the real source.
*/
1474 {
1475  DBUG_ASSERT(head);
1476 
1477  mysql_mutex_lock(&m_lock_done);
1478  while(!head->transaction.flags.ready_preempt)
1479  {
/* Tell the follower side that someone is waiting to be signalled. */
1480  leader_await_preempt_status= true;
1481  mysql_cond_wait(&m_cond_preempt, &m_lock_done);
1482  }
1483  leader_await_preempt_status= false;
1484  mysql_mutex_unlock(&m_lock_done);
1485 }
1486 #endif
1487 
/*
  Roll back a session's binary log caches. Except for ROLLBACK TO
  SAVEPOINT, the storage engines are rolled back first.

  Order of operations:
   1. Unless the command is ROLLBACK TO SAVEPOINT, ha_rollback_low() rolls
      back the engines. A failure jumps straight to the end.
   2. With no cache manager, or with both caches empty, nothing else is
      done to the caches.
   3. The statement cache is either discarded after write_incident() (if
      it carries an incident) or finalized, which sets stuff_logged.
   4. The transaction cache is finalized with a "ROLLBACK" event,
      truncated, or has its previous position cleared so its content is
      preserved (see the comments below).
   5. If something was finalized without error, ordered_commit() is called
      with skip_commit set to true.
   6. If check_write_error() reports a problem, the transaction cache is
      truncated.
  At the end, gtid_rollback() is called when the session is not in an
  active multi-statement transaction.

  @param thd  Session being rolled back.
  @param all  true for a whole-transaction rollback, false for a statement.
  @return 0 on success, non-zero on error.
*/
1520 int MYSQL_BIN_LOG::rollback(THD *thd, bool all)
1521 {
1522  int error= 0;
1523  bool stuff_logged= false;
1524 
1525  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
1526  DBUG_ENTER("MYSQL_BIN_LOG::rollback(THD *thd, bool all)");
1527  DBUG_PRINT("enter", ("all: %s, cache_mngr: 0x%llx, thd->is_error: %s",
1528  YESNO(all), (ulonglong) cache_mngr, YESNO(thd->is_error())));
1529 
1530  /*
1531  We roll back the transaction in the engines early since this will
1532  release locks and allow other transactions to start executing.
1533 
1534  If we are executing a ROLLBACK TO SAVEPOINT, we should only clear
1535  the caches since this function is called as part of the engine
1536  rollback.
1537  */
1538  if (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT)
1539  if ((error= ha_rollback_low(thd, all)))
1540  goto end;
1541 
1542  /*
1543  If there is no cache manager, or if there is nothing in the
1544  caches, there are no caches to roll back, so we're trivially done.
1545  */
1546  if (cache_mngr == NULL || cache_mngr->is_binlog_empty())
1547  goto end;
1548 
1549  DBUG_PRINT("debug",
1550  ("all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
1551  YESNO(thd->transaction.all.cannot_safely_rollback()),
1552  YESNO(cache_mngr->trx_cache.is_binlog_empty())));
1553  DBUG_PRINT("debug",
1554  ("stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
1555  YESNO(thd->transaction.stmt.cannot_safely_rollback()),
1556  YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
1557 
1558  /*
1559  If an incident event is set we do not flush the content of the statement
1560  cache because it may be corrupted.
1561  */
1562  if (cache_mngr->stmt_cache.has_incident())
1563  {
1564  error= write_incident(thd, true/*need_lock_log=true*/);
1565  cache_mngr->stmt_cache.reset();
1566  }
1567  else if (!cache_mngr->stmt_cache.is_binlog_empty())
1568  {
1569  if ((error= cache_mngr->stmt_cache.finalize(thd)))
1570  goto end;
1571  stuff_logged= true;
1572  }
1573 
1574  if (ending_trans(thd, all))
1575  {
/*
  NOTE(review): listing line 1576 is missing from this rendering. It holds
  the condition choosing between the two branches below; judging by their
  comments it tests whether the transaction cannot be safely rolled back
  -- confirm against the real source.
*/
1577  {
1578  /*
1579  If the transaction is being rolled back and contains changes that
1580  cannot be rolled back, the trx-cache's content is flushed.
1581  */
/*
  NOTE(review): listing line 1582 is missing here; presumably it starts the
  declaration of end_evt (an event carrying the "ROLLBACK" query) -- confirm.
*/
1583  end_evt(thd, STRING_WITH_LEN("ROLLBACK"), true, false, true, 0, true);
1584  error= cache_mngr->trx_cache.finalize(thd, &end_evt);
1585  stuff_logged= true;
1586  }
1587  else
1588  {
1589  /*
1590  If the transaction is being rolled back and its changes can be
1591  rolled back, the trx-cache's content is truncated.
1592  */
1593  error= cache_mngr->trx_cache.truncate(thd, all);
1594  }
1595  }
1596  else
1597  {
1598  /*
1599  If a statement is being rolled back, it is necessary to know
1600  exactly why a statement may not be safely rolled back as in
1601  some specific situations the trx-cache can be truncated.
1602 
1603  If a temporary table is created or dropped, the trx-cache is not
1604  truncated. Note that if the stmt-cache is used, there is nothing
1605  to truncate in the trx-cache.
1606 
1607  If a non-transactional table is updated and the binlog format is
1608  statement, the trx-cache is not truncated. The trx-cache is used
1609  when the direct option is off and a transactional table has been
1610  updated before the current statement in the context of the
1611  current transaction. Note that if the stmt-cache is used there is
1612  nothing to truncate in the trx-cache.
1613 
1614  If other binlog formats are used, updates to non-transactional
1615  tables are written to the stmt-cache and trx-cache can be safely
1616  truncated, if necessary.
1617  */
1618  if (thd->transaction.stmt.has_dropped_temp_table() ||
1619  thd->transaction.stmt.has_created_temp_table() ||
1620  (thd->transaction.stmt.has_modified_non_trans_table() &&
1621  thd->variables.binlog_format == BINLOG_FORMAT_STMT))
1622  {
1623  /*
1624  If the statement is being rolled back and dropped or created a
1625  temporary table or modified a non-transactional table and the
1626  statement-based replication is in use, the statement's changes
1627  in the trx-cache are preserved.
1628  */
1629  cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
1630  }
1631  else
1632  {
1633  /*
1634  Otherwise, the statement's changes in the trx-cache are
1635  truncated.
1636  */
1637  error= cache_mngr->trx_cache.truncate(thd, all);
1638  }
1639  }
1640 
1641  DBUG_PRINT("debug", ("error: %d", error));
/* The engines were already rolled back above, hence skip_commit=true. */
1642  if (error == 0 && stuff_logged)
1643  error= ordered_commit(thd, all, /* skip_commit */ true);
1644 
1645  if (check_write_error(thd))
1646  {
1647  /*
1648  "all == true" means that a "rollback statement" triggered the error and
1649  this function was called. However, this must not happen as a rollback
1650  is written directly to the binary log. And in auto-commit mode, a single
1651  statement that is rolled back has the flag all == false.
1652  */
1653  DBUG_ASSERT(!all);
1654  /*
1655  We reach this point if the effect of a statement did not properly get into
1656  a cache and need to be rolled back.
1657  */
1658  error |= cache_mngr->trx_cache.truncate(thd, all);
1659  }
1660 
1661 end:
1662  /*
1663  When a statement errors out in auto-commit mode it is rolled back
1664  implicitly, so the same should happen to its GTID.
1665  */
1666  if (!thd->in_active_multi_stmt_transaction())
1667  gtid_rollback(thd);
1668 
1669  DBUG_PRINT("return", ("error: %d", error));
1670  DBUG_RETURN(error);
1671 }
1672 
1697 static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
1698 {
1699  DBUG_ENTER("binlog_savepoint_set");
1700  int error= 1;
1701 
1702  String log_query;
1703  if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")))
1704  DBUG_RETURN(error);
1705  else
1706  append_identifier(thd, &log_query, thd->lex->ident.str,
1707  thd->lex->ident.length);
1708 
1709  int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
1710  Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
1711  TRUE, FALSE, TRUE, errcode);
1712  /*
1713  We cannot record the position before writing the statement
1714  because a rollback to a savepoint (.e.g. consider it "S") would
1715  prevent the savepoint statement (i.e. "SAVEPOINT S") from being
1716  written to the binary log despite the fact that the server could
1717  still issue other rollback statements to the same savepoint (i.e.
1718  "S").
1719  Given that the savepoint is valid until the server releases it,
1720  ie, until the transaction commits or it is released explicitly,
1721  we need to log it anyway so that we don't have "ROLLBACK TO S"
1722  or "RELEASE S" without the preceding "SAVEPOINT S" in the binary
1723  log.
1724  */
1725  if (!(error= mysql_bin_log.write_event(&qinfo)))
1726  binlog_trans_log_savepos(thd, (my_off_t*) sv);
1727 
1728  DBUG_RETURN(error);
1729 }
1730 
/*
  Rollback-to-savepoint hook of the binlog handlerton.

  Depending on a condition (see the NOTE below), it either writes
  "ROLLBACK TO `<name>`" to the binary log, or truncates the transaction
  cache back to the position stored in `sv`. In the truncate case,
  group_cache is also cleared when the cache ends up empty.

  NOTE(review): listing line 1743, which holds the condition choosing the
  "write ROLLBACK TO" branch, is missing from this rendering. The comment
  below says it is about having updated a non-transactional table
  -- confirm against the real source.

  NOTE(review): the savepoint name is wrapped in raw backticks without
  escaping. binlog_savepoint_set() uses append_identifier() instead, so a
  name containing a backtick would produce an unparsable statement here.
  Consider switching to append_identifier() once the full body is visible.

  @return 0 on success, non-zero on failure.
*/
1731 static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
1732 {
1733  DBUG_ENTER("binlog_savepoint_rollback");
1734  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
1735  my_off_t pos= *(my_off_t*) sv;
/* The savepoint must have had a position recorded when it was set. */
1736  DBUG_ASSERT(pos != ~(my_off_t) 0);
1737 
1738  /*
1739  Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
1740  non-transactional table. Otherwise, truncate the binlog cache starting
1741  from the SAVEPOINT command.
1742  */
1744  {
1745  String log_query;
1746  if (log_query.append(STRING_WITH_LEN("ROLLBACK TO ")) ||
1747  log_query.append("`") ||
1748  log_query.append(thd->lex->ident.str, thd->lex->ident.length) ||
1749  log_query.append("`"))
1750  DBUG_RETURN(1);
1751  int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
1752  Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
1753  TRUE, FALSE, TRUE, errcode);
1754  DBUG_RETURN(mysql_bin_log.write_event(&qinfo));
1755  }
1756  // Otherwise, we truncate the cache
1757  cache_mngr->trx_cache.restore_savepoint(pos);
1758  if (cache_mngr->trx_cache.is_binlog_empty())
1759  cache_mngr->trx_cache.group_cache.clear();
1760  DBUG_RETURN(0);
1761 }
1762 
1763 #ifdef HAVE_REPLICATION
1764 
1765 /*
1766  Adjust the position pointer in the binary log file for all running slaves
1767 
1768  SYNOPSIS
1769  adjust_linfo_offsets()
1770  purge_offset Number of bytes removed from start of log index file
1771 
1772  NOTES
1773  - This is called when doing a PURGE when we delete lines from the
1774  index log file
1775 
1776  REQUIREMENTS
1777  - Before calling this function, we have to ensure that no threads are
1778  using any binary log file before purge_offset.
1779 
1780  TODO
1781  - Inform the slave threads that they should sync the position
1782  in the binary log file with flush_relay_log_info.
1783  Currently the sync is done on the next read.
1784 */
1785 
1786 static void adjust_linfo_offsets(my_off_t purge_offset)
1787 {
1788  mysql_mutex_lock(&LOCK_thread_count);
1789 
1790  Thread_iterator it= global_thread_list_begin();
1791  Thread_iterator end= global_thread_list_end();
1792  for (; it != end; ++it)
1793  {
1794  LOG_INFO* linfo;
1795  if ((linfo = (*it)->current_linfo))
1796  {
1797  mysql_mutex_lock(&linfo->lock);
1798  /*
1799  Index file offset can be less that purge offset only if
1800  we just started reading the index file. In that case
1801  we have nothing to adjust
1802  */
1803  if (linfo->index_file_offset < purge_offset)
1804  linfo->fatal = (linfo->index_file_offset != 0);
1805  else
1806  linfo->index_file_offset -= purge_offset;
1807  mysql_mutex_unlock(&linfo->lock);
1808  }
1809  }
1810  mysql_mutex_unlock(&LOCK_thread_count);
1811 }
1812 
1813 
1814 static int log_in_use(const char* log_name)
1815 {
1816  size_t log_name_len = strlen(log_name) + 1;
1817  int thread_count=0;
1818 
1819  mysql_mutex_lock(&LOCK_thread_count);
1820 
1821  Thread_iterator it= global_thread_list_begin();
1822  Thread_iterator end= global_thread_list_end();
1823  for (; it != end; ++it)
1824  {
1825  LOG_INFO* linfo;
1826  if ((linfo = (*it)->current_linfo))
1827  {
1828  mysql_mutex_lock(&linfo->lock);
1829  if(!memcmp(log_name, linfo->log_file_name, log_name_len))
1830  {
1831  thread_count++;
1832  sql_print_warning("file %s was not purged because it was being read"
1833  "by thread number %llu", log_name,
1834  (ulonglong)(*it)->thread_id);
1835  }
1836  mysql_mutex_unlock(&linfo->lock);
1837  }
1838  }
1839 
1840  mysql_mutex_unlock(&LOCK_thread_count);
1841  return thread_count;
1842 }
1843 
1844 static bool purge_error_message(THD* thd, int res)
1845 {
1846  uint errcode;
1847 
1848  if ((errcode= purge_log_get_error_code(res)) != 0)
1849  {
1850  my_message(errcode, ER(errcode), MYF(0));
1851  return TRUE;
1852  }
1853  my_ok(thd);
1854  return FALSE;
1855 }
1856 
1857 #endif /* HAVE_REPLICATION */
1858 
1859 int check_binlog_magic(IO_CACHE* log, const char** errmsg)
1860 {
1861  char magic[4];
1862  DBUG_ASSERT(my_b_tell(log) == 0);
1863 
1864  if (my_b_read(log, (uchar*) magic, sizeof(magic)))
1865  {
1866  *errmsg = "I/O error reading the header from the binary log";
1867  sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
1868  log->error);
1869  return 1;
1870  }
1871  if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
1872  {
1873  *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
1874  return 1;
1875  }
1876  return 0;
1877 }
1878 
1879 
1880 File open_binlog_file(IO_CACHE *log, const char *log_file_name, const char **errmsg)
1881 {
1882  File file;
1883  DBUG_ENTER("open_binlog_file");
1884 
1885  if ((file= mysql_file_open(key_file_binlog,
1886  log_file_name, O_RDONLY | O_BINARY | O_SHARE,
1887  MYF(MY_WME))) < 0)
1888  {
1889  sql_print_error("Failed to open log (file '%s', errno %d)",
1890  log_file_name, my_errno);
1891  *errmsg = "Could not open log file";
1892  goto err;
1893  }
1894  if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
1895  MYF(MY_WME|MY_DONT_CHECK_FILESIZE)))
1896  {
1897  sql_print_error("Failed to create a cache on log (file '%s')",
1898  log_file_name);
1899  *errmsg = "Could not open log file";
1900  goto err;
1901  }
1902  if (check_binlog_magic(log,errmsg))
1903  goto err;
1904  DBUG_RETURN(file);
1905 
1906 err:
1907  if (file >= 0)
1908  {
1909  mysql_file_close(file, MYF(0));
1910  end_io_cache(log);
1911  }
1912  DBUG_RETURN(-1);
1913 }
1914 
/*
  Tell whether the session's transaction binlog cache holds any data.

  NOTE(review): the declarator line (listing line 1924) is missing from
  this rendering. The body uses a `thd` parameter; the function name is
  presumably trans_has_updated_trans_table -- confirm.

  @return true if a cache manager exists and its trx_cache is non-empty,
          false otherwise (including when there is no cache manager).
*/
1923 bool
1925 {
1926  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
1927 
1928  return (cache_mngr ? !cache_mngr->trx_cache.is_binlog_empty() : 0);
1929 }
1930 
/*
  Tell whether the current statement did read-write work in any engine
  other than the binlog pseudo-engine.

  Walks thd->transaction.stmt.ha_list and stops at the first entry that is
  marked read-write and whose handlerton is not binlog_hton.

  NOTE(review): the declarator line (listing line 1940) is missing from
  this rendering; presumably stmt_has_updated_trans_table(const THD *thd)
  -- confirm.

  @return TRUE if such an engine entry exists, FALSE otherwise.
*/
1939 bool
1941 {
1942  Ha_trx_info *ha_info;
1943 
1944  for (ha_info= thd->transaction.stmt.ha_list; ha_info;
1945  ha_info= ha_info->next())
1946  {
1947  if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
1948  return (TRUE);
1949  }
1950  return (FALSE);
1951 }
1952 
1963 bool ending_trans(THD* thd, const bool all)
1964 {
1965  return (all || ending_single_stmt_trans(thd, all));
1966 }
1967 
1979 bool ending_single_stmt_trans(THD* thd, const bool all)
1980 {
1981  return (!all && !thd->in_multi_stmt_transaction_mode());
1982 }
1983 
1991 bool trans_cannot_safely_rollback(const THD* thd)
1992 {
1993  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
1994 
1995  return cache_mngr->trx_cache.cannot_rollback();
1996 }
1997 
2005 bool stmt_cannot_safely_rollback(const THD* thd)
2006 {
2007  return thd->transaction.stmt.cannot_safely_rollback();
2008 }
2009 
2010 #ifndef EMBEDDED_LIBRARY
2011 
2022 bool purge_master_logs(THD* thd, const char* to_log)
2023 {
2024  char search_file_name[FN_REFLEN];
2025  if (!mysql_bin_log.is_open())
2026  {
2027  my_ok(thd);
2028  return FALSE;
2029  }
2030 
2031  mysql_bin_log.make_log_name(search_file_name, to_log);
2032  return purge_error_message(thd,
2033  mysql_bin_log.purge_logs(search_file_name, false,
2034  true/*need_lock_index=true*/,
2035  true/*need_update_threads=true*/,
2036  NULL, false));
2037 }
2038 
2039 
2051 bool purge_master_logs_before_date(THD* thd, time_t purge_time)
2052 {
2053  if (!mysql_bin_log.is_open())
2054  {
2055  my_ok(thd);
2056  return 0;
2057  }
2058  return purge_error_message(thd,
2059  mysql_bin_log.purge_logs_before_date(purge_time,
2060  false));
2061 }
2062 #endif /* EMBEDDED_LIBRARY */
2063 
2064 /*
2065  Helper function to get the error code of the query to be binlogged.
2066  */
2067 int query_error_code(THD *thd, bool not_killed)
2068 {
2069  int error;
2070 
2071  if (not_killed || (thd->killed == THD::KILL_BAD_DATA))
2072  {
2073  error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
2074 
2075  /* thd->get_stmt_da()->sql_errno() might be ER_SERVER_SHUTDOWN or
2076  ER_QUERY_INTERRUPTED, So here we need to make sure that error
2077  is not set to these errors when specified not_killed by the
2078  caller.
2079  */
2080  if (error == ER_SERVER_SHUTDOWN || error == ER_QUERY_INTERRUPTED)
2081  error= 0;
2082  }
2083  else
2084  {
2085  /* killed status for DELAYED INSERT thread should never be used */
2086  DBUG_ASSERT(!(thd->system_thread & SYSTEM_THREAD_DELAYED_INSERT));
2087  error= thd->killed_errno();
2088  }
2089 
2090  return error;
2091 }
2092 
2093 
2111 static bool copy_file(IO_CACHE *from, IO_CACHE *to, my_off_t offset)
2112 {
2113  int bytes_read;
2114  uchar io_buf[IO_SIZE*2];
2115  DBUG_ENTER("copy_file");
2116 
2117  mysql_file_seek(from->file, offset, MY_SEEK_SET, MYF(0));
2118  while(TRUE)
2119  {
2120  if ((bytes_read= (int) mysql_file_read(from->file, io_buf, sizeof(io_buf),
2121  MYF(MY_WME)))
2122  < 0)
2123  goto err;
2124  if (DBUG_EVALUATE_IF("fault_injection_copy_part_file", 1, 0))
2125  bytes_read= bytes_read/2;
2126  if (!bytes_read)
2127  break; // end of file
2128  if (mysql_file_write(to->file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
2129  goto err;
2130  }
2131 
2132  DBUG_RETURN(0);
2133 
2134 err:
2135  DBUG_RETURN(1);
2136 }
2137 
2138 
2139 #ifdef HAVE_REPLICATION
2140 
2150 int log_loaded_block(IO_CACHE* file)
2151 {
2152  DBUG_ENTER("log_loaded_block");
2153  LOAD_FILE_INFO *lf_info;
2154  uint block_len;
2155  /* buffer contains position where we started last read */
2156  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
2157  uint max_event_size= current_thd->variables.max_allowed_packet;
2158  lf_info= (LOAD_FILE_INFO*) file->arg;
2159  if (lf_info->thd->is_current_stmt_binlog_format_row())
2160  DBUG_RETURN(0);
2161  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
2162  lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
2163  DBUG_RETURN(0);
2164 
2165  for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
2166  buffer += min(block_len, max_event_size),
2167  block_len -= min(block_len, max_event_size))
2168  {
2169  lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
2170  if (lf_info->wrote_create_file)
2171  {
2172  Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
2173  min(block_len, max_event_size),
2174  lf_info->log_delayed);
2175  if (mysql_bin_log.write_event(&a))
2176  DBUG_RETURN(1);
2177  }
2178  else
2179  {
2180  Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
2181  buffer,
2182  min(block_len, max_event_size),
2183  lf_info->log_delayed);
2184  if (mysql_bin_log.write_event(&b))
2185  DBUG_RETURN(1);
2186  lf_info->wrote_create_file= 1;
2187  }
2188  }
2189  DBUG_RETURN(0);
2190 }
2191 
2192 /* Helper function for SHOW BINLOG/RELAYLOG EVENTS */
2193 bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log)
2194 {
2195  Protocol *protocol= thd->protocol;
2196  List<Item> field_list;
2197  const char *errmsg = 0;
2198  bool ret = TRUE;
2199  IO_CACHE log;
2200  File file = -1;
2201  int old_max_allowed_packet= thd->variables.max_allowed_packet;
2202  LOG_INFO linfo;
2203 
2204  DBUG_ENTER("show_binlog_events");
2205 
2206  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
2207  thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
2208 
2209  Format_description_log_event *description_event= new
2210  Format_description_log_event(3); /* MySQL 4.0 by default */
2211 
2212  if (binary_log->is_open())
2213  {
2214  LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
2215  SELECT_LEX_UNIT *unit= &thd->lex->unit;
2216  ha_rows event_count, limit_start, limit_end;
2217  my_off_t pos = max<my_off_t>(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
2218  char search_file_name[FN_REFLEN], *name;
2219  const char *log_file_name = lex_mi->log_file_name;
2220  mysql_mutex_t *log_lock = binary_log->get_log_lock();
2221  Log_event* ev;
2222 
2223  unit->set_limit(thd->lex->current_select);
2224  limit_start= unit->offset_limit_cnt;
2225  limit_end= unit->select_limit_cnt;
2226 
2227  name= search_file_name;
2228  if (log_file_name)
2229  binary_log->make_log_name(search_file_name, log_file_name);
2230  else
2231  name=0; // Find first log
2232 
2233  linfo.index_file_offset = 0;
2234 
2235  if (binary_log->find_log_pos(&linfo, name, true/*need_lock_index=true*/))
2236  {
2237  errmsg = "Could not find target log";
2238  goto err;
2239  }
2240 
2241  mysql_mutex_lock(&LOCK_thread_count);
2242  thd->current_linfo = &linfo;
2243  mysql_mutex_unlock(&LOCK_thread_count);
2244 
2245  if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0)
2246  goto err;
2247 
2248  /*
2249  to account binlog event header size
2250  */
2251  thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
2252 
2253  DEBUG_SYNC(thd, "after_show_binlog_event_found_file");
2254 
2255  mysql_mutex_lock(log_lock);
2256 
2257  /*
2258  open_binlog_file() sought to position 4.
2259  Read the first event in case it's a Format_description_log_event, to
2260  know the format. If there's no such event, we are 3.23 or 4.x. This
2261  code, like before, can't read 3.23 binlogs.
2262  This code will fail on a mixed relay log (one which has Format_desc then
2263  Rotate then Format_desc).
2264  */
2265  ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
2266  opt_master_verify_checksum);
2267  if (ev)
2268  {
2269  if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
2270  {
2271  delete description_event;
2272  description_event= (Format_description_log_event*) ev;
2273  }
2274  else
2275  delete ev;
2276  }
2277 
2278  my_b_seek(&log, pos);
2279 
2280  if (!description_event->is_valid())
2281  {
2282  errmsg="Invalid Format_description event; could be out of memory";
2283  goto err;
2284  }
2285 
2286  for (event_count = 0;
2287  (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
2288  description_event,
2289  opt_master_verify_checksum)); )
2290  {
2291  if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
2292  description_event->checksum_alg= ev->checksum_alg;
2293 
2294  if (event_count >= limit_start &&
2295  ev->net_send(protocol, linfo.log_file_name, pos))
2296  {
2297  errmsg = "Net error";
2298  delete ev;
2299  mysql_mutex_unlock(log_lock);
2300  goto err;
2301  }
2302 
2303  pos = my_b_tell(&log);
2304  delete ev;
2305 
2306  if (++event_count >= limit_end)
2307  break;
2308  }
2309 
2310  if (event_count < limit_end && log.error)
2311  {
2312  errmsg = "Wrong offset or I/O error";
2313  mysql_mutex_unlock(log_lock);
2314  goto err;
2315  }
2316 
2317  mysql_mutex_unlock(log_lock);
2318  }
2319  // Check that linfo is still on the function scope.
2320  DEBUG_SYNC(thd, "after_show_binlog_events");
2321 
2322  ret= FALSE;
2323 
2324 err:
2325  delete description_event;
2326  if (file >= 0)
2327  {
2328  end_io_cache(&log);
2329  mysql_file_close(file, MYF(MY_WME));
2330  }
2331 
2332  if (errmsg)
2333  my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
2334  "SHOW BINLOG EVENTS", errmsg);
2335  else
2336  my_eof(thd);
2337 
2338  mysql_mutex_lock(&LOCK_thread_count);
2339  thd->current_linfo = 0;
2340  mysql_mutex_unlock(&LOCK_thread_count);
2341  thd->variables.max_allowed_packet= old_max_allowed_packet;
2342  DBUG_RETURN(ret);
2343 }
2344 
2354 bool mysql_show_binlog_events(THD* thd)
2355 {
2356  Protocol *protocol= thd->protocol;
2357  List<Item> field_list;
2358  DBUG_ENTER("mysql_show_binlog_events");
2359 
2360  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS);
2361 
2362  Log_event::init_show_field_list(&field_list);
2363  if (protocol->send_result_set_metadata(&field_list,
2364  Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2365  DBUG_RETURN(TRUE);
2366 
2367  /*
2368  Wait for handlers to insert any pending information
2369  into the binlog. For e.g. ndb which updates the binlog asynchronously
2370  this is needed so that the uses sees all its own commands in the binlog
2371  */
2372  ha_binlog_wait(thd);
2373 
2374  DBUG_RETURN(show_binlog_events(thd, &mysql_bin_log));
2375 }
2376 
2377 #endif /* HAVE_REPLICATION */
2378 
2379 
2380 MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
2381  :bytes_written(0), file_id(1), open_count(1),
2382  sync_period_ptr(sync_period), sync_counter(0),
2383  m_prep_xids(0),
2384  is_relay_log(0), signal_cnt(0),
2385  checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
2386  relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
2387  previous_gtid_set(0)
2388 {
2389  /*
2390  We don't want to initialize locks here as such initialization depends on
2391  safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
2392  called only in main(). Doing initialization here would make it happen
2393  before main().
2394  */
2395  index_file_name[0] = 0;
2396  memset(&index_file, 0, sizeof(index_file));
2397  memset(&purge_index_file, 0, sizeof(purge_index_file));
2398  memset(&crash_safe_index_file, 0, sizeof(crash_safe_index_file));
2399 }
2400 
2401 
2402 /* this is called only once */
2403 
2404 void MYSQL_BIN_LOG::cleanup()
2405 {
2406  DBUG_ENTER("cleanup");
2407  if (inited)
2408  {
2409  inited= 0;
2410  close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
2411  mysql_mutex_destroy(&LOCK_log);
2412  mysql_mutex_destroy(&LOCK_index);
2413  mysql_mutex_destroy(&LOCK_commit);
2414  mysql_mutex_destroy(&LOCK_sync);
2415  mysql_mutex_destroy(&LOCK_xids);
2416  mysql_cond_destroy(&update_cond);
2417  my_atomic_rwlock_destroy(&m_prep_xids_lock);
2418  mysql_cond_destroy(&m_prep_xids_cond);
2419  stage_manager.deinit();
2420  }
2421  DBUG_VOID_RETURN;
2422 }
2423 
2424 
2425 void MYSQL_BIN_LOG::init_pthread_objects()
2426 {
2427  MYSQL_LOG::init_pthread_objects();
2428  mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
2429  mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST);
2430  mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
2431  mysql_mutex_init(m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST);
2432  mysql_cond_init(m_key_update_cond, &update_cond, 0);
2433  my_atomic_rwlock_init(&m_prep_xids_lock);
2434  mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL);
2435  stage_manager.init(
2436 #ifdef HAVE_PSI_INTERFACE
2437  m_key_LOCK_flush_queue,
2438  m_key_LOCK_sync_queue,
2439  m_key_LOCK_commit_queue,
2440  m_key_LOCK_done, m_key_COND_done
2441 #endif
2442  );
2443 }
2444 
2445 bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
2446  const char *log_name, bool need_lock_index)
2447 {
2448  File index_file_nr= -1;
2449  DBUG_ASSERT(!my_b_inited(&index_file));
2450 
2451  /*
2452  First open of this class instance
2453  Create an index file that will hold all file names uses for logging.
2454  Add new entries to the end of it.
2455  */
2456  myf opt= MY_UNPACK_FILENAME;
2457  if (!index_file_name_arg)
2458  {
2459  index_file_name_arg= log_name; // Use same basename for index file
2460  opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT;
2461  }
2462  fn_format(index_file_name, index_file_name_arg, mysql_data_home,
2463  ".index", opt);
2464 
2465  if (set_crash_safe_index_file_name(index_file_name_arg))
2466  {
2467  sql_print_error("MYSQL_BIN_LOG::set_crash_safe_index_file_name failed.");
2468  return TRUE;
2469  }
2470 
2471  /*
2472  We need move crash_safe_index_file to index_file if the index_file
2473  does not exist and crash_safe_index_file exists when mysqld server
2474  restarts.
2475  */
2476  if (my_access(index_file_name, F_OK) &&
2477  !my_access(crash_safe_index_file_name, F_OK) &&
2478  my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
2479  {
2480  sql_print_error("MYSQL_BIN_LOG::open_index_file failed to "
2481  "move crash_safe_index_file to index file.");
2482  return TRUE;
2483  }
2484 
2485  if ((index_file_nr= mysql_file_open(m_key_file_log_index,
2486  index_file_name,
2487  O_RDWR | O_CREAT | O_BINARY,
2488  MYF(MY_WME))) < 0 ||
2489  mysql_file_sync(index_file_nr, MYF(MY_WME)) ||
2490  init_io_cache(&index_file, index_file_nr,
2491  IO_SIZE, READ_CACHE,
2492  mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)),
2493  0, MYF(MY_WME | MY_WAIT_IF_FULL)) ||
2494  DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0))
2495  {
2496  /*
2497  TODO: all operations creating/deleting the index file or a log, should
2498  call my_sync_dir() or my_sync_dir_by_file() to be durable.
2499  TODO: file creation should be done with mysql_file_create()
2500  not mysql_file_open().
2501  */
2502  if (index_file_nr >= 0)
2503  mysql_file_close(index_file_nr, MYF(0));
2504  return TRUE;
2505  }
2506 
2507 #ifdef HAVE_REPLICATION
2508  /*
2509  Sync the index by purging any binary log file that is not registered.
2510  In other words, either purge binary log files that were removed from
2511  the index but not purged from the file system due to a crash or purge
2512  any binary log file that was created but not register in the index
2513  due to a crash.
2514  */
2515 
2516  if (set_purge_index_file_name(index_file_name_arg) ||
2517  open_purge_index_file(FALSE) ||
2518  purge_index_entry(NULL, NULL, need_lock_index) ||
2519  close_purge_index_file() ||
2520  DBUG_EVALUATE_IF("fault_injection_recovering_index", 1, 0))
2521  {
2522  sql_print_error("MYSQL_BIN_LOG::open_index_file failed to sync the index "
2523  "file.");
2524  return TRUE;
2525  }
2526 #endif
2527 
2528  return FALSE;
2529 }
2530 
2531 
2556 { GOT_GTIDS, GOT_PREVIOUS_GTIDS, NO_GTIDS, ERROR, TRUNCATED };
2558 read_gtids_from_binlog(const char *filename, Gtid_set *all_gtids,
2559  Gtid_set *prev_gtids, bool verify_checksum)
2560 {
2561  DBUG_ENTER("read_gtids_from_binlog");
2562  DBUG_PRINT("info", ("Opening file %s", filename));
2563 
2564  /*
2565  Create a Format_description_log_event that is used to read the
2566  first event of the log.
2567  */
2568  Format_description_log_event fd_ev(BINLOG_VERSION), *fd_ev_p= &fd_ev;
2569  if (!fd_ev.is_valid())
2570  DBUG_RETURN(ERROR);
2571 
2572  File file;
2573  IO_CACHE log;
2574 
2575  const char *errmsg= NULL;
2576  if ((file= open_binlog_file(&log, filename, &errmsg)) < 0)
2577  {
2578  sql_print_error("%s", errmsg);
2579  /*
2580  We need to revisit the recovery procedure for relay log
2581  files. Currently, it is called after this routine.
2582  /Alfranio
2583  */
2584  DBUG_RETURN(TRUNCATED);
2585  }
2586 
2587  /*
2588  Seek for Previous_gtids_log_event and Gtid_log_event events to
2589  gather information what has been processed so far.
2590  */
2591  my_b_seek(&log, BIN_LOG_HEADER_SIZE);
2592  Log_event *ev= NULL;
2593  enum_read_gtids_from_binlog_status ret= NO_GTIDS;
2594  bool done= false;
2595  while (!done &&
2596  (ev= Log_event::read_log_event(&log, 0, fd_ev_p, verify_checksum)) !=
2597  NULL)
2598  {
2599  DBUG_PRINT("info", ("Read event of type %s", ev->get_type_str()));
2600  switch (ev->get_type_code())
2601  {
2602  case FORMAT_DESCRIPTION_EVENT:
2603  if (fd_ev_p != &fd_ev)
2604  delete fd_ev_p;
2605  fd_ev_p= (Format_description_log_event *)ev;
2606  break;
2607  case ROTATE_EVENT:
2608  // do nothing; just accept this event and go to next
2609  break;
2610  case PREVIOUS_GTIDS_LOG_EVENT:
2611  {
2612  if (gtid_mode == 0)
2613  {
2614  my_error(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF, MYF(0));
2615  ret= ERROR;
2616  }
2617  ret= GOT_PREVIOUS_GTIDS;
2618  // add events to sets
2619  Previous_gtids_log_event *prev_gtids_ev=
2621  if (all_gtids != NULL && prev_gtids_ev->add_to_set(all_gtids) != 0)
2622  ret= ERROR, done= true;
2623  else if (prev_gtids != NULL && prev_gtids_ev->add_to_set(prev_gtids) != 0)
2624  ret= ERROR, done= true;
2625 #ifndef DBUG_OFF
2626  char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL);
2627  DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s'.",
2628  filename, prev_buffer));
2629  my_free(prev_buffer);
2630 #endif
2631  break;
2632  }
2633  case GTID_LOG_EVENT:
2634  {
2635  DBUG_EXECUTE_IF("inject_fault_bug16502579", {
2636  DBUG_PRINT("debug", ("GTID_LOG_EVENT found. Injected ret=NO_GTIDS."));
2637  ret=NO_GTIDS;
2638  });
2639  if (ret != GOT_GTIDS)
2640  {
2641  if (ret != GOT_PREVIOUS_GTIDS)
2642  {
2643  /*
2644  Since this routine is run on startup, there may not be a
2645  THD instance. Therefore, ER(X) cannot be used.
2646  */
2647  const char* msg_fmt= (current_thd != NULL) ?
2648  ER(ER_BINLOG_LOGICAL_CORRUPTION) :
2649  ER_DEFAULT(ER_BINLOG_LOGICAL_CORRUPTION);
2650  my_printf_error(ER_BINLOG_LOGICAL_CORRUPTION,
2651  msg_fmt, MYF(0),
2652  filename,
2653  "The first global transaction identifier was read, but "
2654  "no other information regarding identifiers existing "
2655  "on the previous log files was found.");
2656  ret= ERROR, done= true;
2657  break;
2658  }
2659  else
2660  ret= GOT_GTIDS;
2661  }
2662  /*
2663  When all_gtids==NULL, we just check if the binary log contains
2664  at least one Gtid_log_event, so that we can distinguish the
2665  return values GOT_GTID and GOT_PREVIOUS_GTIDS. We don't need
2666  to read anything else from the binary log.
2667  */
2668  if (all_gtids == NULL)
2669  ret= GOT_GTIDS, done= true;
2670  else
2671  {
2672  Gtid_log_event *gtid_ev= (Gtid_log_event *)ev;
2673  rpl_sidno sidno= gtid_ev->get_sidno(false/*false=don't need lock*/);
2674  if (sidno < 0)
2675  ret= ERROR, done= true;
2676  {
2677  if (all_gtids->ensure_sidno(sidno) != RETURN_STATUS_OK)
2678  ret= ERROR, done= true;
2679  else if (all_gtids->_add_gtid(sidno, gtid_ev->get_gno()) !=
2680  RETURN_STATUS_OK)
2681  ret= ERROR, done= true;
2682  }
2683  DBUG_PRINT("info", ("Got Gtid from file '%s': Gtid(%d, %lld).",
2684  filename, sidno, gtid_ev->get_gno()));
2685  }
2686  break;
2687  }
2688  case ANONYMOUS_GTID_LOG_EVENT:
2689  default:
2690  // if we found any other event type without finding a
2691  // previous_gtids_log_event, then the rest of this binlog
2692  // cannot contain gtids
2693  if (ret != GOT_GTIDS && ret != GOT_PREVIOUS_GTIDS)
2694  done= true;
2695  break;
2696  }
2697  if (ev != fd_ev_p)
2698  delete ev;
2699  DBUG_PRINT("info", ("done=%d", done));
2700  }
2701 
2702  if (log.error < 0)
2703  {
2704  // This is not a fatal error; the log may just be truncated.
2705 
2706  // @todo but what other errors could happen? IO error?
2707  sql_print_warning("Error reading GTIDs from binary log: %d", log.error);
2708  }
2709 
2710  if (fd_ev_p != &fd_ev)
2711  {
2712  delete fd_ev_p;
2713  fd_ev_p= &fd_ev;
2714  }
2715 
2716  mysql_file_close(file, MYF(MY_WME));
2717  end_io_cache(&log);
2718 
2719  DBUG_PRINT("info", ("returning %d", ret));
2720  DBUG_RETURN(ret);
2721 }
2722 
2724  const Gtid_set *gtid_set,
2725  const char **errmsg)
2726 {
2727  DBUG_ENTER("MYSQL_BIN_LOG::gtid_read_start_binlog");
2728  /*
2729  Gather the set of files to be accessed.
2730  */
2731  list<string> filename_list;
2732  LOG_INFO linfo;
2733  int error;
2734 
2735  list<string>::reverse_iterator rit;
2736  Gtid_set previous_gtid_set(gtid_set->get_sid_map());
2737 
2738  mysql_mutex_lock(&LOCK_index);
2739  for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/);
2740  !error; error= find_next_log(&linfo, false/*need_lock_index=false*/))
2741  {
2742  DBUG_PRINT("info", ("read log filename '%s'", linfo.log_file_name));
2743  filename_list.push_back(string(linfo.log_file_name));
2744  }
2745  mysql_mutex_unlock(&LOCK_index);
2746  if (error != LOG_INFO_EOF)
2747  {
2748  *errmsg= "Failed to read the binary log index file while "
2749  "looking for the oldest binary log that contains any GTID "
2750  "that is not in the given gtid set";
2751  error= -1;
2752  goto end;
2753  }
2754 
2755  if (filename_list.empty())
2756  {
2757  *errmsg= "Could not find first log file name in binary log index file "
2758  "while looking for the oldest binary log that contains any GTID "
2759  "that is not in the given gtid set";
2760  error= -2;
2761  goto end;
2762  }
2763 
2764  /*
2765  Iterate over all the binary logs in reverse order, and read only
2766  the Previous_gtids_log_event, to find the first one, that is the
2767  subset of the given gtid set. Since every binary log begins with
2768  a Previous_gtids_log_event, that contains all GTIDs in all
2769  previous binary logs.
2770  */
2771  DBUG_PRINT("info", ("Iterating backwards through binary logs, and reading "
2772  "only the Previous_gtids_log_event, to find the first "
2773  "one, that is the subset of the given gtid set."));
2774  rit= filename_list.rbegin();
2775  error= 0;
2776  while (rit != filename_list.rend())
2777  {
2778  const char *filename= rit->c_str();
2779  DBUG_PRINT("info", ("Read Previous_gtids_log_event from filename='%s'",
2780  filename));
2781  switch (read_gtids_from_binlog(filename, NULL, &previous_gtid_set,
2782  opt_master_verify_checksum))
2783  {
2784  case ERROR:
2785  *errmsg= "Error reading header of binary log while looking for "
2786  "the oldest binary log that contains any GTID that is not in "
2787  "the given gtid set";
2788  error= -3;
2789  goto end;
2790  case NO_GTIDS:
2791  *errmsg= "Found old binary log without GTIDs while looking for "
2792  "the oldest binary log that contains any GTID that is not in "
2793  "the given gtid set";
2794  error= -4;
2795  goto end;
2796  case GOT_GTIDS:
2797  case GOT_PREVIOUS_GTIDS:
2798  if (previous_gtid_set.is_subset(gtid_set))
2799  {
2800  strcpy(binlog_file_name, filename);
2801  /*
2802  Verify that the selected binlog is not the first binlog,
2803  */
2804  DBUG_EXECUTE_IF("slave_reconnect_with_gtid_set_executed",
2805  DBUG_ASSERT(strcmp(filename_list.begin()->c_str(),
2806  binlog_file_name) != 0););
2807  goto end;
2808  }
2809  case TRUNCATED:
2810  break;
2811  }
2812  previous_gtid_set.clear();
2813 
2814  rit++;
2815  }
2816 
2817  if (rit == filename_list.rend())
2818  {
2819  *errmsg= ER(ER_MASTER_HAS_PURGED_REQUIRED_GTIDS);
2820  error= -5;
2821  }
2822 
2823 end:
2824  if (error)
2825  DBUG_PRINT("error", ("'%s'", *errmsg));
2826  filename_list.clear();
2827  DBUG_PRINT("info", ("returning %d", error));
2828  DBUG_RETURN(error != 0 ? true : false);
2829 }
2830 
2831 bool MYSQL_BIN_LOG::init_gtid_sets(Gtid_set *all_gtids, Gtid_set *lost_gtids,
2832  bool verify_checksum, bool need_lock)
2833 {
2834  DBUG_ENTER("MYSQL_BIN_LOG::init_gtid_sets");
2835  DBUG_PRINT("info", ("lost_gtids=%p; so we are recovering a %s log",
2836  lost_gtids, lost_gtids == NULL ? "relay" : "binary"));
2837 
2838  /*
2839  Acquires the necessary locks to ensure that logs are not either
2840  removed or updated when we are reading from it.
2841  */
2842  if (need_lock)
2843  {
2844  // We don't need LOCK_log if we are only going to read the initial
2845  // Prevoius_gtids_log_event and ignore the Gtid_log_events.
2846  if (all_gtids != NULL)
2847  mysql_mutex_lock(&LOCK_log);
2848  mysql_mutex_lock(&LOCK_index);
2849  global_sid_lock->wrlock();
2850  }
2851  else
2852  {
2853  if (all_gtids != NULL)
2854  mysql_mutex_assert_owner(&LOCK_log);
2855  mysql_mutex_assert_owner(&LOCK_index);
2856  global_sid_lock->assert_some_wrlock();
2857  }
2858 
2859  // Gather the set of files to be accessed.
2860  list<string> filename_list;
2861  LOG_INFO linfo;
2862  int error;
2863 
2864  list<string>::iterator it;
2865  list<string>::reverse_iterator rit;
2866  bool reached_first_file= false;
2867 
2868  for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/); !error;
2869  error= find_next_log(&linfo, false/*need_lock_index=false*/))
2870  {
2871  DBUG_PRINT("info", ("read log filename '%s'", linfo.log_file_name));
2872  filename_list.push_back(string(linfo.log_file_name));
2873  }
2874  if (error != LOG_INFO_EOF)
2875  {
2876  DBUG_PRINT("error", ("Error reading binlog index"));
2877  goto end;
2878  }
2879  error= 0;
2880 
2881  if (all_gtids != NULL)
2882  {
2883  DBUG_PRINT("info", ("Iterating backwards through binary logs, looking for the last binary log that contains a Previous_gtids_log_event."));
2884  // Iterate over all files in reverse order until we find one that
2885  // contains a Previous_gtids_log_event.
2886  rit= filename_list.rbegin();
2887  bool got_gtids= false;
2888  reached_first_file= (rit == filename_list.rend());
2889  DBUG_PRINT("info", ("filename='%s' reached_first_file=%d",
2890  rit->c_str(), reached_first_file));
2891  while (!got_gtids && !reached_first_file)
2892  {
2893  const char *filename= rit->c_str();
2894  rit++;
2895  reached_first_file= (rit == filename_list.rend());
2896  DBUG_PRINT("info", ("filename='%s' got_gtids=%d reached_first_file=%d",
2897  filename, got_gtids, reached_first_file));
2898  switch (read_gtids_from_binlog(filename, all_gtids,
2899  reached_first_file ? lost_gtids : NULL,
2900  verify_checksum))
2901  {
2902  case ERROR:
2903  error= 1;
2904  goto end;
2905  case GOT_GTIDS:
2906  case GOT_PREVIOUS_GTIDS:
2907  got_gtids= true;
2908  /*FALLTHROUGH*/
2909  case NO_GTIDS:
2910  case TRUNCATED:
2911  break;
2912  }
2913  }
2914  }
2915  if (lost_gtids != NULL && !reached_first_file)
2916  {
2917  DBUG_PRINT("info", ("Iterating forwards through binary logs, looking for the first binary log that contains a Previous_gtids_log_event."));
2918  for (it= filename_list.begin(); it != filename_list.end(); it++)
2919  {
2920  const char *filename= it->c_str();
2921  DBUG_PRINT("info", ("filename='%s'", filename));
2922  switch (read_gtids_from_binlog(filename, NULL, lost_gtids,
2923  verify_checksum))
2924  {
2925  case ERROR:
2926  error= 1;
2927  /*FALLTHROUGH*/
2928  case GOT_GTIDS:
2929  goto end;
2930  case GOT_PREVIOUS_GTIDS:
2931  case NO_GTIDS:
2932  case TRUNCATED:
2933  break;
2934  }
2935  }
2936  }
2937 end:
2938  if (all_gtids)
2939  all_gtids->dbug_print("all_gtids");
2940  if (lost_gtids)
2941  lost_gtids->dbug_print("lost_gtids");
2942  if (need_lock)
2943  {
2944  global_sid_lock->unlock();
2945  mysql_mutex_unlock(&LOCK_index);
2946  if (all_gtids != NULL)
2947  mysql_mutex_unlock(&LOCK_log);
2948  }
2949  filename_list.clear();
2950  DBUG_PRINT("info", ("returning %d", error));
2951  DBUG_RETURN(error != 0 ? true : false);
2952 }
2953 
2954 
2969 bool MYSQL_BIN_LOG::open_binlog(const char *log_name,
2970  const char *new_name,
2971  enum cache_type io_cache_type_arg,
2972  ulong max_size_arg,
2973  bool null_created_arg,
2974  bool need_lock_index,
2975  bool need_sid_lock,
2976  Format_description_log_event *extra_description_event)
2977 {
2978  File file= -1;
2979 
2980  // lock_index must be acquired *before* sid_lock.
2981  DBUG_ASSERT(need_sid_lock || !need_lock_index);
2982  DBUG_ENTER("MYSQL_BIN_LOG::open_binlog(const char *, ...)");
2983  DBUG_PRINT("enter",("name: %s", log_name));
2984 
2985  if (init_and_set_log_file_name(log_name, new_name, LOG_BIN,
2986  io_cache_type_arg))
2987  {
2988  sql_print_error("MYSQL_BIN_LOG::open failed to generate new file name.");
2989  DBUG_RETURN(1);
2990  }
2991 
2992 #ifdef HAVE_REPLICATION
2993  if (open_purge_index_file(TRUE) ||
2994  register_create_index_entry(log_file_name) ||
2995  sync_purge_index_file() ||
2996  DBUG_EVALUATE_IF("fault_injection_registering_index", 1, 0))
2997  {
3007  DBUG_EXECUTE_IF("fault_injection_registering_index", {
3008  if (my_b_inited(&purge_index_file))
3009  {
3010  end_io_cache(&purge_index_file);
3011  my_close(purge_index_file.file, MYF(0));
3012  }
3013  });
3014 
3015  sql_print_error("MYSQL_BIN_LOG::open failed to sync the index file.");
3016  DBUG_RETURN(1);
3017  }
3018  DBUG_EXECUTE_IF("crash_create_non_critical_before_update_index", DBUG_SUICIDE(););
3019 #endif
3020 
3021  write_error= 0;
3022 
3023  /* open the main log file */
3024  if (MYSQL_LOG::open(
3025 #ifdef HAVE_PSI_INTERFACE
3026  m_key_file_log,
3027 #endif
3028  log_name, LOG_BIN, new_name, io_cache_type_arg))
3029  {
3030 #ifdef HAVE_REPLICATION
3031  close_purge_index_file();
3032 #endif
3033  DBUG_RETURN(1); /* all warnings issued */
3034  }
3035 
3036  max_size= max_size_arg;
3037 
3038  open_count++;
3039 
3040  bool write_file_name_to_index_file=0;
3041 
3042  /* This must be before goto err. */
3043  Format_description_log_event s(BINLOG_VERSION);
3044 
3045  if (!my_b_filelength(&log_file))
3046  {
3047  /*
3048  The binary log file was empty (probably newly created)
3049  This is the normal case and happens when the user doesn't specify
3050  an extension for the binary log files.
3051  In this case we write a standard header to it.
3052  */
3053  if (my_b_safe_write(&log_file, (uchar*) BINLOG_MAGIC,
3054  BIN_LOG_HEADER_SIZE))
3055  goto err;
3056  bytes_written+= BIN_LOG_HEADER_SIZE;
3057  write_file_name_to_index_file= 1;
3058  }
3059 
3060  /*
3061  don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache
3062  as we won't be able to reset it later
3063  */
3064  if (io_cache_type == WRITE_CACHE)
3065  s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
3066  s.checksum_alg= is_relay_log ?
3067  /* relay-log */
3068  /* inherit master's A descriptor if one has been received */
3069  (relay_log_checksum_alg=
3070  (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
3071  relay_log_checksum_alg :
3072  /* otherwise use slave's local preference of RL events verification */
3073  (opt_slave_sql_verify_checksum == 0) ?
3074  (uint8) BINLOG_CHECKSUM_ALG_OFF : binlog_checksum_options):
3075  /* binlog */
3076  binlog_checksum_options;
3077  DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
3078  if (!s.is_valid())
3079  goto err;
3080  s.dont_set_created= null_created_arg;
3081  /* Set LOG_EVENT_RELAY_LOG_F flag for relay log's FD */
3082  if (is_relay_log)
3083  s.set_relay_log_event();
3084  if (s.write(&log_file))
3085  goto err;
3086  bytes_written+= s.data_written;
3087  /*
3088  We need to revisit this code and improve it.
3089  See further comments in the mysqld.
3090  /Alfranio
3091  */
3092  if (current_thd && gtid_mode > 0)
3093  {
3094  if (need_sid_lock)
3095  global_sid_lock->wrlock();
3096  else
3097  global_sid_lock->assert_some_wrlock();
3098  Previous_gtids_log_event prev_gtids_ev(previous_gtid_set);
3099  if (need_sid_lock)
3100  global_sid_lock->unlock();
3101  prev_gtids_ev.checksum_alg= s.checksum_alg;
3102  if (prev_gtids_ev.write(&log_file))
3103  goto err;
3104  bytes_written+= prev_gtids_ev.data_written;
3105  }
3106  if (extra_description_event &&
3107  extra_description_event->binlog_version>=4)
3108  {
3109  /*
3110  This is a relay log written to by the I/O slave thread.
3111  Write the event so that others can later know the format of this relay
3112  log.
3113  Note that this event is very close to the original event from the
3114  master (it has binlog version of the master, event types of the
3115  master), so this is suitable to parse the next relay log's event. It
3116  has been produced by
3117  Format_description_log_event::Format_description_log_event(char* buf,).
3118  Why don't we want to write the mi_description_event if this
3119  event is for format<4 (3.23 or 4.x): this is because in that case, the
3120  mi_description_event describes the data received from the
3121  master, but not the data written to the relay log (*conversion*),
3122  which is in format 4 (slave's).
3123  */
3124  /*
3125  Set 'created' to 0, so that in next relay logs this event does not
3126  trigger cleaning actions on the slave in
3127  Format_description_log_event::apply_event_impl().
3128  */
3129  extra_description_event->created= 0;
3130  /* Don't set log_pos in event header */
3131  extra_description_event->set_artificial_event();
3132 
3133  if (extra_description_event->write(&log_file))
3134  goto err;
3135  bytes_written+= extra_description_event->data_written;
3136  }
3137  if (flush_io_cache(&log_file) ||
3138  mysql_file_sync(log_file.file, MYF(MY_WME)))
3139  goto err;
3140 
3141  if (write_file_name_to_index_file)
3142  {
3143 #ifdef HAVE_REPLICATION
3144  DBUG_EXECUTE_IF("crash_create_critical_before_update_index", DBUG_SUICIDE(););
3145 #endif
3146 
3147  DBUG_ASSERT(my_b_inited(&index_file) != 0);
3148 
3149  /*
3150  The new log file name is appended into crash safe index file after
3151  all the content of index file is copyed into the crash safe index
3152  file. Then move the crash safe index file to index file.
3153  */
3154  if (DBUG_EVALUATE_IF("fault_injection_updating_index", 1, 0) ||
3155  add_log_to_index((uchar*) log_file_name, strlen(log_file_name),
3156  need_lock_index))
3157  goto err;
3158 
3159 #ifdef HAVE_REPLICATION
3160  DBUG_EXECUTE_IF("crash_create_after_update_index", DBUG_SUICIDE(););
3161 #endif
3162  }
3163 
3164  log_state= LOG_OPENED;
3165 
3166 #ifdef HAVE_REPLICATION
3167  close_purge_index_file();
3168 #endif
3169 
3170  DBUG_RETURN(0);
3171 
3172 err:
3173 #ifdef HAVE_REPLICATION
3174  if (is_inited_purge_index_file())
3175  purge_index_entry(NULL, NULL, need_lock_index);
3176  close_purge_index_file();
3177 #endif
3178  sql_print_error("Could not use %s for logging (error %d). \
3179 Turning logging off for the whole duration of the MySQL server process. \
3180 To turn it on again: fix the cause, \
3181 shutdown the MySQL server and restart it.", name, errno);
3182  if (file >= 0)
3183  mysql_file_close(file, MYF(0));
3184  end_io_cache(&log_file);
3185  end_io_cache(&index_file);
3186  my_free(name);
3187  name= NULL;
3188  log_state= LOG_CLOSED;
3189  DBUG_RETURN(1);
3190 }
3191 
3192 
3203 {
3204  int error= 0;
3205  File fd= -1;
3206  DBUG_ENTER("MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file");
3207 
3208  if (need_lock_index)
3209  mysql_mutex_lock(&LOCK_index);
3210  else
3211  mysql_mutex_assert_owner(&LOCK_index);
3212 
3213  if (my_b_inited(&index_file))
3214  {
3215  end_io_cache(&index_file);
3216  if (mysql_file_close(index_file.file, MYF(0)) < 0)
3217  {
3218  error= -1;
3219  sql_print_error("MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3220  "failed to close the index file.");
3221  goto err;
3222  }
3223  mysql_file_delete(key_file_binlog_index, index_file_name, MYF(MY_WME));
3224  }
3225 
3226  DBUG_EXECUTE_IF("crash_create_before_rename_index_file", DBUG_SUICIDE(););
3227  if (my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
3228  {
3229  error= -1;
3230  sql_print_error("MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3231  "failed to move crash_safe_index_file to index file.");
3232  goto err;
3233  }
3234  DBUG_EXECUTE_IF("crash_create_after_rename_index_file", DBUG_SUICIDE(););
3235 
3236  if ((fd= mysql_file_open(key_file_binlog_index,
3237  index_file_name,
3238  O_RDWR | O_CREAT | O_BINARY,
3239  MYF(MY_WME))) < 0 ||
3240  mysql_file_sync(fd, MYF(MY_WME)) ||
3241  init_io_cache(&index_file, fd, IO_SIZE, READ_CACHE,
3242  mysql_file_seek(fd, 0L, MY_SEEK_END, MYF(0)),
3243  0, MYF(MY_WME | MY_WAIT_IF_FULL)))
3244  {
3245  error= -1;
3246  sql_print_error("MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3247  "failed to open the index file.");
3248  goto err;
3249  }
3250 
3251 err:
3252  if (need_lock_index)
3253  mysql_mutex_unlock(&LOCK_index);
3254  DBUG_RETURN(error);
3255 }
3256 
3257 
3272  int log_name_len, bool need_lock_index)
3273 {
3274  DBUG_ENTER("MYSQL_BIN_LOG::add_log_to_index");
3275 
3277  {
3278  sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
3279  "open the crash safe index file.");
3280  goto err;
3281  }
3282 
3283  if (copy_file(&index_file, &crash_safe_index_file, 0))
3284  {
3285  sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
3286  "copy index file to crash safe index file.");
3287  goto err;
3288  }
3289 
3290  if (my_b_write(&crash_safe_index_file, log_name, log_name_len) ||
3291  my_b_write(&crash_safe_index_file, (uchar*) "\n", 1) ||
3292  flush_io_cache(&crash_safe_index_file) ||
3293  mysql_file_sync(crash_safe_index_file.file, MYF(MY_WME)))
3294  {
3295  sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
3296  "append log file name: %s, to crash "
3297  "safe index file.", log_name);
3298  goto err;
3299  }
3300 
3302  {
3303  sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
3304  "close the crash safe index file.");
3305  goto err;
3306  }
3307 
3308  if (move_crash_safe_index_file_to_index_file(need_lock_index))
3309  {
3310  sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
3311  "move crash safe index file to index file.");
3312  goto err;
3313  }
3314 
3315  DBUG_RETURN(0);
3316 
3317 err:
3318  DBUG_RETURN(-1);
3319 }
3320 
3321 int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
3322 {
3323  mysql_mutex_lock(&LOCK_log);
3324  int ret = raw_get_current_log(linfo);
3325  mysql_mutex_unlock(&LOCK_log);
3326  return ret;
3327 }
3328 
3329 int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
3330 {
3331  strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
3332  linfo->pos = my_b_tell(&log_file);
3333  return 0;
3334 }
3335 
3336 bool MYSQL_BIN_LOG::check_write_error(THD *thd)
3337 {
3338  DBUG_ENTER("MYSQL_BIN_LOG::check_write_error");
3339 
3340  bool checked= FALSE;
3341 
3342  if (!thd->is_error())
3343  DBUG_RETURN(checked);
3344 
3345  switch (thd->get_stmt_da()->sql_errno())
3346  {
3347  case ER_TRANS_CACHE_FULL:
3348  case ER_STMT_CACHE_FULL:
3349  case ER_ERROR_ON_WRITE:
3350  case ER_BINLOG_LOGGING_IMPOSSIBLE:
3351  checked= TRUE;
3352  break;
3353  }
3354  DBUG_PRINT("return", ("checked: %s", YESNO(checked)));
3355  DBUG_RETURN(checked);
3356 }
3357 
3358 void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
3359 {
3360  DBUG_ENTER("MYSQL_BIN_LOG::set_write_error");
3361 
3362  write_error= 1;
3363 
3364  if (check_write_error(thd))
3365  DBUG_VOID_RETURN;
3366 
3367  if (my_errno == EFBIG)
3368  {
3369  if (is_transactional)
3370  {
3371  my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
3372  }
3373  else
3374  {
3375  my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME));
3376  }
3377  }
3378  else
3379  {
3380  char errbuf[MYSYS_STRERROR_SIZE];
3381  my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name,
3382  errno, my_strerror(errbuf, sizeof(errbuf), errno));
3383  }
3384 
3385  DBUG_VOID_RETURN;
3386 }
3387 
3410 int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
3411  bool need_lock_index)
3412 {
3413  int error= 0;
3414  char *full_fname= linfo->log_file_name;
3415  char full_log_name[FN_REFLEN], fname[FN_REFLEN];
3416  uint log_name_len= 0, fname_len= 0;
3417  DBUG_ENTER("find_log_pos");
3418  full_log_name[0]= full_fname[0]= 0;
3419 
3420  /*
3421  Mutex needed because we need to make sure the file pointer does not
3422  move from under our feet
3423  */
3424  if (need_lock_index)
3425  mysql_mutex_lock(&LOCK_index);
3426  else
3427  mysql_mutex_assert_owner(&LOCK_index);
3428 
3429  // extend relative paths for log_name to be searched
3430  if (log_name)
3431  {
3432  if(normalize_binlog_name(full_log_name, log_name, is_relay_log))
3433  {
3434  error= LOG_INFO_EOF;
3435  goto end;
3436  }
3437  }
3438 
3439  log_name_len= log_name ? (uint) strlen(full_log_name) : 0;
3440  DBUG_PRINT("enter", ("log_name: %s, full_log_name: %s",
3441  log_name ? log_name : "NULL", full_log_name));
3442 
3443  /* As the file is flushed, we can't get an error here */
3444  my_b_seek(&index_file, (my_off_t) 0);
3445 
3446  for (;;)
3447  {
3448  uint length;
3449  my_off_t offset= my_b_tell(&index_file);
3450 
3451  DBUG_EXECUTE_IF("simulate_find_log_pos_error",
3452  error= LOG_INFO_EOF; break;);
3453  /* If we get 0 or 1 characters, this is the end of the file */
3454  if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
3455  {
3456  /* Did not find the given entry; Return not found or error */
3457  error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
3458  break;
3459  }
3460 
3461  // extend relative paths and match against full path
3462  if (normalize_binlog_name(full_fname, fname, is_relay_log))
3463  {
3464  error= LOG_INFO_EOF;
3465  break;
3466  }
3467  fname_len= (uint) strlen(full_fname);
3468 
3469  // if the log entry matches, null string matching anything
3470  if (!log_name ||
3471  (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' &&
3472  !memcmp(full_fname, full_log_name, log_name_len)))
3473  {
3474  DBUG_PRINT("info", ("Found log file entry"));
3475  full_fname[fname_len-1]= 0; // remove last \n
3476  linfo->index_file_start_offset= offset;
3477  linfo->index_file_offset = my_b_tell(&index_file);
3478  break;
3479  }
3480  linfo->entry_index++;
3481  }
3482 
3483 end:
3484  if (need_lock_index)
3485  mysql_mutex_unlock(&LOCK_index);
3486  DBUG_RETURN(error);
3487 }
3488 
3489 
3509 int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock_index)
3510 {
3511  int error= 0;
3512  uint length;
3513  char fname[FN_REFLEN];
3514  char *full_fname= linfo->log_file_name;
3515 
3516  if (need_lock_index)
3517  mysql_mutex_lock(&LOCK_index);
3518  else
3519  mysql_mutex_assert_owner(&LOCK_index);
3520 
3521  /* As the file is flushed, we can't get an error here */
3522  my_b_seek(&index_file, linfo->index_file_offset);
3523 
3524  linfo->index_file_start_offset= linfo->index_file_offset;
3525  if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
3526  {
3527  error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
3528  goto err;
3529  }
3530 
3531  if (fname[0] != 0)
3532  {
3533  if(normalize_binlog_name(full_fname, fname, is_relay_log))
3534  {
3535  error= LOG_INFO_EOF;
3536  goto err;
3537  }
3538  length= strlen(full_fname);
3539  }
3540 
3541  full_fname[length-1]= 0; // kill \n
3542  linfo->index_file_offset= my_b_tell(&index_file);
3543 
3544 err:
3545  if (need_lock_index)
3546  mysql_mutex_unlock(&LOCK_index);
3547  return error;
3548 }
3549 
3550 
3569 {
3570  LOG_INFO linfo;
3571  bool error=0;
3572  int err;
3573  const char* save_name;
3574  DBUG_ENTER("reset_logs");
3575 
3576  /*
3577  Flush logs for storage engines, so that the last transaction
3578  is fsynced inside storage engines.
3579  */
3580  if (ha_flush_logs(NULL))
3581  DBUG_RETURN(1);
3582 
3583  ha_reset_logs(thd);
3584 
3585  /*
3586  The following mutex is needed to ensure that no threads call
3587  'delete thd' as we would then risk missing a 'rollback' from this
3588  thread. If the transaction involved MyISAM tables, it should go
3589  into binlog even on rollback.
3590  */
3591  mysql_mutex_lock(&LOCK_thread_count);
3592 
3593  /*
3594  We need to get both locks to be sure that no one is trying to
3595  write to the index log file.
3596  */
3597  mysql_mutex_lock(&LOCK_log);
3598  mysql_mutex_lock(&LOCK_index);
3599 
3600  global_sid_lock->wrlock();
3601 
3602  /* Save variables so that we can reopen the log */
3603  save_name=name;
3604  name=0; // Protect against free
3605  close(LOG_CLOSE_TO_BE_OPENED);
3606 
3607  /*
3608  First delete all old log files and then update the index file.
3609  As we first delete the log files and do not use sort of logging,
3610  a crash may lead to an inconsistent state where the index has
3611  references to non-existent files.
3612 
3613  We need to invert the steps and use the purge_index_file methods
3614  in order to make the operation safe.
3615  */
3616 
3617  if ((err= find_log_pos(&linfo, NullS, false/*need_lock_index=false*/)) != 0)
3618  {
3619  uint errcode= purge_log_get_error_code(err);
3620  sql_print_error("Failed to locate old binlog or relay log files");
3621  my_message(errcode, ER(errcode), MYF(0));
3622  error= 1;
3623  goto err;
3624  }
3625 
3626  for (;;)
3627  {
3628  if ((error= my_delete_allow_opened(linfo.log_file_name, MYF(0))) != 0)
3629  {
3630  if (my_errno == ENOENT)
3631  {
3632  push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3633  ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
3634  linfo.log_file_name);
3635  sql_print_information("Failed to delete file '%s'",
3636  linfo.log_file_name);
3637  my_errno= 0;
3638  error= 0;
3639  }
3640  else
3641  {
3642  push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3643  ER_BINLOG_PURGE_FATAL_ERR,
3644  "a problem with deleting %s; "
3645  "consider examining correspondence "
3646  "of your binlog index file "
3647  "to the actual binlog files",
3648  linfo.log_file_name);
3649  error= 1;
3650  goto err;
3651  }
3652  }
3653  if (find_next_log(&linfo, false/*need_lock_index=false*/))
3654  break;
3655  }
3656 
3657  /* Start logging with a new file */
3658  close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
3659  if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
3660  {
3661  if (my_errno == ENOENT)
3662  {
3663  push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3664  ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
3665  index_file_name);
3666  sql_print_information("Failed to delete file '%s'",
3667  index_file_name);
3668  my_errno= 0;
3669  error= 0;
3670  }
3671  else
3672  {
3673  push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3674  ER_BINLOG_PURGE_FATAL_ERR,
3675  "a problem with deleting %s; "
3676  "consider examining correspondence "
3677  "of your binlog index file "
3678  "to the actual binlog files",
3679  index_file_name);
3680  error= 1;
3681  goto err;
3682  }
3683  }
3684 
3685 #ifdef HAVE_REPLICATION
3686  if (is_relay_log)
3687  {
3688  DBUG_ASSERT(active_mi != NULL);
3689  DBUG_ASSERT(active_mi->rli != NULL);
3690  (const_cast<Gtid_set *>(active_mi->rli->get_gtid_set()))->clear();
3691  }
3692  else
3693  {
3694  gtid_state->clear();
3695  // don't clear global_sid_map because it's used by the relay log too
3696  if (gtid_state->init() != 0)
3697  goto err;
3698  }
3699 #endif
3700 
3701  if (!open_index_file(index_file_name, 0, false/*need_lock_index=false*/))
3702  if ((error= open_binlog(save_name, 0, io_cache_type,
3703  max_size, false,
3704  false/*need_lock_index=false*/,
3705  false/*need_sid_lock=false*/,
3706  NULL)))
3707  goto err;
3708  my_free((void *) save_name);
3709 
3710 err:
3711  if (error == 1)
3712  name= const_cast<char*>(save_name);
3713  global_sid_lock->unlock();
3714  mysql_mutex_unlock(&LOCK_thread_count);
3715  mysql_mutex_unlock(&LOCK_index);
3716  mysql_mutex_unlock(&LOCK_log);
3717  DBUG_RETURN(error);
3718 }
3719 
3720 
3729 int MYSQL_BIN_LOG::set_crash_safe_index_file_name(const char *base_file_name)
3730 {
3731  int error= 0;
3732  DBUG_ENTER("MYSQL_BIN_LOG::set_crash_safe_index_file_name");
3733  if (fn_format(crash_safe_index_file_name, base_file_name, mysql_data_home,
3734  ".index_crash_safe", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
3735  MY_REPLACE_EXT)) == NULL)
3736  {
3737  error= 1;
3738  sql_print_error("MYSQL_BIN_LOG::set_crash_safe_index_file_name failed "
3739  "to set file name.");
3740  }
3741  DBUG_RETURN(error);
3742 }
3743 
3744 
3757 {
3758  int error= 0;
3759  File file= -1;
3760 
3761  DBUG_ENTER("MYSQL_BIN_LOG::open_crash_safe_index_file");
3762 
3763  if (!my_b_inited(&crash_safe_index_file))
3764  {
3765  if ((file= my_open(crash_safe_index_file_name, O_RDWR | O_CREAT | O_BINARY,
3766  MYF(MY_WME | ME_WAITTANG))) < 0 ||
3767  init_io_cache(&crash_safe_index_file, file, IO_SIZE, WRITE_CACHE,
3768  0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
3769  {
3770  error= 1;
3771  sql_print_error("MYSQL_BIN_LOG::open_crash_safe_index_file failed "
3772  "to open temporary index file.");
3773  }
3774  }
3775  DBUG_RETURN(error);
3776 }
3777 
3778 
3791 {
3792  int error= 0;
3793 
3794  DBUG_ENTER("MYSQL_BIN_LOG::close_crash_safe_index_file");
3795 
3796  if (my_b_inited(&crash_safe_index_file))
3797  {
3798  end_io_cache(&crash_safe_index_file);
3799  error= my_close(crash_safe_index_file.file, MYF(0));
3800  }
3801  memset(&crash_safe_index_file, 0, sizeof(crash_safe_index_file));
3802 
3803  DBUG_RETURN(error);
3804 }
3805 
3806 
3848 #ifdef HAVE_REPLICATION
3849 
3850 int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
3851 {
3852  int error;
3853  char *to_purge_if_included= NULL;
3854  DBUG_ENTER("purge_first_log");
3855 
3856  DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
3857  DBUG_ASSERT(is_relay_log);
3858  DBUG_ASSERT(is_open());
3859  DBUG_ASSERT(rli->slave_running == 1);
3860  DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->get_event_relay_log_name()));
3861 
3862  mysql_mutex_assert_owner(&rli->data_lock);
3863 
3864  mysql_mutex_lock(&LOCK_index);
3865  to_purge_if_included= my_strdup(rli->get_group_relay_log_name(), MYF(0));
3866 
3867  /*
3868  Read the next log file name from the index file and pass it back to
3869  the caller.
3870  */
3871  if((error=find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
3872  false/*need_lock_index=false*/)) ||
3873  (error=find_next_log(&rli->linfo, false/*need_lock_index=false*/)))
3874  {
3875  char buff[22];
3876  sql_print_error("next log error: %d offset: %s log: %s included: %d",
3877  error,
3878  llstr(rli->linfo.index_file_offset,buff),
3879  rli->get_event_relay_log_name(),
3880  included);
3881  goto err;
3882  }
3883 
3884  /*
3885  Reset rli's coordinates to the current log.
3886  */
3887  rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
3888  rli->set_event_relay_log_name(rli->linfo.log_file_name);
3889 
3890  /*
3891  If we removed the rli->group_relay_log_name file,
3892  we must update the rli->group* coordinates, otherwise do not touch it as the
3893  group's execution is not finished (e.g. COMMIT not executed)
3894  */
3895  if (included)
3896  {
3897  rli->set_group_relay_log_pos(BIN_LOG_HEADER_SIZE);
3898  rli->set_group_relay_log_name(rli->linfo.log_file_name);
3900  }
3901 
3902  /* Store where we are in the new file for the execution thread */
3903  rli->flush_info(TRUE);
3904 
3905  DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE(););
3906 
3907  mysql_mutex_lock(&rli->log_space_lock);
3908  rli->relay_log.purge_logs(to_purge_if_included, included,
3909  false/*need_lock_index=false*/,
3910  false/*need_update_threads=false*/,
3911  &rli->log_space_total, true);
3912  // Tell the I/O thread to take the relay_log_space_limit into account
3913  rli->ignore_log_space_limit= 0;
3914  mysql_mutex_unlock(&rli->log_space_lock);
3915 
3916  /*
3917  Ok to broadcast after the critical region as there is no risk of
3918  the mutex being destroyed by this thread later - this helps save
3919  context switches
3920  */
3921  mysql_cond_broadcast(&rli->log_space_cond);
3922 
3923  /*
3924  * Need to update the log pos because purge logs has been called
3925  * after fetching initially the log pos at the begining of the method.
3926  */
3927  if((error=find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
3928  false/*need_lock_index=false*/)))
3929  {
3930  char buff[22];
3931  sql_print_error("next log error: %d offset: %s log: %s included: %d",
3932  error,
3933  llstr(rli->linfo.index_file_offset,buff),
3934  rli->get_group_relay_log_name(),
3935  included);
3936  goto err;
3937  }
3938 
3939  /* If included was passed, rli->linfo should be the first entry. */
3940  DBUG_ASSERT(!included || rli->linfo.index_file_start_offset == 0);
3941 
3942 err:
3943  my_free(to_purge_if_included);
3944  mysql_mutex_unlock(&LOCK_index);
3945  DBUG_RETURN(error);
3946 }
3947 
3948 
3970 int MYSQL_BIN_LOG::remove_logs_from_index(LOG_INFO* log_info, bool need_update_threads)
3971 {
3973  {
3974  sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
3975  "open the crash safe index file.");
3976  goto err;
3977  }
3978 
3979  if (copy_file(&index_file, &crash_safe_index_file,
3980  log_info->index_file_start_offset))
3981  {
3982  sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
3983  "copy index file to crash safe index file.");
3984  goto err;
3985  }
3986 
3988  {
3989  sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
3990  "close the crash safe index file.");
3991  goto err;
3992  }
3993  DBUG_EXECUTE_IF("fault_injection_copy_part_file", DBUG_SUICIDE(););
3994 
3995  if (move_crash_safe_index_file_to_index_file(false/*need_lock_index=false*/))
3996  {
3997  sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
3998  "move crash safe index file to index file.");
3999  goto err;
4000  }
4001 
4002  // now update offsets in index file for running threads
4003  if (need_update_threads)
4004  adjust_linfo_offsets(log_info->index_file_start_offset);
4005  return 0;
4006 
4007 err:
4008  return LOG_INFO_IO;
4009 }
4010 
4036 int MYSQL_BIN_LOG::purge_logs(const char *to_log,
4037  bool included,
4038  bool need_lock_index,
4039  bool need_update_threads,
4040  ulonglong *decrease_log_space,
4041  bool auto_purge)
4042 {
4043  int error= 0, no_of_log_files_to_purge= 0, no_of_log_files_purged= 0;
4044  int no_of_threads_locking_log= 0;
4045  bool exit_loop= 0;
4046  LOG_INFO log_info;
4047  THD *thd= current_thd;
4048  DBUG_ENTER("purge_logs");
4049  DBUG_PRINT("info",("to_log= %s",to_log));
4050 
4051  if (need_lock_index)
4052  mysql_mutex_lock(&LOCK_index);
4053  else
4054  mysql_mutex_assert_owner(&LOCK_index);
4055  if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/)))
4056  {
4057  sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not "
4058  "listed in the index.", to_log);
4059  goto err;
4060  }
4061 
4062  no_of_log_files_to_purge= log_info.entry_index;
4063 
4064  if ((error= open_purge_index_file(TRUE)))
4065  {
4066  sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index file.");
4067  goto err;
4068  }
4069 
4070  /*
4071  File name exists in index file; delete until we find this file
4072  or a file that is used.
4073  */
4074  if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/)))
4075  goto err;
4076 
4077  while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)))
4078  {
4079  if(is_active(log_info.log_file_name))
4080  {
4081  if(!auto_purge)
4082  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4083  ER_WARN_PURGE_LOG_IS_ACTIVE,
4084  ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
4085  log_info.log_file_name);
4086  break;
4087  }
4088 
4089  if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
4090  {
4091  if(!auto_purge)
4092  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4093  ER_WARN_PURGE_LOG_IN_USE,
4094  ER(ER_WARN_PURGE_LOG_IN_USE),
4095  log_info.log_file_name, no_of_threads_locking_log,
4096  no_of_log_files_purged, no_of_log_files_to_purge);
4097  break;
4098  }
4099  no_of_log_files_purged++;
4100 
4101  if ((error= register_purge_index_entry(log_info.log_file_name)))
4102  {
4103  sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to register file.",
4104  log_info.log_file_name);
4105  goto err;
4106  }
4107 
4108  if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_loop)
4109  break;
4110  }
4111 
4112  DBUG_EXECUTE_IF("crash_purge_before_update_index", DBUG_SUICIDE(););
4113 
4114  if ((error= sync_purge_index_file()))
4115  {
4116  sql_print_error("MYSQL_BIN_LOG::purge_logs failed to flush register file.");
4117  goto err;
4118  }
4119 
4120  /* We know how many files to delete. Update index file. */
4121  if ((error=remove_logs_from_index(&log_info, need_update_threads)))
4122  {
4123  sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index file");
4124  goto err;
4125  }
4126 
4127  // Update gtid_state->lost_gtids
4128  if (gtid_mode > 0 && !is_relay_log)
4129  {
4130  global_sid_lock->wrlock();
4131  error= init_gtid_sets(NULL,
4132  const_cast<Gtid_set *>(gtid_state->get_lost_gtids()),
4133  opt_master_verify_checksum,
4134  false/*false=don't need lock*/);
4135  global_sid_lock->unlock();
4136  if (error)
4137  goto err;
4138  }
4139 
4140  DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE(););
4141 
4142 err:
4143 
4144  int error_index= 0, close_error_index= 0;
4145  /* Read each entry from purge_index_file and delete the file. */
4146  if (is_inited_purge_index_file() &&
4147  (error_index= purge_index_entry(thd, decrease_log_space, false/*need_lock_index=false*/)))
4148  sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files"
4149  " that would be purged.");
4150 
4151  close_error_index= close_purge_index_file();
4152 
4153  DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICIDE(););
4154 
4155  if (need_lock_index)
4156  mysql_mutex_unlock(&LOCK_index);
4157 
4158  /*
4159  Error codes from purge logs take precedence.
4160  Then error codes from purging the index entry.
4161  Finally, error codes from closing the purge index file.
4162  */
4163  error= error ? error : (error_index ? error_index :
4164  close_error_index);
4165 
4166  DBUG_RETURN(error);
4167 }
4168 
4169 int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name)
4170 {
4171  int error= 0;
4172  DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name");
4173  if (fn_format(purge_index_file_name, base_file_name, mysql_data_home,
4174  ".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
4175  MY_REPLACE_EXT)) == NULL)
4176  {
4177  error= 1;
4178  sql_print_error("MYSQL_BIN_LOG::set_purge_index_file_name failed to set "
4179  "file name.");
4180  }
4181  DBUG_RETURN(error);
4182 }
4183 
4184 int MYSQL_BIN_LOG::open_purge_index_file(bool destroy)
4185 {
4186  int error= 0;
4187  File file= -1;
4188 
4189  DBUG_ENTER("MYSQL_BIN_LOG::open_purge_index_file");
4190 
4191  if (destroy)
4192  close_purge_index_file();
4193 
4194  if (!my_b_inited(&purge_index_file))
4195  {
4196  if ((file= my_open(purge_index_file_name, O_RDWR | O_CREAT | O_BINARY,
4197  MYF(MY_WME | ME_WAITTANG))) < 0 ||
4198  init_io_cache(&purge_index_file, file, IO_SIZE,
4199  (destroy ? WRITE_CACHE : READ_CACHE),
4200  0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
4201  {
4202  error= 1;
4203  sql_print_error("MYSQL_BIN_LOG::open_purge_index_file failed to open register "
4204  " file.");
4205  }
4206  }
4207  DBUG_RETURN(error);
4208 }
4209 
4210 int MYSQL_BIN_LOG::close_purge_index_file()
4211 {
4212  int error= 0;
4213 
4214  DBUG_ENTER("MYSQL_BIN_LOG::close_purge_index_file");
4215 
4216  if (my_b_inited(&purge_index_file))
4217  {
4218  end_io_cache(&purge_index_file);
4219  error= my_close(purge_index_file.file, MYF(0));
4220  }
4221  my_delete(purge_index_file_name, MYF(0));
4222  memset(&purge_index_file, 0, sizeof(purge_index_file));
4223 
4224  DBUG_RETURN(error);
4225 }
4226 
4227 bool MYSQL_BIN_LOG::is_inited_purge_index_file()
4228 {
4229  DBUG_ENTER("MYSQL_BIN_LOG::is_inited_purge_index_file");
4230  DBUG_RETURN (my_b_inited(&purge_index_file));
4231 }
4232 
4233 int MYSQL_BIN_LOG::sync_purge_index_file()
4234 {
4235  int error= 0;
4236  DBUG_ENTER("MYSQL_BIN_LOG::sync_purge_index_file");
4237 
4238  if ((error= flush_io_cache(&purge_index_file)) ||
4239  (error= my_sync(purge_index_file.file, MYF(MY_WME))))
4240  DBUG_RETURN(error);
4241 
4242  DBUG_RETURN(error);
4243 }
4244 
4245 int MYSQL_BIN_LOG::register_purge_index_entry(const char *entry)
4246 {
4247  int error= 0;
4248  DBUG_ENTER("MYSQL_BIN_LOG::register_purge_index_entry");
4249 
4250  if ((error=my_b_write(&purge_index_file, (const uchar*)entry, strlen(entry))) ||
4251  (error=my_b_write(&purge_index_file, (const uchar*)"\n", 1)))
4252  DBUG_RETURN (error);
4253 
4254  DBUG_RETURN(error);
4255 }
4256 
4257 int MYSQL_BIN_LOG::register_create_index_entry(const char *entry)
4258 {
4259  DBUG_ENTER("MYSQL_BIN_LOG::register_create_index_entry");
4260  DBUG_RETURN(register_purge_index_entry(entry));
4261 }
4262 
4263 int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
4264  bool need_lock_index)
4265 {
4266  MY_STAT s;
4267  int error= 0;
4268  LOG_INFO log_info;
4269  LOG_INFO check_log_info;
4270 
4271  DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
4272 
4273  DBUG_ASSERT(my_b_inited(&purge_index_file));
4274 
4275  if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0)))
4276  {
4277  sql_print_error("MYSQL_BIN_LOG::purge_index_entry failed to reinit register file "
4278  "for read");
4279  goto err;
4280  }
4281 
4282  for (;;)
4283  {
4284  uint length;
4285 
4286  if ((length=my_b_gets(&purge_index_file, log_info.log_file_name,
4287  FN_REFLEN)) <= 1)
4288  {
4289  if (purge_index_file.error)
4290  {
4291  error= purge_index_file.error;
4292  sql_print_error("MYSQL_BIN_LOG::purge_index_entry error %d reading from "
4293  "register file.", error);
4294  goto err;
4295  }
4296 
4297  /* Reached EOF */
4298  break;
4299  }
4300 
4301  /* Get rid of the trailing '\n' */
4302  log_info.log_file_name[length-1]= 0;
4303 
4304  if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0)))
4305  {
4306  if (my_errno == ENOENT)
4307  {
4308  /*
4309  It's not fatal if we can't stat a log file that does not exist;
4310  If we could not stat, we won't delete.
4311  */
4312  if (thd)
4313  {
4314  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4315  ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
4316  log_info.log_file_name);
4317  }
4318  sql_print_information("Failed to execute mysql_file_stat on file '%s'",
4319  log_info.log_file_name);
4320  my_errno= 0;
4321  }
4322  else
4323  {
4324  /*
4325  Other than ENOENT are fatal
4326  */
4327  if (thd)
4328  {
4329  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4330  ER_BINLOG_PURGE_FATAL_ERR,
4331  "a problem with getting info on being purged %s; "
4332  "consider examining correspondence "
4333  "of your binlog index file "
4334  "to the actual binlog files",
4335  log_info.log_file_name);
4336  }
4337  else
4338  {
4339  sql_print_information("Failed to delete log file '%s'; "
4340  "consider examining correspondence "
4341  "of your binlog index file "
4342  "to the actual binlog files",
4343  log_info.log_file_name);
4344  }
4345  error= LOG_INFO_FATAL;
4346  goto err;
4347  }
4348  }
4349  else
4350  {
4351  if ((error= find_log_pos(&check_log_info, log_info.log_file_name,
4352  need_lock_index)))
4353  {
4354  if (error != LOG_INFO_EOF)
4355  {
4356  if (thd)
4357  {
4358  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4359  ER_BINLOG_PURGE_FATAL_ERR,
4360  "a problem with deleting %s and "
4361  "reading the binlog index file",
4362  log_info.log_file_name);
4363  }
4364  else
4365  {
4366  sql_print_information("Failed to delete file '%s' and "
4367  "read the binlog index file",
4368  log_info.log_file_name);
4369  }
4370  goto err;
4371  }
4372 
4373  error= 0;
4374  if (!need_lock_index)
4375  {
4376  /*
4377  This is to avoid triggering an error in NDB.
4378 
4379  @todo: This is weird, what does NDB errors have to do with
4380  need_lock_index? Explain better or refactor /Sven
4381  */
4382  ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
4383  }
4384 
4385  DBUG_PRINT("info",("purging %s",log_info.log_file_name));
4386  if (!my_delete(log_info.log_file_name, MYF(0)))
4387  {
4388  if (decrease_log_space)
4389  *decrease_log_space-= s.st_size;
4390  }
4391  else
4392  {
4393  if (my_errno == ENOENT)
4394  {
4395  if (thd)
4396  {
4397  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4398  ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
4399  log_info.log_file_name);
4400  }
4401  sql_print_information("Failed to delete file '%s'",
4402  log_info.log_file_name);
4403  my_errno= 0;
4404  }
4405  else
4406  {
4407  if (thd)
4408  {
4409  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4410  ER_BINLOG_PURGE_FATAL_ERR,
4411  "a problem with deleting %s; "
4412  "consider examining correspondence "
4413  "of your binlog index file "
4414  "to the actual binlog files",
4415  log_info.log_file_name);
4416  }
4417  else
4418  {
4419  sql_print_information("Failed to delete file '%s'; "
4420  "consider examining correspondence "
4421  "of your binlog index file "
4422  "to the actual binlog files",
4423  log_info.log_file_name);
4424  }
4425  if (my_errno == EMFILE)
4426  {
4427  DBUG_PRINT("info",
4428  ("my_errno: %d, set ret = LOG_INFO_EMFILE", my_errno));
4429  error= LOG_INFO_EMFILE;
4430  goto err;
4431  }
4432  error= LOG_INFO_FATAL;
4433  goto err;
4434  }
4435  }
4436  }
4437  }
4438  }
4439 
4440 err:
4441  DBUG_RETURN(error);
4442 }
4443 
4464 int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time, bool auto_purge)
4465 {
4466  int error;
4467  int no_of_threads_locking_log= 0, no_of_log_files_purged= 0;
4468  bool log_is_active= false, log_is_in_use= false;
4469  char to_log[FN_REFLEN], copy_log_in_use[FN_REFLEN];
4470  LOG_INFO log_info;
4471  MY_STAT stat_area;
4472  THD *thd= current_thd;
4473 
4474  DBUG_ENTER("purge_logs_before_date");
4475 
4476  mysql_mutex_lock(&LOCK_index);
4477  to_log[0]= 0;
4478 
4479  if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/)))
4480  goto err;
4481 
4482  while (!(log_is_active= is_active(log_info.log_file_name)))
4483  {
4484  if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
4485  {
4486  if (!auto_purge)
4487  {
4488  log_is_in_use= true;
4489  strcpy(copy_log_in_use, log_info.log_file_name);
4490  }
4491  break;
4492  }
4493  no_of_log_files_purged++;
4494 
4495  if (!mysql_file_stat(m_key_file_log,
4496  log_info.log_file_name, &stat_area, MYF(0)))
4497  {
4498  if (my_errno == ENOENT)
4499  {
4500  /*
4501  It's not fatal if we can't stat a log file that does not exist.
4502  */
4503  my_errno= 0;
4504  }
4505  else
4506  {
4507  /*
4508  Other than ENOENT are fatal
4509  */
4510  if (thd)
4511  {
4512  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4513  ER_BINLOG_PURGE_FATAL_ERR,
4514  "a problem with getting info on being purged %s; "
4515  "consider examining correspondence "
4516  "of your binlog index file "
4517  "to the actual binlog files",
4518  log_info.log_file_name);
4519  }
4520  else
4521  {
4522  sql_print_information("Failed to delete log file '%s'",
4523  log_info.log_file_name);
4524  }
4525  error= LOG_INFO_FATAL;
4526  goto err;
4527  }
4528  }
4529  else
4530  {
4531  if (stat_area.st_mtime < purge_time)
4532  strmake(to_log,
4533  log_info.log_file_name,
4534  sizeof(log_info.log_file_name) - 1);
4535  else
4536  break;
4537  }
4538  if (find_next_log(&log_info, false/*need_lock_index=false*/))
4539  break;
4540  }
4541 
4542  if (log_is_active)
4543  {
4544  if(!auto_purge)
4545  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4546  ER_WARN_PURGE_LOG_IS_ACTIVE,
4547  ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
4548  log_info.log_file_name);
4549 
4550  }
4551 
4552  if (log_is_in_use)
4553  {
4554  int no_of_log_files_to_purge= no_of_log_files_purged+1;
4555  while (strcmp(log_file_name, log_info.log_file_name))
4556  {
4557  if (mysql_file_stat(m_key_file_log, log_info.log_file_name,
4558  &stat_area, MYF(0)))
4559  {
4560  if (stat_area.st_mtime < purge_time)
4561  no_of_log_files_to_purge++;
4562  else
4563  break;
4564  }
4565  if (find_next_log(&log_info, false/*need_lock_index=false*/))
4566  {
4567  no_of_log_files_to_purge++;
4568  break;
4569  }
4570  }
4571 
4572  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4573  ER_WARN_PURGE_LOG_IN_USE,
4574  ER(ER_WARN_PURGE_LOG_IN_USE),
4575  copy_log_in_use, no_of_threads_locking_log,
4576  no_of_log_files_purged, no_of_log_files_to_purge);
4577  }
4578 
4579  error= (to_log[0] ? purge_logs(to_log, true,
4580  false/*need_lock_index=false*/,
4581  true/*need_update_threads=true*/,
4582  (ulonglong *) 0, auto_purge) : 0);
4583 
4584 err:
4585  mysql_mutex_unlock(&LOCK_index);
4586  DBUG_RETURN(error);
4587 }
4588 #endif /* HAVE_REPLICATION */
4589 
4590 
4600 void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident)
4601 {
4602  uint dir_len = dirname_length(log_file_name);
4603  if (dir_len >= FN_REFLEN)
4604  dir_len=FN_REFLEN-1;
4605  strnmov(buf, log_file_name, dir_len);
4606  strmake(buf+dir_len, log_ident, FN_REFLEN - dir_len -1);
4607 }
4608 
4609 
4614 bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
4615 {
4616  return !strcmp(log_file_name, log_file_name_arg);
4617 }
4618 
4619 
4620 /*
4621  Wrappers around new_file_impl to avoid using argument
4622  to control locking. The argument 1) less readable 2) breaks
4623  incapsulation 3) allows external access to the class without
4624  a lock (which is not possible with private new_file_without_locking
4625  method).
4626 
4627  @retval
4628  nonzero - error
4629 
4630 */
4631 
4632 int MYSQL_BIN_LOG::new_file(Format_description_log_event *extra_description_event)
4633 {
4634  return new_file_impl(true/*need_lock_log=true*/, extra_description_event);
4635 }
4636 
4637 /*
4638  @retval
4639  nonzero - error
4640 */
4641 int MYSQL_BIN_LOG::new_file_without_locking(Format_description_log_event *extra_description_event)
4642 {
4643  return new_file_impl(false/*need_lock_log=false*/, extra_description_event);
4644 }
4645 
4646 
4658 int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log, Format_description_log_event *extra_description_event)
4659 {
4660  int error= 0, close_on_error= FALSE;
4661  char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
4662 
4663  DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl");
4664  if (!is_open())
4665  {
4666  DBUG_PRINT("info",("log is closed"));
4667  DBUG_RETURN(error);
4668  }
4669 
4670  if (need_lock_log)
4671  mysql_mutex_lock(&LOCK_log);
4672  else
4673  mysql_mutex_assert_owner(&LOCK_log);
4674  DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
4675  DEBUG_SYNC(current_thd, "before_rotate_binlog"););
4676  mysql_mutex_lock(&LOCK_xids);
4677  /*
4678  We need to ensure that the number of prepared XIDs are 0.
4679 
4680  If m_prep_xids is not zero:
4681  - We wait for storage engine commit, hence decrease m_prep_xids
4682  - We keep the LOCK_log to block new transactions from being
4683  written to the binary log.
4684  */
4685  while (get_prep_xids() > 0)
4686  mysql_cond_wait(&m_prep_xids_cond, &LOCK_xids);
4687  mysql_mutex_unlock(&LOCK_xids);
4688 
4689  mysql_mutex_lock(&LOCK_index);
4690 
4691  if (DBUG_EVALUATE_IF("expire_logs_always", 0, 1)
4692  && (error= ha_flush_logs(NULL)))
4693  goto end;
4694 
4695  mysql_mutex_assert_owner(&LOCK_log);
4696  mysql_mutex_assert_owner(&LOCK_index);
4697 
4698  /* Reuse old name if not binlog and not update log */
4699  new_name_ptr= name;
4700 
4701  /*
4702  If user hasn't specified an extension, generate a new log name
4703  We have to do this here and not in open as we want to store the
4704  new file name in the current binary log file.
4705  */
4706  if ((error= generate_new_name(new_name, name)))
4707  goto end;
4708  else
4709  {
4710  new_name_ptr=new_name;
4711  /*
4712  We log the whole file name for log file as the user may decide
4713  to change base names at some point.
4714  */
4715  Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET,
4716  is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
4717  /*
4718  The current relay-log's closing Rotate event must have checksum
4719  value computed with an algorithm of the last relay-logged FD event.
4720  */
4721  if (is_relay_log)
4722  r.checksum_alg= relay_log_checksum_alg;
4723  DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
4724  if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
4725  (error= r.write(&log_file)))
4726  {
4727  char errbuf[MYSYS_STRERROR_SIZE];
4728  DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;);
4729  close_on_error= TRUE;
4730  my_printf_error(ER_ERROR_ON_WRITE, ER(ER_CANT_OPEN_FILE),
4731  MYF(ME_FATALERROR), name,
4732  errno, my_strerror(errbuf, sizeof(errbuf), errno));
4733  goto end;
4734  }
4735  bytes_written += r.data_written;
4736  }
4737  /*
4738  Update needs to be signalled even if there is no rotate event
4739  log rotation should give the waiting thread a signal to
4740  discover EOF and move on to the next log.
4741  */
4742  signal_update();
4743 
4744  old_name=name;
4745  name=0; // Don't free name
4746  close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
4747 
4748  if (checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
4749  {
4750  DBUG_ASSERT(!is_relay_log);
4751  DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
4752  binlog_checksum_options= checksum_alg_reset;
4753  }
4754  /*
4755  Note that at this point, log_state != LOG_CLOSED (important for is_open()).
4756  */
4757 
4758  /*
4759  new_file() is only used for rotation (in FLUSH LOGS or because size >
4760  max_binlog_size or max_relay_log_size).
4761  If this is a binary log, the Format_description_log_event at the beginning of
4762  the new file should have created=0 (to distinguish with the
4763  Format_description_log_event written at server startup, which should
4764  trigger temp tables deletion on slaves.
4765  */
4766 
4767  /* reopen index binlog file, BUG#34582 */
4768  file_to_open= index_file_name;
4769  error= open_index_file(index_file_name, 0, false/*need_lock_index=false*/);
4770  if (!error)
4771  {
4772  /* reopen the binary log file. */
4773  file_to_open= new_name_ptr;
4774  error= open_binlog(old_name, new_name_ptr, io_cache_type,
4775  max_size, true,
4776  false/*need_lock_index=false*/,
4777  true/*need_sid_lock=true*/,
4778  extra_description_event);
4779  }
4780 
4781  /* handle reopening errors */
4782  if (error)
4783  {
4784  char errbuf[MYSYS_STRERROR_SIZE];
4785  my_printf_error(ER_CANT_OPEN_FILE, ER(ER_CANT_OPEN_FILE),
4786  MYF(ME_FATALERROR), file_to_open,
4787  error, my_strerror(errbuf, sizeof(errbuf), error));
4788  close_on_error= TRUE;
4789  }
4790  my_free(old_name);
4791 
4792 end:
4793 
4794  if (error && close_on_error /* rotate or reopen failed */)
4795  {
4796  /*
4797  Close whatever was left opened.
4798 
4799  We are keeping the behavior as it exists today, ie,
4800  we disable logging and move on (see: BUG#51014).
4801 
4802  TODO: as part of WL#1790 consider other approaches:
4803  - kill mysql (safety);
4804  - try multiple locations for opening a log file;
4805  - switch server to protected/readonly mode
4806  - ...
4807  */
4808  close(LOG_CLOSE_INDEX);
4809  sql_print_error("Could not open %s for logging (error %d). "
4810  "Turning logging off for the whole duration "
4811  "of the MySQL server process. To turn it on "
4812  "again: fix the cause, shutdown the MySQL "
4813  "server and restart it.",
4814  new_name_ptr, errno);
4815  }
4816 
4817  mysql_mutex_unlock(&LOCK_index);
4818  if (need_lock_log)
4819  mysql_mutex_unlock(&LOCK_log);
4820 
4821  DBUG_RETURN(error);
4822 }
4823 
4824 
4825 #ifdef HAVE_REPLICATION
4826 
4842 bool MYSQL_BIN_LOG::after_append_to_relay_log(Master_info *mi)
4843 {
4844  DBUG_ENTER("MYSQL_BIN_LOG::after_append_to_relay_log");
4845  DBUG_PRINT("info",("max_size: %lu",max_size));
4846 
4847  // Check pre-conditions
4848  mysql_mutex_assert_owner(&LOCK_log);
4849  mysql_mutex_assert_owner(&mi->data_lock);
4850  DBUG_ASSERT(is_relay_log);
4851  DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_IO);
4852 
4853  // Flush and sync
4854  bool error= false;
4855  if (flush_and_sync(0) == 0)
4856  {
4857  // If relay log is too big, rotate
4858  if ((uint) my_b_append_tell(&log_file) >
4859  DBUG_EVALUATE_IF("rotate_slave_debug_group", 500, max_size))
4860  {
4861  error= new_file_without_locking(mi->get_mi_description_event());
4862  }
4863  }
4864 
4865  signal_update();
4866 
4867  DBUG_RETURN(error);
4868 }
4869 
4870 
4871 bool MYSQL_BIN_LOG::append_event(Log_event* ev, Master_info *mi)
4872 {
4873  DBUG_ENTER("MYSQL_BIN_LOG::append");
4874 
4875  // check preconditions
4876  DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
4877  DBUG_ASSERT(is_relay_log);
4878 
4879  // acquire locks
4880  mysql_mutex_lock(&LOCK_log);
4881 
4882  // write data
4883  bool error = false;
4884  if (ev->write(&log_file) == 0)
4885  {
4886  bytes_written+= ev->data_written;
4887  error= after_append_to_relay_log(mi);
4888  }
4889  else
4890  error= true;
4891 
4892  mysql_mutex_unlock(&LOCK_log);
4893  DBUG_RETURN(error);
4894 }
4895 
4896 
4897 bool MYSQL_BIN_LOG::append_buffer(const char* buf, uint len, Master_info *mi)
4898 {
4899  DBUG_ENTER("MYSQL_BIN_LOG::append_buffer");
4900 
4901  // check preconditions
4902  DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
4903  DBUG_ASSERT(is_relay_log);
4904  mysql_mutex_assert_owner(&LOCK_log);
4905 
4906  // write data
4907  bool error= false;
4908  if (my_b_append(&log_file,(uchar*) buf,len) == 0)
4909  {
4910  bytes_written += len;
4911  error= after_append_to_relay_log(mi);
4912  }
4913  else
4914  error= true;
4915 
4916  DBUG_RETURN(error);
4917 }
4918 #endif // ifdef HAVE_REPLICATION
4919 
4920 bool MYSQL_BIN_LOG::flush_and_sync(const bool force)
4921 {
4922  mysql_mutex_assert_owner(&LOCK_log);
4923 
4924  if (flush_io_cache(&log_file))
4925  return 1;
4926 
4927  std::pair<bool, bool> result= sync_binlog_file(force);
4928 
4929  return result.first;
4930 }
4931 
4932 void MYSQL_BIN_LOG::start_union_events(THD *thd, query_id_t query_id_param)
4933 {
4934  DBUG_ASSERT(!thd->binlog_evt_union.do_union);
4935  thd->binlog_evt_union.do_union= TRUE;
4936  thd->binlog_evt_union.unioned_events= FALSE;
4937  thd->binlog_evt_union.unioned_events_trans= FALSE;
4938  thd->binlog_evt_union.first_query_id= query_id_param;
4939 }
4940 
4941 void MYSQL_BIN_LOG::stop_union_events(THD *thd)
4942 {
4943  DBUG_ASSERT(thd->binlog_evt_union.do_union);
4944  thd->binlog_evt_union.do_union= FALSE;
4945 }
4946 
4947 bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
4948 {
4949  return (thd->binlog_evt_union.do_union &&
4950  query_id_param >= thd->binlog_evt_union.first_query_id);
4951 }
4952 
4953 /*
4954  Updates thd's position-of-next-event variables
4955  after a *real* write a file.
4956  */
4957 void MYSQL_BIN_LOG::update_thd_next_event_pos(THD* thd)
4958 {
4959  if (likely(thd != NULL))
4960  {
4961  thd->set_next_event_pos(log_file_name,
4962  my_b_tell(&log_file));
4963  }
4964 }
4965 
/*
  Moves the last bunch of rows from the pending Rows event to a cache (the
  transactional cache if is_transactional is @c true, the
  non-transactional cache otherwise), then sets a new pending event.

  If there is a pending event, it is written to the selected cache and
  deleted. If that write fails, the write error is recorded, the pending
  event is deleted and cleared, and 1 is returned without installing
  the new event.

  @param thd              a pointer to the user thread.
  @param event            the Rows event that becomes the new pending event.
  @param is_transactional @c true selects the transactional cache,
                          @c false the non-transactional one.

  @retval 0  success
  @retval 1  writing the previously pending event failed
*/
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
                                                Rows_log_event* event,
                                                bool is_transactional)
{
  DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
  DBUG_ASSERT(mysql_bin_log.is_open());
  DBUG_PRINT("enter", ("event: 0x%lx", (long) event));

  int error= 0;
  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);

  DBUG_ASSERT(cache_mngr);

  binlog_cache_data *cache_data=
    cache_mngr->get_binlog_cache_data(is_transactional);

  DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending()));

  if (Rows_log_event* pending= cache_data->pending())
  {
    /*
      Write pending event to the cache.
    */
    if (cache_data->write_event(thd, pending))
    {
      set_write_error(thd, is_transactional);
      /*
        NOTE(review): the line that completes this condition is missing in
        this copy (it stops after "cache_data &&"), so the statement does
        not compile as shown. Presumably the missing operand is a call such
        as `stmt_cannot_safely_rollback(thd))`. Confirm against upstream
        before restoring it.
      */
      if (check_write_error(thd) && cache_data &&
        cache_data->set_incident();
      /* The failed pending event is discarded; don't leave a dangling pointer. */
      delete pending;
      cache_data->set_pending(NULL);
      DBUG_RETURN(1);
    }

    delete pending;
  }

  /* Install the new pending event. */
  cache_data->set_pending(event);

  DBUG_RETURN(error);
}
5018 
{
  /*
    Write event_info into thd's binlog cache (transactional or statement
    cache, depending on the event), preceded in statement-based mode by the
    context events it depends on (LAST_INSERT_ID, INSERT_ID, RAND, user
    variables). Returns 0 on success or when the event is skipped, and
    true (1) on failure.
  */
  THD *thd= event_info->thd;
  bool error= 1;
  DBUG_ENTER("MYSQL_BIN_LOG::write_event(Log_event *)");

  if (thd->binlog_evt_union.do_union)
  {
    /*
      In Stored function; Remember that function call caused an update.
      We will log the function call to the binary log on function exit
    */
    thd->binlog_evt_union.unioned_events= TRUE;
    thd->binlog_evt_union.unioned_events_trans |=
      event_info->is_using_trans_cache();
    DBUG_RETURN(0);
  }

  /*
    We only end the statement if we are in a top-level statement. If
    we are inside a stored function, we do not end the statement since
    this will close all tables on the slave.
  */
  bool const end_stmt=
    thd->locked_tables_mode && thd->lex->requires_prelocking();
  if (thd->binlog_flush_pending_rows_event(end_stmt,
                                           event_info->is_using_trans_cache()))
    DBUG_RETURN(error);

  /*
    In most cases this is only called if 'is_open()' is true; in fact this is
    mostly called if is_open() *was* true a few instructions before, but it
    could have changed since.
  */
  if (likely(is_open()))
  {
#ifdef HAVE_REPLICATION
    /*
      In the future we need to add to the following if tests like
      "do the involved tables match (to be implemented)
      binlog_[wild_]{do|ignore}_table?" (WL#1049)"
    */
    /*
      Skip the event, reporting success, when binary logging is disabled
      for the session (OPTION_BIN_LOG cleared), or when the binlog filter
      rejects the event's database. The filter is not applied to SAVEPOINT
      or ROLLBACK TO SAVEPOINT, nor to events flagged as "no filter".
      NOTE(review): `thd` is tested for NULL in the first operand, but it
      is dereferenced unconditionally in the second operand and earlier in
      this function, so this guard (and the `if (thd)` below) cannot
      protect against a NULL thd.
    */
    const char *local_db= event_info->get_db();
    if ((thd && !(thd->variables.option_bits & OPTION_BIN_LOG)) ||
        (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
         thd->lex->sql_command != SQLCOM_SAVEPOINT &&
         (!event_info->is_no_filter_event() &&
          !binlog_filter->db_ok(local_db))))
      DBUG_RETURN(0);
#endif /* HAVE_REPLICATION */

    DBUG_ASSERT(event_info->is_using_trans_cache() || event_info->is_using_stmt_cache());

    if (binlog_start_trans_and_stmt(thd, event_info))
      DBUG_RETURN(error);

    /* Select the cache (trx or stmt) this event goes into. */
    bool is_trans_cache= event_info->is_using_trans_cache();
    binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
    binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_trans_cache);

    DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));

    /*
      No check for auto events flag here - this write method should
      never be called if auto-events are enabled.

      Write first log events which describe the 'run environment'
      of the SQL command. If row-based binlogging, Insert_id, Rand
      and other kind of "setting context" events are not needed.
    */
    if (thd)
    {
      if (!thd->is_current_stmt_binlog_format_row())
      {
        if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
        {
          Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
                             thd->first_successful_insert_id_in_prev_stmt_for_binlog,
                             event_info->event_cache_type, event_info->event_logging_type);
          if (cache_data->write_event(thd, &e))
            goto err;
        }
        if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
        {
          DBUG_PRINT("info",("number of auto_inc intervals: %u",
                             thd->auto_inc_intervals_in_cur_stmt_for_binlog.
                             nb_elements()));
          /* Only the smallest auto-increment value of the statement is logged. */
          Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
                             thd->auto_inc_intervals_in_cur_stmt_for_binlog.
                             minimum(), event_info->event_cache_type,
                             event_info->event_logging_type);
          if (cache_data->write_event(thd, &e))
            goto err;
        }
        if (thd->rand_used)
        {
          Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
                           event_info->event_cache_type,
                           event_info->event_logging_type);
          if (cache_data->write_event(thd, &e))
            goto err;
        }
        if (thd->user_var_events.elements)
        {
          /* One User_var_log_event per user variable the statement used. */
          for (uint i= 0; i < thd->user_var_events.elements; i++)
          {
            BINLOG_USER_VAR_EVENT *user_var_event;
            get_dynamic(&thd->user_var_events,(uchar*) &user_var_event, i);

            /* setting flags for user var log event */
            uchar flags= User_var_log_event::UNDEF_F;
            if (user_var_event->unsigned_flag)
              flags|= User_var_log_event::UNSIGNED_F;

            User_var_log_event e(thd,
                                 user_var_event->user_var_event->entry_name.ptr(),
                                 user_var_event->user_var_event->entry_name.length(),
                                 user_var_event->value,
                                 user_var_event->length,
                                 user_var_event->type,
                                 user_var_event->charset_number, flags,
                                 event_info->event_cache_type,
                                 event_info->event_logging_type);
            if (cache_data->write_event(thd, &e))
              goto err;
          }
        }
      }
    }

    /*
      Write the event.
    */
    if (cache_data->write_event(thd, event_info) ||
        DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
      goto err;

    /*
      After writing the event, if the trx-cache was used and any unsafe
      change was written into it, the cache is marked as cannot safely
      roll back.
    */
    if (is_trans_cache && stmt_cannot_safely_rollback(thd))
      cache_mngr->trx_cache.set_cannot_rollback();

    error= 0;

err:
    if (error)
    {
      set_write_error(thd, is_trans_cache);
      /*
        NOTE(review): the line that completes this condition is missing in
        this copy (it stops after "cache_data &&"), so the statement does
        not compile as shown. Presumably the missing operand is a call such
        as `stmt_cannot_safely_rollback(thd))`. Confirm against upstream.
      */
      if (check_write_error(thd) && cache_data &&
        cache_data->set_incident();
    }
  }

  DBUG_RETURN(error);
}
5182 
/**
  Rotate the binary log when force_rotate is set or when the current file
  has reached max_size.

  If creating the new file fails, an incident event is written to the
  current log. If that succeeds, the error is also reported in the error
  log and the log is flushed and synced.

  @param force_rotate      rotate regardless of the current file size
  @param[out] check_purge  set to false on entry; set to true whenever a
                           rotation was attempted, even a failed one

  @return 0 on success, otherwise the error from new_file_without_locking()
  @pre LOCK_log is held; must not be called on a relay log.
*/
int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
  int error= 0;
  DBUG_ENTER("MYSQL_BIN_LOG::rotate");

  DBUG_ASSERT(!is_relay_log);
  mysql_mutex_assert_owner(&LOCK_log);

  *check_purge= false;

  if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size))
  {
    if ((error= new_file_without_locking(NULL)))
      /*
        Opening the new file failed: try to write an incident event to
        the current log so that slaves will stop.
        NOTE(review): this inner `if` has no braces; it is the sole body
        statement of the `if` above.
      */
      if (!write_incident(current_thd, false/*need_lock_log=false*/,
                          false/*do_flush_and_sync==false*/))
      {
        /*
          Write an error to log. So that user might have a chance
          to be alerted and explore incident details before its
          slave servers would stop.
        */
        sql_print_error("The server was unable to create a new log file. "
                        "An incident event has been written to the binary "
                        "log which will stop the slaves.");
        flush_and_sync(0);
      }

    *check_purge= true;
  }
  DBUG_RETURN(error);
}
5240 
{
  /*
    Expiry-based purge (compiled only with HAVE_REPLICATION): when
    expire_logs_days is non-zero, call purge_logs_before_date() with a
    cutoff of expire_logs_days days before now.
  */
#ifdef HAVE_REPLICATION
  if (expire_logs_days)
  {
    DEBUG_SYNC(current_thd, "at_purge_logs_before_date");
    /* Cutoff time: expire_logs_days expressed in seconds before now. */
    time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
    DBUG_EXECUTE_IF("expire_logs_always",
                    { purge_time= my_time(0);});
    /* Skip purging if the cutoff would be before the epoch. */
    if (purge_time >= 0)
    {
      /*
        Flush logs for storage engines, so that the last transaction
        is fsynced inside storage engines.
      */
      ha_flush_logs(NULL);
      purge_logs_before_date(purge_time, true);
    }
  }
#endif
}
5268 
5278 int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate)
5279 {
5280  int error= 0;
5281  DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
5282  bool check_purge= false;
5283 
5284  DBUG_ASSERT(!is_relay_log);
5285  mysql_mutex_lock(&LOCK_log);
5286  error= rotate(force_rotate, &check_purge);
5287  /*
5288  NOTE: Run purge_logs wo/ holding LOCK_log because it does not need
5289  the mutex. Otherwise causes various deadlocks.
5290  */
5291  mysql_mutex_unlock(&LOCK_log);
5292 
5293  if (!error && check_purge)
5294  purge();
5295 
5296  DBUG_RETURN(error);
5297 }
5298 
5299 uint MYSQL_BIN_LOG::next_file_id()
5300 {
5301  uint res;
5302  mysql_mutex_lock(&LOCK_log);
5303  res = file_id++;
5304  mysql_mutex_unlock(&LOCK_log);
5305  return res;
5306 }
5307 
5308 
5325  static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
5326  uint length, ha_checksum *crc)
5327 {
5328  ulong ret;
5329  uchar *event_begin= buf + off;
5330  uint16 flags= uint2korr(event_begin + FLAGS_OFFSET);
5331 
5332  DBUG_ASSERT(length >= off + LOG_EVENT_HEADER_LEN); //at least common header in
5333  int2store(event_begin + FLAGS_OFFSET, flags);
5334  ret= length >= off + event_len ? 0 : off + event_len - length;
5335  *crc= my_checksum(*crc, event_begin, event_len - ret);
5336  return ret;
5337 }
5338 
/*
  Write the contents of a cache to the binary log.

  SYNOPSIS
    do_write_cache()
    cache    Cache to write to the binary log

  DESCRIPTION
    Write the contents of the cache to the binary log. The cache is
    re-initialized as a READ_CACHE so that its contents can be read back.

    While copying, each event's end_log_pos (stored relative to the start
    of the group) is rewritten as an absolute position in log_file. When
    binlog checksums are enabled (binlog_checksum_options != OFF; only
    CRC32 is supported), each event's length field is increased by
    BINLOG_CHECKSUM_LEN and the checksum is written right after the event.

  RETURN
    0                  success
    ER_ERROR_ON_WRITE  re-initializing the cache or writing to log_file
                       failed
*/

int MYSQL_BIN_LOG::do_write_cache(IO_CACHE *cache)
{
  DBUG_ENTER("MYSQL_BIN_LOG::do_write_cache(IO_CACHE *)");

  DBUG_EXECUTE_IF("simulate_do_write_cache_failure",
                  {
                    /*
                       see binlog_cache_data::write_event() that reacts on
                       @c simulate_disk_full_at_flush_pending.
                    */
                    DBUG_SET("-d,simulate_do_write_cache_failure");
                    DBUG_RETURN(ER_ERROR_ON_WRITE);
                  });

  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
    DBUG_RETURN(ER_ERROR_ON_WRITE);
  uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
  ulong remains= 0; // part of unprocessed yet netto length of the event
  long val;
  ulong end_log_pos_inc= 0; // BINLOG_CHECKSUM_LEN accumulated once per event processed so far
  uchar header[LOG_EVENT_HEADER_LEN];
  ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
  my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
  uchar buf[BINLOG_CHECKSUM_LEN];

  // while there is just one alg the following must hold:
  DBUG_ASSERT(!do_checksum ||
              binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);

  /*
    The events in the buffer have incorrect end_log_pos data
    (relative to beginning of group rather than absolute),
    so we'll recalculate them in situ so the binlog is always
    correct, even in the middle of a group. This is possible
    because we now know the start position of the group (the
    offset of this cache in the log, if you will); all we need
    to do is to find all event-headers, and add the position of
    the group to the end_log_pos of each event.  This is pretty
    straight forward, except that we read the cache in segments,
    so an event-header might end up on the cache-border and get
    split.
  */

  group= (uint)my_b_tell(&log_file);
  DBUG_PRINT("debug", ("length: %llu, group: %llu",
                       (ulonglong) length, (ulonglong) group));
  hdr_offs= carry= 0;
  if (do_checksum)
    crc= crc_0= my_checksum(0L, NULL, 0);

  if (DBUG_EVALUATE_IF("fault_injection_crc_value", 1, 0))
    crc= crc - 1;

  do
  {
    /*
      if we only got a partial header in the last iteration,
      get the other half now and process a full header.
    */
    if (unlikely(carry > 0))
    {
      DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);

      /* assemble both halves */
      memcpy(&header[carry], (char *)cache->read_pos,
             LOG_EVENT_HEADER_LEN - carry);

      /* fix end_log_pos */
      val=uint4korr(header + LOG_POS_OFFSET);
      val+= group +
        (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
      int4store(&header[LOG_POS_OFFSET], val);

      if (do_checksum)
      {
        ulong len= uint4korr(header + EVENT_LEN_OFFSET);
        /* fix len */
        int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
      }

      /* write the first half of the split header */
      if (my_b_write(&log_file, header, carry))
        DBUG_RETURN(ER_ERROR_ON_WRITE);

      /*
        copy fixed second half of header to cache so the correct
        version will be written later.
      */
      memcpy((char *)cache->read_pos, &header[carry],
             LOG_EVENT_HEADER_LEN - carry);

      /*
        next event header at ... The length field was already increased by
        BINLOG_CHECKSUM_LEN above, so subtract it again to get the offset
        within the (checksum-less) cache.
      */
      hdr_offs= uint4korr(header + EVENT_LEN_OFFSET) - carry -
        (do_checksum ? BINLOG_CHECKSUM_LEN : 0);

      if (do_checksum)
      {
        DBUG_ASSERT(crc == crc_0 && remains == 0);
        crc= my_checksum(crc, header, carry);
        /*
          NOTE(review): the end of the following statement is missing in
          this copy (it stops after "carry -"), so it does not compile as
          shown. By analogy with the hdr_offs computation above, it
          presumably ends in `BINLOG_CHECKSUM_LEN;`. Confirm against
          upstream.
        */
        remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
      }
      carry= 0;
    }

    /* if there is anything to write, process it. */

    if (likely(length > 0))
    {
      /*
        process all event-headers in this (partial) cache.
        if next header is beyond current read-buffer,
        we'll get it later (though not necessarily in the
        very next iteration, just "eventually").
      */

      /* crc-calc the whole buffer */
      if (do_checksum && hdr_offs >= length)
      {

        DBUG_ASSERT(remains != 0 && crc != crc_0);

        crc= my_checksum(crc, cache->read_pos, length);
        remains -= length;
        if (my_b_write(&log_file, cache->read_pos, length))
          DBUG_RETURN(ER_ERROR_ON_WRITE);
        if (remains == 0)
        {
          int4store(buf, crc);
          if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
            DBUG_RETURN(ER_ERROR_ON_WRITE);
          crc= crc_0;
        }
      }

      while (hdr_offs < length)
      {
        /*
          With checksums on, first finish the event that started in an
          earlier buffer and ends at hdr_offs: checksum and write its
          remaining bytes, then write its checksum.
        */
        if (do_checksum)
        {
          if (remains != 0)
          {
            /*
              finish off with remains of the last event that crawls
              from previous into the current buffer
            */
            DBUG_ASSERT(crc != crc_0);
            crc= my_checksum(crc, cache->read_pos, hdr_offs);
            int4store(buf, crc);
            remains -= hdr_offs;
            DBUG_ASSERT(remains == 0);
            if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
                my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
              DBUG_RETURN(ER_ERROR_ON_WRITE);
            crc= crc_0;
          }
        }

        /*
          partial header only? save what we can get, process once
          we get the rest.
        */
        if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
        {
          carry= length - hdr_offs;
          memcpy(header, (char *)cache->read_pos + hdr_offs, carry);
          length= hdr_offs;
        }
        else
        {
          /* we've got a full event-header, and it came in one piece */
          uchar *ev= (uchar *)cache->read_pos + hdr_offs;
          uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
          uchar *log_pos= ev + LOG_POS_OFFSET;

          /* fix end_log_pos */
          val= uint4korr(log_pos) + group +
            (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
          int4store(log_pos, val);

          /* fix CRC */
          if (do_checksum)
          {
            /* fix length */
            int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
            remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
                                       length, &crc);
            /* Write the whole event, or only the part present in this buffer. */
            if (my_b_write(&log_file, ev,
                           remains == 0 ? event_len : length - hdr_offs))
              DBUG_RETURN(ER_ERROR_ON_WRITE);
            if (remains == 0)
            {
              int4store(buf, crc);
              if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
                DBUG_RETURN(ER_ERROR_ON_WRITE);
              crc= crc_0; // crc is complete
            }
          }

          /* next event header at ... */
          hdr_offs += event_len; // incr by the netto len

          DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
        }
      }

      /*
        Adjust hdr_offs. Note that it may still point beyond the segment
        read in the next iteration; if the current event is very long,
        it may take a couple of read-iterations (and subsequent adjustments
        of hdr_offs) for it to point into the then-current segment.
        If we have a split header (carry != 0), hdr_offs will be set at the
        beginning of the next iteration, overwriting the value we set here:
      */
      hdr_offs -= length;
    }

    /* Write the entire buf to the binary log file */
    if (!do_checksum)
      if (my_b_write(&log_file, cache->read_pos, length))
        DBUG_RETURN(ER_ERROR_ON_WRITE);
    cache->read_pos=cache->read_end;		// Mark buffer used up
  } while ((length= my_b_fill(cache)));

  DBUG_ASSERT(carry == 0);
  DBUG_ASSERT(!do_checksum || remains == 0);
  DBUG_ASSERT(!do_checksum || crc == crc_0);

  DBUG_RETURN(0);                               // All OK
}
5586 
bool do_flush_and_sync)
{
  /*
    Write the incident event ev to the binary log. When do_flush_and_sync
    is set and the write succeeds, also flush/sync the log, wake up
    waiters, force a rotation and purge if rotate() requests it.
    Returns 0 when the log is not open; otherwise the first error
    encountered, or 0.
  */
  uint error= 0;
  DBUG_ENTER("MYSQL_BIN_LOG::write_incident");

  if (!is_open())
    DBUG_RETURN(error);

  /* Take LOCK_log here unless the caller says it already holds it. */
  if (need_lock_log)
    mysql_mutex_lock(&LOCK_log);
  else
    mysql_mutex_assert_owner(&LOCK_log);

  // @todo make this work with the group log. /sven

  error= ev->write(&log_file);

  if (do_flush_and_sync)
  {
    /*
      NOTE(review): purge() is called here while LOCK_log is held, whereas
      rotate_and_purge() deliberately releases LOCK_log before purging to
      avoid deadlocks. This path deserves a second look.
    */
    if (!error && !(error= flush_and_sync()))
    {
      bool check_purge= false;
      signal_update();
      error= rotate(true, &check_purge);
      if (!error && check_purge)
        purge();
    }
  }

  if (need_lock_log)
    mysql_mutex_unlock(&LOCK_log);

  DBUG_RETURN(error);
}
5646 bool MYSQL_BIN_LOG::write_incident(THD *thd, bool need_lock_log,
5647  bool do_flush_and_sync)
5648 {
5649  DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
5650 
5651  if (!is_open())
5652  DBUG_RETURN(0);
5653 
5654  LEX_STRING const write_error_msg=
5655  { C_STRING_WITH_LEN("error writing to the binary log") };
5656  Incident incident= INCIDENT_LOST_EVENTS;
5657  Incident_log_event ev(thd, incident, write_error_msg);
5658 
5659  DBUG_RETURN(write_incident(&ev, need_lock_log, do_flush_and_sync));
5660 }
5661 
{
  /*
    Copy cache_data's cache into the binary log (LOCK_log must be held),
    append an incident event if the cache is flagged with one, update the
    GTID state and thd's next-event position. Returns 0 on success. On
    failure it sets write_error, marks thd->commit_error as
    CE_FLUSH_ERROR and returns 1.
  */
  DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)");

  IO_CACHE *cache= &cache_data->cache_log;
  bool incident= cache_data->has_incident();

  DBUG_EXECUTE_IF("simulate_binlog_flush_error",
                  {
                    if (rand() % 3 == 0)
                    {
                      write_error=1;
                      goto err;
                    }
                  };);

  mysql_mutex_assert_owner(&LOCK_log);

  DBUG_ASSERT(is_open());
  if (likely(is_open()))                       // Should always be true
  {
    /*
      We only bother to write to the binary log if there is anything
      to write.
    */
    if (my_b_tell(cache) > 0)
    {
      DBUG_EXECUTE_IF("crash_before_writing_xid",
                      {
                        if ((write_error= do_write_cache(cache)))
                          DBUG_PRINT("info", ("error writing binlog cache: %d",
                                               write_error));
                        flush_and_sync(true);
                        DBUG_PRINT("info", ("crashing before writing xid"));
                        DBUG_SUICIDE();
                      });

      if ((write_error= do_write_cache(cache)))
        goto err;

      /* Follow the cache contents with an incident event if one is pending. */
      if (incident && write_incident(thd, false/*need_lock_log=false*/,
                                     false/*do_flush_and_sync==false*/))
        goto err;

      DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE(););
      if (cache->error)				// Error on read
      {
        char errbuf[MYSYS_STRERROR_SIZE];
        sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name,
                        errno, my_strerror(errbuf, sizeof(errbuf), errno));
        write_error=1;				// Don't give more errors
        goto err;
      }

      /* Update the GTID state while holding the global sid read lock. */
      global_sid_lock->rdlock();
      if (gtid_state->update_on_flush(thd) != RETURN_STATUS_OK)
      {
        global_sid_lock->unlock();
        goto err;
      }
      global_sid_lock->unlock();
    }
    update_thd_next_event_pos(thd);
  }

  DBUG_RETURN(0);

err:
  /* Report a generic write error only if write_error was not already set. */
  if (!write_error)
  {
    char errbuf[MYSYS_STRERROR_SIZE];
    write_error= 1;
    sql_print_error(ER(ER_ERROR_ON_WRITE), name,
                    errno, my_strerror(errbuf, sizeof(errbuf), errno));
  }
  thd->commit_error= THD::CE_FLUSH_ERROR;

  DBUG_RETURN(1);
}
5759 
5760 
5775 int MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd, const struct timespec *timeout)
5776 {
5777  int ret= 0;
5778  PSI_stage_info old_stage;
5779  DBUG_ENTER("wait_for_update_relay_log");
5780 
5781  thd->ENTER_COND(&update_cond, &LOCK_log,
5782  &stage_slave_has_read_all_relay_log,
5783  &old_stage);
5784 
5785  if (!timeout)
5786  mysql_cond_wait(&update_cond, &LOCK_log);
5787  else
5788  ret= mysql_cond_timedwait(&update_cond, &LOCK_log,
5789  const_cast<struct timespec *>(timeout));
5790  thd->EXIT_COND(&old_stage);
5791 
5792  DBUG_RETURN(ret);
5793 }
5794 
const struct timespec *timeout)
{
  /*
    Wait on update_cond with LOCK_log (which the caller must hold, as the
    condition wait requires), either indefinitely (timeout == NULL) or
    until the timeout deadline. Returns 0, or the result of
    mysql_cond_timedwait() when a timeout was given.
  */
  int ret= 0;
  DBUG_ENTER("wait_for_update_bin_log");

  if (!timeout)
    mysql_cond_wait(&update_cond, &LOCK_log);
  else
    ret= mysql_cond_timedwait(&update_cond, &LOCK_log,
                              const_cast<struct timespec *>(timeout));
  DBUG_RETURN(ret);
}
5824 
5825 
5840 void MYSQL_BIN_LOG::close(uint exiting)
5841 { // One can't set log_type here!
5842  DBUG_ENTER("MYSQL_BIN_LOG::close");
5843  DBUG_PRINT("enter",("exiting: %d", (int) exiting));
5844  if (log_state == LOG_OPENED)
5845  {
5846 #ifdef HAVE_REPLICATION
5847  if ((exiting & LOG_CLOSE_STOP_EVENT) != 0)
5848  {
5849  Stop_log_event s;
5850  // the checksumming rule for relay-log case is similar to Rotate
5851  s.checksum_alg= is_relay_log ?
5852  relay_log_checksum_alg : binlog_checksum_options;
5853  DBUG_ASSERT(!is_relay_log ||
5854  relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
5855  s.write(&log_file);
5856  bytes_written+= s.data_written;
5857  signal_update();
5858  }
5859 #endif /* HAVE_REPLICATION */
5860 
5861  /* don't pwrite in a file opened with O_APPEND - it doesn't work */
5862  if (log_file.type == WRITE_CACHE)
5863  {
5864  my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
5865  my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
5866  uchar flags= 0; // clearing LOG_EVENT_BINLOG_IN_USE_F
5867  mysql_file_pwrite(log_file.file, &flags, 1, offset, MYF(0));
5868  /*
5869  Restore position so that anything we have in the IO_cache is written
5870  to the correct position.
5871  We need the seek here, as mysql_file_pwrite() is not guaranteed to keep the
5872  original position on system that doesn't support pwrite().
5873  */
5874  mysql_file_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0));
5875  }
5876 
5877  /* this will cleanup IO_CACHE, sync and close the file */
5878  MYSQL_LOG::close(exiting);
5879  }
5880 
5881  /*
5882  The following test is needed even if is_open() is not set, as we may have
5883  called a not complete close earlier and the index file is still open.
5884  */
5885 
5886  if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
5887  {
5888  end_io_cache(&index_file);
5889  if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error)
5890  {
5891  char errbuf[MYSYS_STRERROR_SIZE];
5892  write_error= 1;
5893  sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name,
5894  errno, my_strerror(errbuf, sizeof(errbuf), errno));
5895  }
5896  }
5897  log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
5898  my_free(name);
5899  name= NULL;
5900  DBUG_VOID_RETURN;
5901 }
5902 
5903 
5904 void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg)
5905 {
5906  /*
5907  We need to take locks, otherwise this may happen:
5908  new_file() is called, calls open(old_max_size), then before open() starts,
5909  set_max_size() sets max_size to max_size_arg, then open() starts and
5910  uses the old_max_size argument, so max_size_arg has been overwritten and
5911  it's like if the SET command was never run.
5912  */
5913  DBUG_ENTER("MYSQL_BIN_LOG::set_max_size");
5914  mysql_mutex_lock(&LOCK_log);
5915  if (is_open())
5916  max_size= max_size_arg;
5917  mysql_mutex_unlock(&LOCK_log);
5918  DBUG_VOID_RETURN;
5919 }
5920 
5921 
5922 void MYSQL_BIN_LOG::signal_update()
5923 {
5924  DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
5925  signal_cnt++;
5926  mysql_cond_broadcast(&update_cond);
5927  DBUG_VOID_RETURN;
5928 }
5929 
5930 /****** transaction coordinator log for 2pc - binlog() based solution ******/
5931 
/**
  Open the binary log as the 2PC transaction coordinator log.

  The last binary log listed in the index is opened. If its first event is
  a Format_description event carrying LOG_EVENT_BINLOG_IN_USE_F (the log
  was not closed cleanly), crash recovery is run on it. The file is then
  truncated to the last valid position found by recover(), and the in-use
  flag byte is cleared.

  @param opt_name  binary log name (asserted non-empty)

  @return 0 on success, including when the index lists no binary log yet;
          non-zero on failure (1, -1, or an error code from
          find_log_pos()/find_next_log()/recover()).
*/
int MYSQL_BIN_LOG::open_binlog(const char *opt_name)
{
  LOG_INFO log_info;
  int      error= 1;

  /*
    This function is used for 2pc transaction coordination.  Hence, it
    is never used for relay logs.
  */
  DBUG_ASSERT(!is_relay_log);
  DBUG_ASSERT(total_ha_2pc > 1 || (1 == total_ha_2pc && opt_bin_log));
  DBUG_ASSERT(opt_name && opt_name[0]);

  if (!my_b_inited(&index_file))
  {
    /* There was a failure to open the index file, can't open the binlog */
    cleanup();
    return 1;
  }

  /*
    NOTE(review): the line that should guard the following block is missing
    in this copy. As shown, the block is unconditional, so the function
    would always return 1 here and the rest would be unreachable.
    Presumably it is a condition such as a heuristic-recovery check.
    Confirm against upstream.
  */
  {
    /* generate a new binlog to mask a corrupted one */
    open_binlog(opt_name, 0, WRITE_CACHE, max_binlog_size, false,
                true/*need_lock_index=true*/,
                true/*need_sid_lock=true*/,
                NULL);
    cleanup();
    return 1;
  }

  /* Position on the first log in the index; an empty index is not an error. */
  if ((error= find_log_pos(&log_info, NullS, true/*need_lock_index=true*/)))
  {
    if (error != LOG_INFO_EOF)
      sql_print_error("find_log_pos() failed (error: %d)", error);
    else
      error= 0;
    goto err;
  }

  {
    const char *errmsg;
    IO_CACHE    log;
    File        file;
    Log_event  *ev=0;
    Format_description_log_event fdle(BINLOG_VERSION);
    char        log_name[FN_REFLEN];
    my_off_t    valid_pos= 0;
    my_off_t    binlog_size;
    MY_STAT     s;

    /*
      NOTE(review): error is 0 at this point, so this failure path returns
      0 (success) to the caller.
    */
    if (! fdle.is_valid())
      goto err;

    /* Walk the index to its end; log_name ends up holding the last log. */
    do
    {
      strmake(log_name, log_info.log_file_name, sizeof(log_name)-1);
    } while (!(error= find_next_log(&log_info, true/*need_lock_index=true*/)));

    /* NOTE(review): this message names find_log_pos() although find_next_log() failed. */
    if (error !=  LOG_INFO_EOF)
    {
      sql_print_error("find_log_pos() failed (error: %d)", error);
      goto err;
    }

    /* On failure, error still holds LOG_INFO_EOF from the loop above. */
    if ((file= open_binlog_file(&log, log_name, &errmsg)) < 0)
    {
      sql_print_error("%s", errmsg);
      goto err;
    }

    /* NOTE(review): my_stat()'s result is not checked; s may be unset on failure. */
    my_stat(log_name, &s, MYF(0));
    binlog_size= s.st_size;

    /* Recover only if the log's Format_description event says it is still in use. */
    if ((ev= Log_event::read_log_event(&log, 0, &fdle,
                                       opt_master_verify_checksum)) &&
        ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
        ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
    {
      sql_print_information("Recovering after a crash using %s", opt_name);
      valid_pos= my_b_tell(&log);
      error= recover(&log, (Format_description_log_event *)ev, &valid_pos);
    }
    else
      error=0;

    delete ev;
    end_io_cache(&log);
    mysql_file_close(file, MYF(MY_WME));

    if (error)
      goto err;

    /* Trim the crashed binlog file to the last valid transaction
       or event (non-transaction), based on valid_pos. valid_pos is
       only non-zero after recovery ran above. */
    if (valid_pos > 0)
    {
      if ((file= mysql_file_open(key_file_binlog, log_name,
                                 O_RDWR | O_BINARY, MYF(MY_WME))) < 0)
      {
        sql_print_error("Failed to open the crashed binlog file "
                        "when master server is recovering it.");
        return -1;
      }

      /* Change binlog file size to valid_pos */
      if (valid_pos < binlog_size)
      {
        if (my_chsize(file, valid_pos, 0, MYF(MY_WME)))
        {
          sql_print_error("Failed to trim the crashed binlog file "
                          "when master server is recovering it.");
          mysql_file_close(file, MYF(MY_WME));
          return -1;
        }
        else
        {
          sql_print_information("Crashed binlog file %s size is %llu, "
                                "but recovered up to %llu. Binlog trimmed to %llu bytes.",
                                log_name, binlog_size, valid_pos, valid_pos);
        }
      }

      /* Clear LOG_EVENT_BINLOG_IN_USE_F */
      my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
      uchar flags= 0;
      if (mysql_file_pwrite(file, &flags, 1, offset, MYF(0)) != 1)
      {
        sql_print_error("Failed to clear LOG_EVENT_BINLOG_IN_USE_F "
                        "for the crashed binlog file when master "
                        "server is recovering it.");
        mysql_file_close(file, MYF(MY_WME));
        return -1;
      }

      mysql_file_close(file, MYF(MY_WME));
    } //end if
  }

err:
  return error;
}
6082 
{
  /* Empty: this function does nothing. */
}
6087 
6088 /*
6089  Prepare the transaction in the transaction coordinator.
6090 
6091  This function will prepare the transaction in the storage engines
6092  (by calling @c ha_prepare_low) what will write a prepare record
6093  to the log buffers.
6094 
6095  @retval 0 success
6096  @retval 1 error
6097 */
6098 int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
6099 {
6100  DBUG_ENTER("MYSQL_BIN_LOG::prepare");
6101 
6102  int error= ha_prepare_low(thd, all);
6103 
6104  DBUG_RETURN(error);
6105 }
6106 
/**
  Commit a transaction or a statement through the binary log.

  First, a non-empty statement cache gets any empty GTID groups written
  to it and is finalized. Next, if the transaction is ending and the
  transaction cache is not empty, that cache is finalized. It ends with
  an Xid_log_event when this is a real transaction that has an XID,
  more than one read-write engine and 2PC enabled; otherwise it ends
  with a "COMMIT" Query_log_event. Finally the engines are committed,
  through ordered_commit() when something was logged, or directly
  through ha_commit_low() otherwise.

  @param thd  Session being committed.
  @param all  true to commit the whole transaction, false to commit
              only the current statement.

  @retval RESULT_SUCCESS       Commit succeeded (or there was nothing
                               to commit for this statement).
  @retval RESULT_ABORTED       Failed before the final commit step: cache
                               finalization or empty-group write failed,
                               or ha_commit_low() failed when there is
                               no cache manager.
  @retval RESULT_INCONSISTENT  The final commit step (ordered_commit() or
                               ha_commit_low()) failed.
*/
6133 TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
6134 {
6135  DBUG_ENTER("MYSQL_BIN_LOG::commit");
6136 
6137  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
6138  my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
  /* Only set by write_empty_groups_to_cache() below; non-zero aborts. */
6139  int error= RESULT_SUCCESS;
  /*
    Becomes true once a cache has been finalized, i.e. the commit has to
    go through the group commit path (ordered_commit()).
  */
6140  bool stuff_logged= false;
6141 
6142  DBUG_PRINT("enter", ("thd: 0x%llx, all: %s, xid: %llu, cache_mngr: 0x%llx",
6143  (ulonglong) thd, YESNO(all), (ulonglong) xid,
6144  (ulonglong) cache_mngr));
6145 
6146  /*
6147  No cache manager means nothing to log, but we still have to commit
6148  the transaction.
6149  */
6150  if (cache_mngr == NULL)
6151  {
6152  if (ha_commit_low(thd, all))
6153  DBUG_RETURN(RESULT_ABORTED);
6154  DBUG_RETURN(RESULT_SUCCESS);
6155  }
6156 
6157  THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt;
6158 
6159  DBUG_PRINT("debug", ("in_transaction: %s, no_2pc: %s, rw_ha_count: %d",
6160  YESNO(thd->in_multi_stmt_transaction_mode()),
6161  YESNO(trans->no_2pc),
6162  trans->rw_ha_count));
6163  DBUG_PRINT("debug",
6164  ("all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
6165  YESNO(thd->transaction.all.cannot_safely_rollback()),
6166  YESNO(cache_mngr->trx_cache.is_binlog_empty())));
6167  DBUG_PRINT("debug",
6168  ("stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
6169  YESNO(thd->transaction.stmt.cannot_safely_rollback()),
6170  YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
6171 
6172 
6173  /*
6174  If there are no handlertons registered, there is nothing to
6175  commit. Note that DDLs are written earlier in this case (inside
6176  binlog_query).
6177 
6178  TODO: This can be a problem in those cases that there are no
6179  handlertons registered. DDLs are one example, but the other case
6180  is MyISAM. In this case, we could register a dummy handlerton to
6181  trigger the commit.
6182 
6183  Any statement that requires logging will call binlog_query before
6184  trans_commit_stmt, so an alternative is to use the condition
6185  "binlog_query called or stmt.ha_list != 0".
6186  */
6187  if (!all && trans->ha_list == 0 &&
6188  cache_mngr->stmt_cache.is_binlog_empty())
6189  DBUG_RETURN(RESULT_SUCCESS);
6190 
6191  /*
6192  If there is anything in the stmt cache, and GTIDs are enabled,
6193  then this is a single statement outside a transaction and it is
6194  impossible that there is anything in the trx cache. Hence, we
6195  write any empty group(s) to the stmt cache.
6196 
6197  Otherwise, we write any empty group(s) to the trx cache at the end
6198  of the transaction.
6199  */
6200  if (!cache_mngr->stmt_cache.is_binlog_empty())
6201  {
6202  error= write_empty_groups_to_cache(thd, &cache_mngr->stmt_cache);
6203  if (error == 0)
6204  {
6205  if (cache_mngr->stmt_cache.finalize(thd))
6206  DBUG_RETURN(RESULT_ABORTED);
6207  stuff_logged= true;
6208  }
6209  }
6210 
6211  /*
6212  We commit the transaction if:
6213  - We are not in a transaction and committing a statement, or
6214  - We are in a transaction and a full transaction is committed.
6215  Otherwise, we accumulate the changes.
6216  */
6217  if (!error && !cache_mngr->trx_cache.is_binlog_empty() &&
6218  ending_trans(thd, all))
6219  {
6220  const bool real_trans= (all || thd->transaction.all.ha_list == 0);
6221  /*
6222  We are committing an XA transaction if it is a "real" transaction
6223  and have an XID assigned (because some handlerton registered). A
6224  transaction is "real" if either 'all' is true or the 'all.ha_list'
6225  is empty.
6226 
6227  Note: This is kind of strange since registering the binlog
6228  handlerton will then make the transaction XA, which is not really
6229  true. This occurs for example if a MyISAM statement is executed
6230  with row-based replication on.
6231  */
6232  if (real_trans && xid && trans->rw_ha_count > 1 && !trans->no_2pc)
6233  {
6234  Xid_log_event end_evt(thd, xid);
6235  if (cache_mngr->trx_cache.finalize(thd, &end_evt))
6236  DBUG_RETURN(RESULT_ABORTED);
6237  }
6238  else
6239  {
6240  Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
6241  true, FALSE, TRUE, 0, TRUE);
6242  if (cache_mngr->trx_cache.finalize(thd, &end_evt))
6243  DBUG_RETURN(RESULT_ABORTED);
6244  }
6245  stuff_logged= true;
6246  }
6247 
6248  /*
6249  This is part of the stmt rollback.
  Resetting the trx-cache statement position to MY_OFF_T_UNDEF lets
  register_binlog_handler() set a fresh implicit savepoint for the
  next statement.
6250  */
6251  if (!all)
6252  cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
6253 
6254  DBUG_PRINT("debug", ("error: %d", error));
6255 
6256  if (error)
6257  DBUG_RETURN(RESULT_ABORTED);
6258 
6259  /*
6260  Now all the events are written to the caches, so we will commit
6261  the transaction in the engines. This is done using the group
6262  commit logic in ordered_commit, which will return when the
6263  transaction is committed.
6264 
6265  If the commit in the engines fail, we still have something logged
6266  to the binary log so we have to report this as a "bad" failure
6267  (failed to commit, but logged something).
6268  */
6269  if (stuff_logged)
6270  {
6271  if (ordered_commit(thd, all))
6272  DBUG_RETURN(RESULT_INCONSISTENT);
6273  }
6274  else
6275  {
6276  if (ha_commit_low(thd, all))
6277  DBUG_RETURN(RESULT_INCONSISTENT);
6278  }
6279 
  /*
    NOTE(review): every non-zero 'error' has already returned
    RESULT_ABORTED above and 'error' is not modified afterwards, so this
    always yields RESULT_SUCCESS.
  */
6280  DBUG_RETURN(error ? RESULT_INCONSISTENT : RESULT_SUCCESS);
6281 }
6282 
6283 
6298 std::pair<int,my_off_t>
6299 MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
6300 {
6301  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
6302  my_off_t bytes= 0;
6303  bool wrote_xid= false;
6304  int error= cache_mngr->flush(thd, &bytes, &wrote_xid);
6305  if (!error && bytes > 0)
6306  {
6307  /*
6308  Note that set_trans_pos does not copy the file name. See
6309  this function documentation for more info.
6310  */
6311  thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
6312  if (wrote_xid)
6313  inc_prep_xids(thd);
6314  }
6315  DBUG_PRINT("debug", ("bytes: %llu", bytes));
6316  return std::make_pair(error, bytes);
6317 }
6318 
6319 
/**
  Flush the binlog caches of all sessions in the flush-stage queue.

  Must be called with LOCK_log held (asserted). Sessions are popped and
  flushed one at a time until the queue reports no more entries, or
  until binlog_max_flush_queue_time has elapsed as measured with
  my_micro_time(). A value of 0 means no time limit. If time ran out,
  the remaining queue is fetched and flushed in one go.

  @param[out] total_bytes_var  Total number of bytes flushed.
  @param[out] rotate_var       Set to true when bytes were flushed and the
                               log position reached max_size. It is never
                               set to false, so the caller must initialize it.
  @param[out] out_queue_var    First session processed, i.e. the head of
                               the queue passed on to the next stage.

  @return The flush result kept in flush_error (0 on success); see the
          note on flush_error below.
*/
6333 int
6334 MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6335  bool *rotate_var,
6336  THD **out_queue_var)
6337 {
6338  DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var);
6339  my_off_t total_bytes= 0;
  /*
    The initial value 1 is a "no result recorded yet" sentinel.
    flush_error is overwritten by a session's result only while it
    still equals 1, so it effectively keeps the first result that is
    not 1.
    NOTE(review): an error from a later session in the same batch is
    therefore not reflected in the return value. Presumably such
    failures are recorded per session by the flush path; confirm.
  */
6340  int flush_error= 1;
6341  mysql_mutex_assert_owner(&LOCK_log);
6342 
6343  my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock);
6344  const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_time);
6345  my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock);
6346  const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0;
6347 
6348  /*
6349  First we read the queue until it either is empty or the difference
6350  between the time we started and the current time is too large.
6351 
6352  We remember the first thread we unqueued, because this will be the
6353  beginning of the out queue.
6354  */
6355  bool has_more= true;
6356  THD *first_seen= NULL;
6357  while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more)
6358  {
6359  std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FLUSH_STAGE);
6360  std::pair<int,my_off_t> result= flush_thread_caches(current.second);
6361  has_more= current.first;
6362  total_bytes+= result.second;
6363  if (flush_error == 1)
6364  flush_error= result.first;
6365  if (first_seen == NULL)
6366  first_seen= current.second;
6367  }
6368 
6369  /*
6370  Either the queue is empty, or we ran out of time. If we ran out of
6371  time, we have to fetch the entire queue (and flush it) since
6372  otherwise the next batch will not have a leader.
6373  */
6374  if (has_more)
6375  {
6376  THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
6377  for (THD *head= queue ; head ; head = head->next_to_commit)
6378  {
6379  std::pair<int,my_off_t> result= flush_thread_caches(head);
6380  total_bytes+= result.second;
6381  if (flush_error == 1)
6382  flush_error= result.first;
6383  }
6384  if (first_seen == NULL)
6385  first_seen= queue;
6386  }
6387 
6388  *out_queue_var= first_seen;
6389  *total_bytes_var= total_bytes;
  /* Request a rotation once this batch pushed the log past max_size. */
6390  if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size)
6391  *rotate_var= true;
6392  return flush_error;
6393 }
6394 
6395 
/**
  Commit, in queue order, the sessions of the commit-stage queue.

  Must be called by the commit-stage leader with LOCK_commit held
  (asserted). The queue is walked through next_to_commit. For each
  session that has no commit error:
  - the leader attaches to it;
  - if the session requested an engine commit (flags.commit_low), the
    leader runs ha_commit_low(); a failure is stored as
    THD::CE_COMMIT_ERROR.

  The prepared-XID counter is decremented for every session that wrote
  an XID, including sessions that were skipped because of an error.

  @param thd    Leader session (restored when the excursion ends).
  @param first  Head of the commit queue.
*/
6410 void
6411 MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
6412 {
6413  mysql_mutex_assert_owner(&LOCK_commit);
6414  Thread_excursion excursion(thd);
6415 #ifndef DBUG_OFF
6416  thd->transaction.flags.ready_preempt= 1; // formality by the leader
6417 #endif
6418  for (THD *head= first ; head ; head = head->next_to_commit)
6419  {
6420  DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s",
6421  head->thread_id, head->commit_error,
6422  YESNO(head->transaction.flags.pending)));
6423  /*
6424  If flushing failed, set commit_error for the session, skip the
6425  transaction and proceed with the next transaction instead. This
6426  will mark all threads as failed, since the flush failed.
6427 
6428  If flush succeeded, attach to the session and commit it in the
6429  engines.
6430  */
6431 #ifndef DBUG_OFF
6432  stage_manager.clear_preempt_status(head);
6433 #endif
6434  if (head->commit_error == THD::CE_NONE)
6435  {
6436  excursion.try_to_attach_to(head);
6437  bool all= head->transaction.flags.real_commit;
6438  if (head->transaction.flags.commit_low)
6439  {
6440  /* head is parked to have exited append() */
6441  DBUG_ASSERT(head->transaction.flags.ready_preempt);
6442  /*
6443  storage engine commit
6444  */
6445  if (ha_commit_low(head, all, false))
6446  head->commit_error= THD::CE_COMMIT_ERROR;
6447  }
6448  DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s",
6449  head->commit_error,
6450  YESNO(head->transaction.flags.pending)));
6451  }
6452  /*
6453  Decrement the prepared XID counter after storage engine commit.
6454  We also need decrement the prepared XID when encountering a
6455  flush error or session attach error for avoiding 3-way deadlock
6456  among user thread, rotate thread and dump thread.
6457  */
6458  if (head->transaction.flags.xid_written)
6459  dec_prep_xids(head);
6460  }
6461 }
6462 
6470 void
6471 MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
6472 {
6473  Thread_excursion excursion(thd);
6474  for (THD *head= first; head; head= head->next_to_commit)
6475  {
6476  if (head->transaction.flags.run_hooks &&
6477  head->commit_error == THD::CE_NONE)
6478  {
6479  excursion.try_to_attach_to(head);
6480  bool all= head->transaction.flags.real_commit;
6481  (void) RUN_HOOK(transaction, after_commit, (head, all));
6482  /*
6483  When after_commit finished for the transaction, clear the run_hooks flag.
6484  This allow other parts of the system to check if after_commit was called.
6485  */
6486  head->transaction.flags.run_hooks= false;
6487  }
6488  }
6489 }
6490 
#ifndef DBUG_OFF
/*
  Printable names of the group commit stages, used only for debug
  tracing in change_stage(). The array is indexed by
  Stage_manager::StageID, so the entries must follow that enum's order.
*/
static const char* g_stage_name[] = { "FLUSH", "SYNC", "COMMIT" };
#endif
6499 
6500 
6529 bool
6530 MYSQL_BIN_LOG::change_stage(THD *thd,
6531  Stage_manager::StageID stage, THD *queue,
6532  mysql_mutex_t *leave_mutex,
6533  mysql_mutex_t *enter_mutex)
6534 {
6535  DBUG_ENTER("MYSQL_BIN_LOG::change_stage");
6536  DBUG_PRINT("enter", ("thd: 0x%llx, stage: %s, queue: 0x%llx",
6537  (ulonglong) thd, g_stage_name[stage], (ulonglong) queue));
6538  DBUG_ASSERT(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
6539  DBUG_ASSERT(enter_mutex);
6540  DBUG_ASSERT(queue);
6541  /*
6542  enroll_for will release the leave_mutex once the sessions are
6543  queued.
6544  */
6545  if (!stage_manager.enroll_for(stage, queue, leave_mutex))
6546  {
6547  DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
6548  DBUG_RETURN(true);
6549  }
6550  mysql_mutex_lock(enter_mutex);
6551  DBUG_RETURN(false);
6552 }
6553 
6554 
6555 
6564 int
6565 MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
6566 {
6567  if (flush_io_cache(&log_file))
6568  return ER_ERROR_ON_WRITE;
6569  *end_pos_var= my_b_tell(&log_file);
6570  return 0;
6571 }
6572 
6573 
6577 std::pair<bool, bool>
6578 MYSQL_BIN_LOG::sync_binlog_file(bool force)
6579 {
6580  bool synced= false;
6581  unsigned int sync_period= get_sync_period();
6582  if (force || (sync_period && ++sync_counter >= sync_period))
6583  {
6584  sync_counter= 0;
6585  if (mysql_file_sync(log_file.file, MYF(MY_WME)))
6586  return std::make_pair(true, synced);
6587  synced= true;
6588  }
6589  return std::make_pair(false, synced);
6590 }
6591 
6592 
/**
  Finish the commit of a single session.

  If flags.commit_low is set:
  - the transaction is committed in the engines when no commit error is
    recorded yet (a failure becomes THD::CE_COMMIT_ERROR);
  - the prepared-XID counter is decremented if an XID was written;
  - on success, the after_commit hook runs and run_hooks is cleared.

  If flags.commit_low is not set, only the prepared-XID counter is
  decremented (when an XID was written).

  In both cases the session's GTID is then removed from the owned GTIDs
  through gtid_state->update_on_commit(), under the global_sid_lock read
  lock.

  NOTE(review): followers committed by the group commit leader also
  come through here. Presumably ha_commit_low() has reset their
  transaction flags (see the note about st_transaction::cleanup in
  ordered_commit()), so they are not committed twice; confirm.

  @param thd  Session to finish.

  @return thd->commit_error (THD::CE_NONE on success).
*/
6613 int
6614 MYSQL_BIN_LOG::finish_commit(THD *thd)
6615 {
6616  if (thd->transaction.flags.commit_low)
6617  {
6618  const bool all= thd->transaction.flags.real_commit;
6619  /*
6620  storage engine commit
6621  */
6622  if (thd->commit_error == THD::CE_NONE &&
6623  ha_commit_low(thd, all, false))
6624  thd->commit_error= THD::CE_COMMIT_ERROR;
6625  /*
6626  Decrement the prepared XID counter after storage engine commit
6627  */
6628  if (thd->transaction.flags.xid_written)
6629  dec_prep_xids(thd);
6630  /*
6631  If commit succeeded, we call the after_commit hook
6632  */
6633  if (thd->commit_error == THD::CE_NONE)
6634  {
6635  (void) RUN_HOOK(transaction, after_commit, (thd, all));
6636  thd->transaction.flags.run_hooks= false;
6637  }
6638  }
6639  else if (thd->transaction.flags.xid_written)
6640  dec_prep_xids(thd);
6641 
6642  /*
6643  Remove committed GTID from owned_gtids, it was already logged on
6644  MYSQL_BIN_LOG::write_cache().
6645  */
6646  global_sid_lock->rdlock();
6647  gtid_state->update_on_commit(thd);
6648  global_sid_lock->unlock();
6649 
6650  DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.run_hooks);
6651  DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
6652  DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
6653  thd->thread_id, thd->commit_error));
6654  return thd->commit_error;
6655 }
6656 
6657 
/**
  Flush, sync and commit a transaction using binlog group commit.

  The session passes through up to three stages:
  1. FLUSH, under LOCK_log: the caches of the queued sessions are written
     to the log, and the after_flush hook runs.
  2. SYNC, under LOCK_sync: the log is synced according to the sync
     period.
  3. COMMIT, under LOCK_commit, only when binlog_order_commits is set:
     the engines are committed in queue order, and then the after_commit
     hooks run outside LOCK_commit.

  When change_stage() returns true the session is a follower. It only
  runs finish_commit() for itself and returns; the stage leader
  processes the whole queue. After the stages, a rotation is performed
  if the flush stage requested one and no commit error occurred.

  @param thd          Session to commit.
  @param all          true for the whole transaction, false for the
                      statement.
  @param skip_commit  If true, neither commit in the engines nor run the
                      after_commit hooks (flags.commit_low and
                      flags.run_hooks are cleared).

  @return thd->commit_error (THD::CE_NONE on success).
*/
6707 int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
6708 {
6709  DBUG_ENTER("MYSQL_BIN_LOG::ordered_commit");
6710  int flush_error= 0;
6711  my_off_t total_bytes= 0;
6712  bool do_rotate= false;
6713 
6714  /*
6715  These values are used while flushing a transaction, so clear
6716  everything.
6717 
6718  Notes:
6719 
6720  - It would be good if we could keep transaction coordinator
6721  log-specific data out of the THD structure, but that is not the
6722  case right now.
6723 
6724  - Everything in the transaction structure is reset when calling
6725  ha_commit_low since that calls st_transaction::cleanup.
6726  */
6727  thd->transaction.flags.pending= true;
6728  thd->commit_error= THD::CE_NONE;
6729  thd->next_to_commit= NULL;
6730  thd->durability_property= HA_IGNORE_DURABILITY;
6731  thd->transaction.flags.real_commit= all;
6732  thd->transaction.flags.xid_written= false;
6733  thd->transaction.flags.commit_low= !skip_commit;
6734  thd->transaction.flags.run_hooks= !skip_commit;
6735 #ifndef DBUG_OFF
6736  /*
6737  The group commit Leader may have to wait for follower whose transaction
6738  is not ready to be preempted. Initially the status is pessimistic.
6739  Preemption guarding logics is necessary only when DBUG_ON is set.
6740  It won't be required for the dbug-off case as long as the follower won't
6741  execute any thread-specific write access code in this method, which is
6742  the case as of current.
6743  */
6744  thd->transaction.flags.ready_preempt= 0;
6745 #endif
6746 
6747  DBUG_PRINT("enter", ("flags.pending: %s, commit_error: %d, thread_id: %lu",
6748  YESNO(thd->transaction.flags.pending),
6749  thd->commit_error, thd->thread_id));
6750 
6751  /*
6752  Stage #1: flushing transactions to binary log
6753 
6754  While flushing, we allow new threads to enter and will process
6755  them in due time. Once the queue was empty, we cannot reap
6756  anything more since it is possible that a thread entered and
6757  appointed itself leader for the flush phase.
6758  */
6759  if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
6760  {
6761  DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
6762  thd->thread_id, thd->commit_error));
6763  DBUG_RETURN(finish_commit(thd));
6764  }
6765 
6766  THD *wait_queue= NULL;
6767  flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
6768 
6769  my_off_t flush_end_pos= 0;
6770  if (flush_error == 0 && total_bytes > 0)
6771  flush_error= flush_cache_to_file(&flush_end_pos);
6772 
6773  /*
6774  If the flush finished successfully, we can call the after_flush
6775  hook. Being invoked here, we have the guarantee that the hook is
6776  executed before the before/after_send_hooks on the dump thread
6777  preventing race conditions among these plug-ins.
6778  */
6779  if (flush_error == 0)
6780  {
6781  const char *file_name_ptr= log_file_name + dirname_length(log_file_name);
    /*
      NOTE(review): flush_end_pos is only set when total_bytes > 0, so
      this assertion assumes a successful flush stage always wrote
      something; confirm this also holds on the skip_commit path.
    */
6782  DBUG_ASSERT(flush_end_pos != 0);
6783  if (RUN_HOOK(binlog_storage, after_flush,
6784  (thd, file_name_ptr, flush_end_pos)))
6785  {
6786  sql_print_error("Failed to run 'after_flush' hooks");
6787  flush_error= ER_ERROR_ON_WRITE;
6788  }
6789 
6790  signal_update();
6791  DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
6792  }
6793 
6794  /*
6795  Stage #2: Syncing binary log file to disk
6796  */
6797  if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
6798  {
6799  DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
6800  thd->thread_id, thd->commit_error));
6801  DBUG_RETURN(finish_commit(thd));
6802  }
6803  THD *final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
6804  if (flush_error == 0 && total_bytes > 0)
6805  {
6806  std::pair<bool, bool> result= sync_binlog_file(false);
6807  flush_error= result.first;
6808  }
  /*
    NOTE(review): flush_error is not read after this point. A failure in
    flush_cache_to_file(), in the after_flush hook or in the sync is
    therefore not propagated to thd->commit_error by this function;
    confirm where such errors are meant to be handled.
  */
6809 
6810  /*
6811  Stage #3: Commit all transactions in order.
6812 
6813  This stage is skipped if we do not need to order the commits and
6814  each thread have to execute the handlerton commit instead.
6815 
6816  However, since we are keeping the lock from the previous stage, we
6817  need to unlock it if we skip the stage.
6818  */
6819  if (opt_binlog_order_commits)
6820  {
6821  if (change_stage(thd, Stage_manager::COMMIT_STAGE,
6822  final_queue, &LOCK_sync, &LOCK_commit))
6823  {
6824  DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
6825  thd->thread_id, thd->commit_error));
6826  DBUG_RETURN(finish_commit(thd));
6827  }
6828  THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
6829  DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
6830  DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
6831  process_commit_stage_queue(thd, commit_queue);
6832  mysql_mutex_unlock(&LOCK_commit);
6833  /*
6834  Process after_commit after LOCK_commit is released for avoiding
6835  3-way deadlock among user thread, rotate thread and dump thread.
6836  */
6837  process_after_commit_stage_queue(thd, commit_queue);
6838  final_queue= commit_queue;
6839  }
6840  else
6841  mysql_mutex_unlock(&LOCK_sync);
6842 
6843  /* Commit done so signal all waiting threads */
6844  stage_manager.signal_done(final_queue);
6845 
6846  /*
6847  Finish the commit before executing a rotate, or run the risk of a
6848  deadlock. We don't need the return value here since it is in
6849  thd->commit_error, which is returned below.
6850  */
6851  (void) finish_commit(thd);
6852 
6853  /*
6854  If we need to rotate, we do it without commit error.
6855  Otherwise the thd->commit_error will be possibly reset.
6856  */
6857  if (do_rotate && thd->commit_error == THD::CE_NONE)
6858  {
6859  /*
6860  Do not force the rotate as several consecutive groups may
6861  request unnecessary rotations.
6862 
6863  NOTE: Run purge_logs wo/ holding LOCK_log because it does not
6864  need the mutex. Otherwise causes various deadlocks.
6865  */
6866 
6867  DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
6868  DEBUG_SYNC(thd, "ready_to_do_rotation");
6869  bool check_purge= false;
6870  mysql_mutex_lock(&LOCK_log);
6871  int error= rotate(false, &check_purge);
6872  mysql_mutex_unlock(&LOCK_log);
6873 
6874  if (error)
6875  thd->commit_error= THD::CE_COMMIT_ERROR;
6876  else if (check_purge)
6877  purge();
6878  }
6879  DBUG_RETURN(thd->commit_error);
6880 }
6881 
6882 
6897  my_off_t *valid_pos)
6898 {
6899  Log_event *ev;
6900  HASH xids;
6901  MEM_ROOT mem_root;
6902  /*
6903  The flag is used for handling the case that a transaction
6904  is partially written to the binlog.
6905  */
6906  bool in_transaction= FALSE;
6907 
6908  if (! fdle->is_valid() ||
6909  my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
6910  sizeof(my_xid), 0, 0, MYF(0)))
6911  goto err1;
6912 
6913  init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
6914 
6915  while ((ev= Log_event::read_log_event(log, 0, fdle, TRUE))
6916  && ev->is_valid())
6917  {
6918  if (ev->get_type_code() == QUERY_EVENT &&
6919  !strcmp(((Query_log_event*)ev)->query, "BEGIN"))
6920  in_transaction= TRUE;
6921 
6922  if (ev->get_type_code() == QUERY_EVENT &&
6923  !strcmp(((Query_log_event*)ev)->query, "COMMIT"))
6924  {
6925  DBUG_ASSERT(in_transaction == TRUE);
6926  in_transaction= FALSE;
6927  }
6928  else if (ev->get_type_code() == XID_EVENT)
6929  {
6930  DBUG_ASSERT(in_transaction == TRUE);
6931  in_transaction= FALSE;
6932  Xid_log_event *xev=(Xid_log_event *)ev;
6933  uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
6934  sizeof(xev->xid));
6935  if (!x || my_hash_insert(&xids, x))
6936  goto err2;
6937  }
6938 
6939  /*
6940  Recorded valid position for the crashed binlog file
6941  which did not contain incorrect events. The following
6942  positions increase the variable valid_pos:
6943 
6944  1 -
6945  ...
6946  <---> HERE IS VALID <--->
6947  GTID
6948  BEGIN
6949  ...
6950  COMMIT
6951  ...
6952 
6953  2 -
6954  ...
6955  <---> HERE IS VALID <--->
6956  GTID
6957  DDL/UTILITY
6958  ...
6959 
6960  In other words, the following positions do not increase
6961  the variable valid_pos:
6962 
6963  1 -
6964  GTID
6965  <---> HERE IS VALID <--->
6966  ...
6967 
6968  2 -
6969  GTID
6970  BEGIN
6971  <---> HERE IS VALID <--->
6972  ...
6973  */
6974  if (!log->error && !in_transaction &&
6975  !is_gtid_event(ev))
6976  *valid_pos= my_b_tell(log);
6977 
6978  delete ev;
6979  }
6980 
6981  if (ha_recover(&xids))
6982  goto err2;
6983 
6984  free_root(&mem_root, MYF(0));
6985  my_hash_free(&xids);
6986  return 0;
6987 
6988 err2:
6989  free_root(&mem_root, MYF(0));
6990  my_hash_free(&xids);
6991 err1:
6992  sql_print_error("Crash recovery failed. Either correct the problem "
6993  "(if it's, for example, out of memory error) and restart, "
6994  "or delete (or rename) binary log and start mysqld with "
6995  "--tc-heuristic-recover={commit|rollback}");
6996  return 1;
6997 }
6998 
6999 Group_cache *THD::get_group_cache(bool is_transactional)
7000 {
7001  DBUG_ENTER("THD::get_group_cache(bool)");
7002 
7003  // If opt_bin_log==0, it is not safe to call thd_get_cache_mngr
7004  // because binlog_hton has not been completely set up.
7005  DBUG_ASSERT(opt_bin_log);
7006  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(this);
7007 
7008  // cache_mngr is NULL until we call thd->binlog_setup_trx_data, so
7009  // we assert that this has been done.
7010  DBUG_ASSERT(cache_mngr != NULL);
7011 
7012  binlog_cache_data *cache_data=
7013  cache_mngr->get_binlog_cache_data(is_transactional);
7014  DBUG_ASSERT(cache_data != NULL);
7015 
7016  DBUG_RETURN(&cache_data->group_cache);
7017 }
7018 
7019 /*
7020  These functions are placed in this file since they need access to
7021  binlog_hton, which has internal linkage.
7022 */
7023 
7024 int THD::binlog_setup_trx_data()
7025 {
7026  DBUG_ENTER("THD::binlog_setup_trx_data");
7027  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(this);
7028 
7029  if (cache_mngr)
7030  DBUG_RETURN(0); // Already set up
7031 
7032  cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL));
7033  if (!cache_mngr ||
7034  open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
7035  LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)) ||
7036  open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
7037  LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
7038  {
7039  my_free(cache_mngr);
7040  DBUG_RETURN(1); // Didn't manage to set it up
7041  }
7042  DBUG_PRINT("debug", ("Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) cache_mngr));
7043  thd_set_ha_data(this, binlog_hton, cache_mngr);
7044 
7045  cache_mngr= new (thd_get_cache_mngr(this))
7046  binlog_cache_mngr(max_binlog_stmt_cache_size,
7047  &binlog_stmt_cache_use,
7048  &binlog_stmt_cache_disk_use,
7049  max_binlog_cache_size,
7050  &binlog_cache_use,
7051  &binlog_cache_disk_use);
7052  DBUG_RETURN(0);
7053 }
7054 
7058 void register_binlog_handler(THD *thd, bool trx)
7059 {
7060  DBUG_ENTER("register_binlog_handler");
7061  /*
7062  If this is the first call to this function while processing a statement,
7063  the transactional cache does not have a savepoint defined. So, in what
7064  follows:
7065  . an implicit savepoint is defined;
7066  . callbacks are registered;
7067  . binary log is set as read/write.
7068 
7069  The savepoint allows for truncating the trx-cache transactional changes
7070  fail. Callbacks are necessary to flush caches upon committing or rolling
7071  back a statement or a transaction. However, notifications do not happen
7072  if the binary log is set as read/write.
7073  */
7074  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
7075  if (cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
7076  {
7077  /*
7078  Set an implicit savepoint in order to be able to truncate a trx-cache.
7079  */
7080  my_off_t pos= 0;
7081  binlog_trans_log_savepos(thd, &pos);
7082  cache_mngr->trx_cache.set_prev_position(pos);
7083 
7084  /*
7085  Set callbacks in order to be able to call commmit or rollback.
7086  */
7087  if (trx)
7088  trans_register_ha(thd, TRUE, binlog_hton);
7089  trans_register_ha(thd, FALSE, binlog_hton);
7090 
7091  /*
7092  Set the binary log as read/write otherwise callbacks are not called.
7093  */
7094  thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
7095  }
7096  DBUG_VOID_RETURN;
7097 }
7098 
7127 static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event)
7128 {
7129  DBUG_ENTER("binlog_start_trans_and_stmt");
7130 
7131  /*
7132  Initialize the cache manager if this was not done yet.
7133  */
7134  if (thd->binlog_setup_trx_data())
7135  DBUG_RETURN(1);
7136 
7137  /*
7138  Retrieve the appropriated cache.
7139  */
7140  bool is_transactional= start_event->is_using_trans_cache();
7141  binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
7142  binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_transactional);
7143 
7144  /*
7145  If the event is requesting immediatly logging, there is no need to go
7146  further down and set savepoint and register callbacks.
7147  */
7148  if (start_event->is_using_immediate_logging())
7149  DBUG_RETURN(0);
7150 
7151  register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
7152 
7153  /*
7154  If the cache is empty log "BEGIN" at the beginning of every transaction.
7155  Here, a transaction is either a BEGIN..COMMIT/ROLLBACK block or a single
7156  statement in autocommit mode.
7157  */
7158  if (cache_data->is_binlog_empty())
7159  {
7160  Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"),
7161  is_transactional, FALSE, TRUE, 0, TRUE);
7162  if (cache_data->write_event(thd, &qinfo))
7163  DBUG_RETURN(1);
7164  }
7165 
7166  DBUG_RETURN(0);
7167 }
7168 
7188 int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
7189  bool binlog_rows_query)
7190 {
7191  int error;
7192  DBUG_ENTER("THD::binlog_write_table_map");
7193  DBUG_PRINT("enter", ("table: 0x%lx (%s: #%llu)",
7194  (long) table, table->s->table_name.str,
7195  table->s->table_map_id.id()));
7196 
7197  /* Pre-conditions */
7198  DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
7199  DBUG_ASSERT(table->s->table_map_id.is_valid());
7200 
7202  the_event(this, table, table->s->table_map_id, is_transactional);
7203 
7204  binlog_start_trans_and_stmt(this, &the_event);
7205 
7206  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(this);
7207 
7208  binlog_cache_data *cache_data=
7209  cache_mngr->get_binlog_cache_data(is_transactional);
7210 
7211  if (binlog_rows_query && this->query())
7212  {
7213  /* Write the Rows_query_log_event into binlog before the table map */
7215  rows_query_ev(this, this->query(), this->query_length());
7216  if ((error= cache_data->write_event(this, &rows_query_ev)))
7217  DBUG_RETURN(error);
7218  }
7219 
7220  if ((error= cache_data->write_event(this, &the_event)))
7221  DBUG_RETURN(error);
7222 
7223  binlog_table_maps++;
7224  DBUG_RETURN(0);
7225 }
7226 
7239 THD::binlog_get_pending_rows_event(bool is_transactional) const
7240 {
7241  Rows_log_event* rows= NULL;
7242  binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(this);
7243 
7244  /*
7245  This is less than ideal, but here's the story: If there is no cache_mngr,
7246  prepare_pending_rows_event() has never been called (since the cache_mngr
7247  is set up there). In that case, we just return NULL.
7248  */
7249  if (cache_mngr)
7250  {
7251  binlog_cache_data *cache_data=
7252  cache_mngr->get_binlog_cache_data(is_transactional);
7253 
7254  rows= cache_data->pending();
7255  }
7256  return (rows);
7257 }
7258 
7268 void
7269 THD::add_to_binlog_accessed_dbs(const char *db_param)
7270 {
7271  char *after_db;
7272  MEM_ROOT *db_mem_root= &main_mem_root;
7273 
7274  if (!binlog_accessed_db_names)
7275  binlog_accessed_db_names= new (db_mem_root) List<char>;
7276 
7277  if (binlog_accessed_db_names->elements > MAX_DBS_IN_EVENT_MTS)
7278  {
7279  push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
7280  ER_MTS_UPDATED_DBS_GREATER_MAX,
7281  ER(ER_MTS_UPDATED_DBS_GREATER_MAX),
7282  MAX_DBS_IN_EVENT_MTS);
7283  return;
7284  }
7285 
7286  after_db= strdup_root(db_mem_root, db_param);
7287 
7288  /*
7289  sorted insertion is implemented with first rearranging data
7290  (pointer to char*) of the links and final appending of the least
7291  ordered data to create a new link in the list.
7292  */
7293  if (binlog_accessed_db_names->elements != 0)
7294  {
7295  List_iterator<char> it(*get_binlog_accessed_db_names());
7296 
7297  while (it++)
7298  {
7299  char *swap= NULL;
7300  char **ref_cur_db= it.ref();
7301  int cmp= strcmp(after_db, *ref_cur_db);
7302 
7303  DBUG_ASSERT(!swap || cmp < 0);
7304 
7305  if (cmp == 0)
7306  {
7307  after_db= NULL; /* dup to ignore */
7308  break;
7309  }
7310  else if (swap || cmp > 0)
7311  {
7312  swap= *ref_cur_db;
7313  *ref_cur_db= after_db;
7314  after_db= swap;
7315  }
7316  }
7317  }
7318  if (after_db)
7319  binlog_accessed_db_names->push_back(after_db, &main_mem_root);
7320 }
7321 
7322 
/**
  Decide whether, and in which format, the current statement is written to
  the binary log.

  reset_binlog_local_stmt_filter() is always called. The remaining work is
  done only when the binary log is open, OPTION_BIN_LOG is set for the
  session, and the statement is not rejected by binlog_filter->db_ok(db)
  under BINLOG_FORMAT = STATEMENT. In that case the function:
  - walks tables->next_global, combining the handler table flags of the
    written/accessed tables and recording the kinds of tables touched via
    lex->set_stmt_accessed_table();
  - marks the statement unsafe where needed (non-replicated tables,
    mixed transactional/non-transactional access, several engines
    together with a self-logging engine);
  - raises an error when the engine capabilities and binlog_format cannot
    be reconciled, or calls set_current_stmt_binlog_format_row_if_mixed();
  - sets the local statement filter (no logging) when no replicated table
    is written but a non-replicated one is involved;
  - checks is_dml_gtid_compatible() when enforce_gtid_consistency is on;
  - for writing statements (other than SQLCOM_END), records the accessed
    databases with add_to_binlog_accessed_dbs();
  - warns about UPDATE/DELETE on BLACKHOLE tables under BINLOG_FORMAT = ROW.

  @param tables  tables used by the statement, linked through next_global.

  @retval 0   success (this includes "statement will not be logged").
  @retval -1  an error has been raised with my_error().
*/
7421 int THD::decide_logging_format(TABLE_LIST *tables)
7422 {
7423  DBUG_ENTER("THD::decide_logging_format");
7424  DBUG_PRINT("info", ("query: %s", query()));
7425  DBUG_PRINT("info", ("variables.binlog_format: %lu",
7426  variables.binlog_format));
7427  DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
7428  lex->get_stmt_unsafe_flags()));
7429 
7430  reset_binlog_local_stmt_filter();
7431 
7432  /*
7433  We should not decide logging format if the binlog is closed or
7434  binlogging is off, or if the statement is filtered out from the
7435  binlog by filtering rules.
7436  */
7437  if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
7438  !(variables.binlog_format == BINLOG_FORMAT_STMT &&
7439  !binlog_filter->db_ok(db)))
7440  {
7441  /*
7442  Compute one bit field with the union of all the engine
7443  capabilities, and one with the intersection of all the engine
7444  capabilities.
        flags_write_some_set / flags_write_all_set cover written tables only;
        flags_access_some_set covers every replicated table in the list.
7445  */
7446  handler::Table_flags flags_write_some_set= 0;
7447  handler::Table_flags flags_access_some_set= 0;
7448  handler::Table_flags flags_write_all_set=
7449  HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE;
7450 
7451  /*
7452  Set when written tables use different engines.
7453  For example: Innodb and Falcon; Innodb and MyIsam.
7454  */
7455  my_bool multi_write_engine= FALSE;
7456  /*
7457  Set when accessed tables (read or written) use different
7458  engines, whether or not anything is updated. For example:
7459  Innodb and Falcon; Innodb and MyIsam.
7460  */
7461  my_bool multi_access_engine= FALSE;
7462  /*
7463  Set when at least one replicated table is written.
7464  */
7465  my_bool is_write= FALSE;
7466  /*
7467  The most recently seen written table (used to detect engine changes).
7468  */
7469  TABLE* prev_write_table= NULL;
7470  /*
7471  The most recently seen accessed table (used to detect engine changes).
7472  */
7473  TABLE* prev_access_table= NULL;
7474  /*
7475  True if at least one written table is transactional.
7476  */
7477  bool write_to_some_transactional_table= false;
7478  /*
7479  True if at least one written table is non-transactional.
7480  */
7481  bool write_to_some_non_transactional_table= false;
7482  /*
7483  True if all non-transactional tables that have been updated
7484  are temporary.
7485  */
7486  bool write_all_non_transactional_are_tmp_tables= true;
  /*
    Number of non-placeholder tables that are not written no_replicate
    tables (no_replicate tables that are only read are counted here).
  */
7491  uint replicated_tables_count= 0;
  /* Number of written tables marked no_replicate. */
7508  uint non_replicated_tables_count= 0;
7509 #ifndef DBUG_OFF
7510  {
7511  static const char *prelocked_mode_name[] = {
7512  "NON_PRELOCKED",
7513  "PRELOCKED",
7514  "PRELOCKED_UNDER_LOCK_TABLES",
7515  };
7516  DBUG_PRINT("debug", ("prelocked_mode: %s",
7517  prelocked_mode_name[locked_tables_mode]));
7518  }
7519 #endif
7520 
7521  /*
7522  Get the capabilities vector for all involved storage engines and
7523  mask out the flags for the binary log.
7524  */
7525  for (TABLE_LIST *table= tables; table; table= table->next_global)
7526  {
7527  if (table->placeholder())
7528  continue;
7529 
7530  handler::Table_flags const flags= table->table->file->ha_table_flags();
7531 
7532  DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx",
7533  table->table_name, flags));
7534 
7535  if (table->table->no_replicate)
7536  {
7537  /*
7538  The statement uses a table that is not replicated.
7539  The following properties about the table:
7540  - persistent / transient
7541  - transactional / non transactional
7542  - temporary / permanent
7543  - read or write
7544  - multiple engines involved because of this table
7545  are not relevant, as this table is completely ignored.
7546  Because the statement uses a non replicated table,
7547  using STATEMENT format in the binlog is impossible.
7548  Either this statement will be discarded entirely,
7549  or it will be logged (possibly partially) in ROW format.
7550  */
7551  lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE);
7552 
7553  if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
7554  {
7555  non_replicated_tables_count++;
7556  continue;
7557  }
7558  }
7559 
      /*
        NOTE(review): a no_replicate table that is only read reaches this
        point and is counted and classified like a replicated table —
        confirm this is intended.
      */
7560  replicated_tables_count++;
7561 
7562  my_bool trans= table->table->file->has_transactions();
7563 
7564  if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
7565  {
7566  write_to_some_transactional_table=
7567  write_to_some_transactional_table || trans;
7568 
7569  write_to_some_non_transactional_table=
7570  write_to_some_non_transactional_table || !trans;
7571 
7572  if (prev_write_table && prev_write_table->file->ht !=
7573  table->table->file->ht)
7574  multi_write_engine= TRUE;
7575 
7576  if (table->table->s->tmp_table)
7577  lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TEMP_TRANS_TABLE :
7578  LEX::STMT_WRITES_TEMP_NON_TRANS_TABLE);
7579  else
7580  lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TRANS_TABLE :
7581  LEX::STMT_WRITES_NON_TRANS_TABLE);
7582 
7583  /*
7584  Non-transactional updates are allowed when row binlog format is
7585  used and all non-transactional tables are temporary.
7586  Binlog format is checked on THD::is_dml_gtid_compatible() method.
7587  */
7588  if (!trans)
7589  write_all_non_transactional_are_tmp_tables=
7590  write_all_non_transactional_are_tmp_tables &&
7591  table->table->s->tmp_table;
7592 
7593  flags_write_all_set &= flags;
7594  flags_write_some_set |= flags;
7595  is_write= TRUE;
7596 
7597  prev_write_table= table->table;
7598  }
7599  flags_access_some_set |= flags;
7600 
        /*
          Record the table as read, except for the tables of a
          non-temporary CREATE TABLE.
        */
7601  if (lex->sql_command != SQLCOM_CREATE_TABLE ||
7602  (lex->sql_command == SQLCOM_CREATE_TABLE &&
7603  (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)))
7604  {
7605  if (table->table->s->tmp_table)
7606  lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TEMP_TRANS_TABLE :
7607  LEX::STMT_READS_TEMP_NON_TRANS_TABLE);
7608  else
7609  lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TRANS_TABLE :
7610  LEX::STMT_READS_NON_TRANS_TABLE);
7611  }
7612 
7613  if (prev_access_table && prev_access_table->file->ht !=
7614  table->table->file->ht)
7615  multi_access_engine= TRUE;
7616 
7617  prev_access_table= table->table;
7618  }
7619  DBUG_ASSERT(!is_write ||
7620  write_to_some_transactional_table ||
7621  write_to_some_non_transactional_table);
7622  /*
7623  write_all_non_transactional_are_tmp_tables may be true if any
7624  non-transactional table was not updated, so we fix its value here.
7625  */
7626  write_all_non_transactional_are_tmp_tables=
7627  write_all_non_transactional_are_tmp_tables &&
7628  write_to_some_non_transactional_table;
7629 
7630  DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set));
7631  DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set));
7632  DBUG_PRINT("info", ("flags_access_some_set: 0x%llx", flags_access_some_set));
7633  DBUG_PRINT("info", ("multi_write_engine: %d", multi_write_engine));
7634  DBUG_PRINT("info", ("multi_access_engine: %d", multi_access_engine));
7635 
    /*
      error holds the last ER_* code passed to my_error() below (or 1 for a
      GTID-consistency failure); any non-zero value makes us return -1.
    */
7636  int error= 0;
7637  int unsafe_flags;
7638 
7639  bool multi_stmt_trans= in_multi_stmt_transaction_mode();
7640  bool trans_table= trans_has_updated_trans_table(this);
7641  bool binlog_direct= variables.binlog_direct_non_trans_update;
7642 
7643  if (lex->is_mixed_stmt_unsafe(multi_stmt_trans, binlog_direct,
7644  trans_table, tx_isolation))
7645  lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MIXED_STATEMENT);
7646  else if (multi_stmt_trans && trans_table && !binlog_direct &&
7647  lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE))
7648  lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS);
7649 
7650  /*
7651  If more than one engine is involved in the statement and at
7652  least one is doing it's own logging (is *self-logging*), the
7653  statement cannot be logged atomically, so we generate an error
7654  rather than allowing the binlog to become corrupt.
7655  */
7656  if (multi_write_engine &&
7657  (flags_write_some_set & HA_HAS_OWN_BINLOGGING))
7658  my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE),
7659  MYF(0));
7660  else if (multi_access_engine && flags_access_some_set & HA_HAS_OWN_BINLOGGING)
7661  lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE);
7662 
7663  /* both statement-only and row-only engines involved */
7664  if ((flags_write_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0)
7665  {
7666  /*
7667  1. Error: Binary logging impossible since both row-incapable
7668  engines and statement-incapable engines are involved
7669  */
7670  my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0));
7671  }
7672  /* statement-only engines involved */
7673  else if ((flags_write_all_set & HA_BINLOG_ROW_CAPABLE) == 0)
7674  {
7675  if (lex->is_stmt_row_injection())
7676  {
7677  /*
7678  4. Error: Cannot execute row injection since table uses
7679  storage engine limited to statement-logging
7680  */
7681  my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
7682  }
7683  else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
7684  sqlcom_can_generate_row_events(this))
7685  {
7686  /*
7687  2. Error: Cannot modify table that uses a storage engine
7688  limited to statement-logging when BINLOG_FORMAT = ROW
7689  */
7690  my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0));
7691  }
7692  else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
7693  {
7694  /*
7695  3. Error: Cannot execute statement: binlogging of unsafe
7696  statement is impossible when storage engine is limited to
7697  statement-logging and BINLOG_FORMAT = MIXED.
          my_error() is called once per unsafe reason that is set, so
          several errors may be raised for one statement.
7698  */
7699  for (int unsafe_type= 0;
7700  unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
7701  unsafe_type++)
7702  if (unsafe_flags & (1 << unsafe_type))
7703  my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0),
7704  ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
7705  }
7706  /* log in statement format! */
7707  }
7708  /* no statement-only engines */
7709  else
7710  {
7711  /* binlog_format = STATEMENT */
7712  if (variables.binlog_format == BINLOG_FORMAT_STMT)
7713  {
7714  if (lex->is_stmt_row_injection())
7715  {
7716  /*
7717  6. Error: Cannot execute row injection since
7718  BINLOG_FORMAT = STATEMENT
7719  */
7720  my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0));
7721  }
7722  else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 &&
7723  sqlcom_can_generate_row_events(this))
7724  {
7725  /*
7726  5. Error: Cannot modify table that uses a storage engine
7727  limited to row-logging when binlog_format = STATEMENT
7728  */
7729  my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
7730  }
7731  else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
7732  {
7733  /*
7734  7. Warning: Unsafe statement logged as statement due to
7735  binlog_format = STATEMENT
          The warning is only scheduled here (binlog_unsafe_warning_flags);
          per the debug message it is issued later by binlog_query.
7736  */
7737  binlog_unsafe_warning_flags|= unsafe_flags;
7738  DBUG_PRINT("info", ("Scheduling warning to be issued by "
7739  "binlog_query: '%s'",
7740  ER(ER_BINLOG_UNSAFE_STATEMENT)));
7741  DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x",
7742  binlog_unsafe_warning_flags));
7743  }
7744  /* log in statement format! */
7745  }
7746  /* No statement-only engines and binlog_format != STATEMENT.
7747  I.e., nothing prevents us from row logging if needed. */
7748  else
7749  {
7750  if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
7751  || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
7752  {
7753  /* log in row format! */
7754  set_current_stmt_binlog_format_row_if_mixed();
7755  }
7756  }
7757  }
7758 
    /*
      A written no_replicate table is only acceptable in row format, and
      only when some replicated table is written as well; if nothing
      replicated is written, the statement is filtered out of the binlog.
    */
7759  if (non_replicated_tables_count > 0)
7760  {
7761  if ((replicated_tables_count == 0) || ! is_write)
7762  {
7763  DBUG_PRINT("info", ("decision: no logging, no replicated table affected"));
7764  set_binlog_local_stmt_filter();
7765  }
7766  else
7767  {
7768  if (! is_current_stmt_binlog_format_row())
7769  {
7770  my_error((error= ER_BINLOG_STMT_MODE_AND_NO_REPL_TABLES), MYF(0));
7771  }
7772  else
7773  {
7774  clear_binlog_local_stmt_filter();
7775  }
7776  }
7777  }
7778  else
7779  {
7780  clear_binlog_local_stmt_filter();
7781  }
7782 
    /* is_dml_gtid_compatible() raises its own error via my_error(). */
7783  if (!error && enforce_gtid_consistency &&
7784  !is_dml_gtid_compatible(write_to_some_transactional_table,
7785  write_to_some_non_transactional_table,
7786  write_all_non_transactional_are_tmp_tables))
7787  error= 1;
7788 
7789  if (error) {
7790  DBUG_PRINT("info", ("decision: no logging since an error was generated"));
7791  DBUG_RETURN(-1);
7792  }
7793 
7794  if (is_write &&
7795  lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
7796  {
7797  /*
7798  Master side of DML in the STMT format events parallelization.
7799  All involving table db:s are stored in a abc-ordered name list.
7800  In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
7801  the list gathering breaks since it won't be sent to the slave.
7802  */
7803  for (TABLE_LIST *table= tables; table; table= table->next_global)
7804  {
7805  if (table->placeholder())
7806  continue;
7807 
7808  DBUG_ASSERT(table->table);
7809 
7810  if (table->table->file->referenced_by_foreign_key())
7811  {
7812  /*
7813  FK-referenced dbs can't be gathered currently. The following
7814  event will be marked for sequential execution on slave.
7815  */
7816  binlog_accessed_db_names= NULL;
7817  add_to_binlog_accessed_dbs("");
7818  break;
7819  }
7820  if (!is_current_stmt_binlog_format_row())
7821  add_to_binlog_accessed_dbs(table->db);
7822  }
7823  }
7824  DBUG_PRINT("info", ("decision: logging in %s format",
7825  is_current_stmt_binlog_format_row() ?
7826  "ROW" : "STATEMENT"));
7827 
7828  if (variables.binlog_format == BINLOG_FORMAT_ROW &&
7829  (lex->sql_command == SQLCOM_UPDATE ||
7830  lex->sql_command == SQLCOM_UPDATE_MULTI ||
7831  lex->sql_command == SQLCOM_DELETE ||
7832  lex->sql_command == SQLCOM_DELETE_MULTI))
7833  {
7834  String table_names;
7835  /*
7836  Generate a warning for UPDATE/DELETE statements that modify a
7837  BLACKHOLE table, as row events are not logged in row format.
7838  */
7839  for (TABLE_LIST *table= tables; table; table= table->next_global)
7840  {
7841  if (table->placeholder())
7842  continue;
7843  if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
7844  table->lock_type >= TL_WRITE_ALLOW_WRITE)
7845  {
7846  table_names.append(table->table_name);
7847  table_names.append(",");
7848  }
7849  }
7850  if (!table_names.is_empty())
7851  {
7852  bool is_update= (lex->sql_command == SQLCOM_UPDATE ||
7853  lex->sql_command == SQLCOM_UPDATE_MULTI);
7854  /*
7855  Replace the last ',' with '.' for table_names
7856  */
7857  table_names.replace(table_names.length()-1, 1, ".", 1);
7858  push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
7859  WARN_ON_BLOCKHOLE_IN_RBR,
7860  ER(WARN_ON_BLOCKHOLE_IN_RBR),
7861  is_update ? "UPDATE" : "DELETE",
7862  table_names.c_ptr());
7863  }
7864  }
7865  }
7866 #ifndef DBUG_OFF
7867  else
7868  DBUG_PRINT("info", ("decision: no logging since "
7869  "mysql_bin_log.is_open() = %d "
7870  "and (options & OPTION_BIN_LOG) = 0x%llx "
7871  "and binlog_format = %lu "
7872  "and binlog_filter->db_ok(db) = %d",
7873  mysql_bin_log.is_open(),
7874  (variables.option_bits & OPTION_BIN_LOG),
7875  variables.binlog_format,
7876  binlog_filter->db_ok(db)));
7877 #endif
7878 
7879  DBUG_RETURN(0);
7880 }
7881 
7882 
7883 bool THD::is_ddl_gtid_compatible() const
7884 {
7885  DBUG_ENTER("THD::is_ddl_gtid_compatible");
7886 
7887  // If @@session.sql_log_bin has been manually turned off (only
7888  // doable by SUPER), then no problem, we can execute any statement.
7889  if ((variables.option_bits & OPTION_BIN_LOG) == 0)
7890  DBUG_RETURN(true);
7891 
7892  if (lex->sql_command == SQLCOM_CREATE_TABLE &&
7893  !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
7894  lex->select_lex.item_list.elements)
7895  {
7896  /*
7897  CREATE ... SELECT (without TEMPORARY) is unsafe because if
7898  binlog_format=row it will be logged as a CREATE TABLE followed
7899  by row events, re-executed non-atomically as two transactions,
7900  and then written to the slave's binary log as two separate
7901  transactions with the same GTID.
7902  */
7903  my_error(ER_GTID_UNSAFE_CREATE_SELECT, MYF(0));
7904  DBUG_RETURN(false);
7905  }
7906  if ((lex->sql_command == SQLCOM_CREATE_TABLE &&
7907  (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) != 0) ||
7908  (lex->sql_command == SQLCOM_DROP_TABLE && lex->drop_temporary))
7909  {
7910  /*
7911  [CREATE|DROP] TEMPORARY TABLE is unsafe to execute
7912  inside a transaction because the table will stay and the
7913  transaction will be written to the slave's binary log with the
7914  GTID even if the transaction is rolled back.
7915  */
7916  if (in_multi_stmt_transaction_mode())
7917  {
7918  my_error(ER_GTID_UNSAFE_CREATE_DROP_TEMPORARY_TABLE_IN_TRANSACTION,
7919  MYF(0));
7920  DBUG_RETURN(false);
7921  }
7922  }
7923  DBUG_RETURN(true);
7924 }
7925 
7926 
7927 bool
7928 THD::is_dml_gtid_compatible(bool transactional_table,
7929  bool non_transactional_table,
7930  bool non_transactional_tmp_tables) const
7931 {
7932  DBUG_ENTER("THD::is_dml_gtid_compatible(bool, bool, bool)");
7933 
7934  // If @@session.sql_log_bin has been manually turned off (only
7935  // doable by SUPER), then no problem, we can execute any statement.
7936  if ((variables.option_bits & OPTION_BIN_LOG) == 0)
7937  DBUG_RETURN(true);
7938 
7939  /*
7940  Single non-transactional updates are allowed when not mixed
7941  together with transactional statements within a transaction.
7942  Furthermore, writing to transactional and non-transactional
7943  engines in a single statement is also disallowed.
7944  Multi-statement transactions on non-transactional tables are
7945  split into single-statement transactions when
7946  GTID_NEXT = "AUTOMATIC".
7947 
7948  Non-transactional updates are allowed when row binlog format is
7949  used and all non-transactional tables are temporary.
7950 
7951  The debug symbol "allow_gtid_unsafe_non_transactional_updates"
7952  disables the error. This is useful because it allows us to run
7953  old tests that were not written with the restrictions of GTIDs in
7954  mind.
7955  */
7956  if (non_transactional_table &&
7957  (transactional_table || trans_has_updated_trans_table(this)) &&
7958  !(non_transactional_tmp_tables && is_current_stmt_binlog_format_row()) &&
7959  !DBUG_EVALUATE_IF("allow_gtid_unsafe_non_transactional_updates", 1, 0))
7960  {
7961  my_error(ER_GTID_UNSAFE_NON_TRANSACTIONAL_TABLE, MYF(0));
7962  DBUG_RETURN(false);
7963  }
7964 
7965  DBUG_RETURN(true);
7966 }
7967 
7968 /*
7969  Implementation of interface to write rows to the binary log through the
7970  thread. The thread is responsible for writing the rows it has
7971  inserted/updated/deleted.
7972 */
7973 
7974 #ifndef MYSQL_CLIENT
7975 
7976 /*
7977  Template member function for ensuring that there is an rows log
7978  event of the apropriate type before proceeding.
7979 
7980  PRE CONDITION:
7981  - Events of type 'RowEventT' have the type code 'type_code'.
7982 
7983  POST CONDITION:
7984  If a non-NULL pointer is returned, the pending event for thread 'thd' will
7985  be an event of type 'RowEventT' (which have the type code 'type_code')
7986  will either empty or have enough space to hold 'needed' bytes. In
7987  addition, the columns bitmap will be correct for the row, meaning that
7988  the pending event will be flushed if the columns in the event differ from
7989  the columns suppled to the function.
7990 
7991  RETURNS
7992  If no error, a non-NULL pending event (either one which already existed or
7993  the newly created one).
7994  If error, NULL.
7995  */
7996 
7997 template <class RowsEventT> Rows_log_event*
7998 THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
7999  size_t needed,
8000  bool is_transactional,
8001  RowsEventT *hint __attribute__((unused)),
8002  const uchar* extra_row_info)
8003 {
8004  DBUG_ENTER("binlog_prepare_pending_rows_event");
8005 
8006  /* Fetch the type code for the RowsEventT template parameter */
8007  int const general_type_code= RowsEventT::TYPE_CODE;
8008 
8009  Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional);
8010 
8011  if (unlikely(pending && !pending->is_valid()))
8012  DBUG_RETURN(NULL);
8013 
8014  /*
8015  Check if the current event is non-NULL and a write-rows
8016  event. Also check if the table provided is mapped: if it is not,
8017  then we have switched to writing to a new table.
8018  If there is no pending event, we need to create one. If there is a pending
8019  event, but it's not about the same table id, or not of the same type
8020  (between Write, Update and Delete), or not the same affected columns, or
8021  going to be too big, flush this event to disk and create a new pending
8022  event.
8023  */
8024  if (!pending ||
8025  pending->server_id != serv_id ||
8026  pending->get_table_id() != table->s->table_map_id ||
8027  pending->get_general_type_code() != general_type_code ||
8028  pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
8029  pending->read_write_bitmaps_cmp(table) == FALSE ||
8030  !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
8031  extra_row_info))
8032  {
8033  /* Create a new RowsEventT... */
8034  Rows_log_event* const
8035  ev= new RowsEventT(this, table, table->s->table_map_id,
8036  is_transactional, extra_row_info);
8037  if (unlikely(!ev))
8038  DBUG_RETURN(NULL);
8039  ev->server_id= serv_id; // I don't like this, it's too easy to forget.
8040  /*
8041  flush the pending event and replace it with the newly created
8042  event...
8043  */
8044  if (unlikely(
8045  mysql_bin_log.flush_and_set_pending_rows_event(this, ev,
8046  is_transactional)))
8047  {
8048  delete ev;
8049  DBUG_RETURN(NULL);
8050  }
8051 
8052  DBUG_RETURN(ev); /* This is the new pending event */
8053  }
8054  DBUG_RETURN(pending); /* This is the current pending event */
8055 }
8056 
8057 /* Declare in unnamed namespace. */
8058 CPP_UNNAMED_NS_START
8059 
8076  public:
8087  Row_data_memory(TABLE *table, size_t const len1)
8088  : m_memory(0)
8089  {
8090 #ifndef DBUG_OFF
8091  m_alloc_checked= FALSE;
8092 #endif
8093  allocate_memory(table, len1);
8094  m_ptr[0]= has_memory() ? m_memory : 0;
8095  m_ptr[1]= 0;
8096  }
8097 
8098  Row_data_memory(TABLE *table, size_t const len1, size_t const len2)
8099  : m_memory(0)
8100  {
8101 #ifndef DBUG_OFF
8102  m_alloc_checked= FALSE;
8103 #endif
8104  allocate_memory(table, len1 + len2);
8105  m_ptr[0]= has_memory() ? m_memory : 0;
8106  m_ptr[1]= has_memory() ? m_memory + len1 : 0;
8107  }
8108 
8109  ~Row_data_memory()
8110  {
8111  if (m_memory != 0 && m_release_memory_on_destruction)
8112  my_free(m_memory);
8113  }
8114 
8121  bool has_memory() const {
8122 #ifndef DBUG_OFF
8123  m_alloc_checked= TRUE;
8124 #endif
8125  return m_memory != 0;
8126  }
8127 
8128  uchar *slot(uint s)
8129  {
8130  DBUG_ASSERT(s < sizeof(m_ptr)/sizeof(*m_ptr));
8131  DBUG_ASSERT(m_ptr[s] != 0);
8132  DBUG_ASSERT(m_alloc_checked == TRUE);
8133  return m_ptr[s];
8134  }
8135 
8136  private:
8137  void allocate_memory(TABLE *const table, size_t const total_length)
8138  {
8139  if (table->s->blob_fields == 0)
8140  {
8141  /*
8142  The maximum length of a packed record is less than this
8143  length. We use this value instead of the supplied length
8144  when allocating memory for records, since we don't know how
8145  the memory will be used in future allocations.
8146 
8147  Since table->s->reclength is for unpacked records, we have
8148  to add two bytes for each field, which can potentially be
8149  added to hold the length of a packed field.
8150  */
8151  size_t const maxlen= table->s->reclength + 2 * table->s->fields;
8152 
8153  /*
8154  Allocate memory for two records if memory hasn't been
8155  allocated. We allocate memory for two records so that it can
8156  be used when processing update rows as well.
8157  */
8158  if (table->write_row_record == 0)
8159  table->write_row_record=
8160  (uchar *) alloc_root(&table->mem_root, 2 * maxlen);
8161  m_memory= table->write_row_record;
8162  m_release_memory_on_destruction= FALSE;
8163  }
8164  else
8165  {
8166  m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
8167  m_release_memory_on_destruction= TRUE;
8168  }
8169  }
8170 
8171 #ifndef DBUG_OFF
8172  mutable bool m_alloc_checked;
8173 #endif
8174  bool m_release_memory_on_destruction;
8175  uchar *m_memory;
8176  uchar *m_ptr[2];
8177  };
8178 
8179 CPP_UNNAMED_NS_END
8180 
8181 int THD::binlog_write_row(TABLE* table, bool is_trans,
8182  uchar const *record,
8183  const uchar* extra_row_info)
8184 {
8185  DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8186 
8187  /*
8188  Pack records into format for transfer. We are allocating more
8189  memory than needed, but that doesn't matter.
8190  */
8191  Row_data_memory memory(table, max_row_length(table, record));
8192  if (!memory.has_memory())
8193  return HA_ERR_OUT_OF_MEM;
8194 
8195  uchar *row_data= memory.slot(0);
8196 
8197  size_t const len= pack_row(table, table->write_set, row_data, record);
8198 
8199  Rows_log_event* const ev=
8200  binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
8201  static_cast<Write_rows_log_event*>(0),
8202  extra_row_info);
8203 
8204  if (unlikely(ev == 0))
8205  return HA_ERR_OUT_OF_MEM;
8206 
8207  return ev->add_row_data(row_data, len);
8208 }
8209 
8210 int THD::binlog_update_row(TABLE* table, bool is_trans,
8211  const uchar *before_record,
8212  const uchar *after_record,
8213  const uchar* extra_row_info)
8214 {
8215  DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8216  int error= 0;
8217 
8222  MY_BITMAP *old_read_set= table->read_set;
8223  MY_BITMAP *old_write_set= table->write_set;
8224 
8230  binlog_prepare_row_images(table);
8231 
8232  size_t const before_maxlen = max_row_length(table, before_record);
8233  size_t const after_maxlen = max_row_length(table, after_record);
8234 
8235  Row_data_memory row_data(table, before_maxlen, after_maxlen);
8236  if (!row_data.has_memory())
8237  return HA_ERR_OUT_OF_MEM;
8238 
8239  uchar *before_row= row_data.slot(0);
8240  uchar *after_row= row_data.slot(1);
8241 
8242  size_t const before_size= pack_row(table, table->read_set, before_row,
8243  before_record);
8244  size_t const after_size= pack_row(table, table->write_set, after_row,
8245  after_record);
8246 
8247  /*
8248  Don't print debug messages when running valgrind since they can
8249  trigger false warnings.
8250  */
8251 #ifndef HAVE_purify
8252  DBUG_DUMP("before_record", before_record, table->s->reclength);
8253  DBUG_DUMP("after_record", after_record, table->s->reclength);
8254  DBUG_DUMP("before_row", before_row, before_size);
8255  DBUG_DUMP("after_row", after_row, after_size);
8256 #endif
8257 
8258  Rows_log_event* const ev=
8259  binlog_prepare_pending_rows_event(table, server_id,
8260  before_size + after_size, is_trans,
8261  static_cast<Update_rows_log_event*>(0),
8262  extra_row_info);
8263 
8264  if (unlikely(ev == 0))
8265  return HA_ERR_OUT_OF_MEM;
8266 
8267  error= ev->add_row_data(before_row, before_size) ||
8268  ev->add_row_data(after_row, after_size);
8269 
8270  /* restore read/write set for the rest of execution */
8271  table->column_bitmaps_set_no_signal(old_read_set,
8272  old_write_set);
8273 
8274  return error;
8275 }
8276 
8277 int THD::binlog_delete_row(TABLE* table, bool is_trans,
8278  uchar const *record,
8279  const uchar* extra_row_info)
8280 {
8281  DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8282  int error= 0;
8283 
8288  MY_BITMAP *old_read_set= table->read_set;
8289  MY_BITMAP *old_write_set= table->write_set;
8290 
8296  binlog_prepare_row_images(table);
8297 
8298  /*
8299  Pack records into format for transfer. We are allocating more
8300  memory than needed, but that doesn't matter.
8301  */
8302  Row_data_memory memory(table, max_row_length(table, record));
8303  if (unlikely(!memory.has_memory()))
8304  return HA_ERR_OUT_OF_MEM;
8305 
8306  uchar *row_data= memory.slot(0);
8307 
8308  DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
8309  size_t const len= pack_row(table, table->read_set, row_data, record);
8310 
8311  Rows_log_event* const ev=
8312  binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
8313  static_cast<Delete_rows_log_event*>(0),
8314  extra_row_info);
8315 
8316  if (unlikely(ev == 0))
8317  return HA_ERR_OUT_OF_MEM;
8318 
8319  error= ev->add_row_data(row_data, len);
8320 
8321  /* restore read/write set for the rest of execution */
8322  table->column_bitmaps_set_no_signal(old_read_set,
8323  old_write_set);
8324 
8325  return error;
8326 }
8327 
8328 void THD::binlog_prepare_row_images(TABLE *table)
8329 {
8330  DBUG_ENTER("THD::binlog_prepare_row_images");
8336  DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set);
8337  THD *thd= table->in_use;
8338 
8344  if (table->s->primary_key < MAX_KEY &&
8345  (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
8346  !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
8347  {
8352  DBUG_ASSERT(table->read_set != &table->tmp_set);
8353 
8354  bitmap_clear_all(&table->tmp_set);
8355 
8356  switch(thd->variables.binlog_row_image)
8357  {
8358  case BINLOG_ROW_IMAGE_MINIMAL:
8359  /* MINIMAL: Mark only PK */
8360  table->mark_columns_used_by_index_no_reset(table->s->primary_key,
8361  &table->tmp_set);
8362  break;
8363  case BINLOG_ROW_IMAGE_NOBLOB:
8368  bitmap_union(&table->tmp_set, table->read_set);
8369  for (Field **ptr=table->field ; *ptr ; ptr++)
8370  {
8371  Field *field= (*ptr);
8372  if ((field->type() == MYSQL_TYPE_BLOB) &&
8373  !(field->flags & PRI_KEY_FLAG))
8374  bitmap_clear_bit(&table->tmp_set, field->field_index);
8375  }
8376  break;
8377  default:
8378  DBUG_ASSERT(0); // impossible.
8379  }
8380 
8381  /* set the temporary read_set */
8382  table->column_bitmaps_set_no_signal(&table->tmp_set,
8383  table->write_set);
8384  }
8385 
8386  DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set);
8387  DBUG_VOID_RETURN;
8388 }
8389 
8390 
8391 int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
8392 {
8393  DBUG_ENTER("THD::binlog_flush_pending_rows_event");
8394  /*
8395  We shall flush the pending event even if we are not in row-based
8396  mode: it might be the case that we left row-based mode before
8397  flushing anything (e.g., if we have explicitly locked tables).
8398  */
8399  if (!mysql_bin_log.is_open())
8400  DBUG_RETURN(0);
8401 
8402  /*
8403  Mark the event as the last event of a statement if the stmt_end
8404  flag is set.
8405  */
8406  int error= 0;
8407  if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional))
8408  {
8409  if (stmt_end)
8410  {
8411  pending->set_flags(Rows_log_event::STMT_END_F);
8412  binlog_table_maps= 0;
8413  }
8414 
8415  error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
8416  is_transactional);
8417  }
8418 
8419  DBUG_RETURN(error);
8420 }
8421 
8422 
8442 bool
8443 THD::binlog_row_event_extra_data_eq(const uchar* a,
8444  const uchar* b)
8445 {
8446  return ((a == b) ||
8447  ((a != NULL) &&
8448  (b != NULL) &&
8449  (a[EXTRA_ROW_INFO_LEN_OFFSET] ==
8450  b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
8451  (memcmp(a, b,
8452  a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
8453 }
8454 
8455 #if !defined(DBUG_OFF) && !defined(_lint)
8456 static const char *
8457 show_query_type(THD::enum_binlog_query_type qtype)
8458 {
8459  switch (qtype) {
8460  case THD::ROW_QUERY_TYPE:
8461  return "ROW";
8462  case THD::STMT_QUERY_TYPE:
8463  return "STMT";
8464  case THD::QUERY_TYPE_COUNT:
8465  default:
8466  DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT);
8467  }
8468  static char buf[64];
8469  sprintf(buf, "UNKNOWN#%d", qtype);
8470  return buf;
8471 }
8472 #endif
8473 
8477 static void reset_binlog_unsafe_suppression()
8478 {
8479  DBUG_ENTER("reset_binlog_unsafe_suppression");
8480  unsafe_warning_suppression_is_activated= false;
8481  limit_unsafe_warning_count= 0;
8482  limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8483  DBUG_VOID_RETURN;
8484 }
8485 
8489 static void print_unsafe_warning_to_log(int unsafe_type, char* buf,
8490  char* query)
8491 {
8492  DBUG_ENTER("print_unsafe_warning_in_log");
8493  sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT),
8494  ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
8495  sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query);
8496  DBUG_VOID_RETURN;
8497 }
8498 
/*
  Log an unsafe-LIMIT warning to the error log, rate-limiting it when a
  flood of such warnings is detected.

  Every call increments limit_unsafe_warning_count. The first call (start
  time still 0) opens the observation window and logs the warning. While
  suppression is inactive, every warning is logged. Once the counter
  reaches LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT:
  - suppression inactive: activate it if the window is at most
    LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT old, otherwise restart the
    window;
  - suppression active: log a "suppressed N times" note plus the warning,
    and call reset_binlog_unsafe_suppression() if the window is older
    than the timeout.
  In both cases the counter is then reset to 0.

  Timestamps are my_getsystime()/10000000. NOTE(review): presumably
  seconds (my_getsystime() is believed to return 100 ns units) — confirm.

  @param buf          scratch buffer handed to print_unsafe_warning_to_log()
  @param unsafe_type  must be LEX::BINLOG_STMT_UNSAFE_LIMIT (asserted)
  @param query        statement text logged together with the warning
*/
8512 static void do_unsafe_limit_checkout(char* buf, int unsafe_type, char* query)
8513 {
8514  ulonglong now;
8515  DBUG_ENTER("do_unsafe_limit_checkout");
8516  DBUG_ASSERT(unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT);
8517  limit_unsafe_warning_count++;
8518  /*
8519  INITIALIZING:
8520  If this is the first time this function is called with warning logging
8521  enabled, monitoring of the unsafe warnings starts now.
8522  */
8523  if (limit_unsafe_suppression_start_time == 0)
8524  {
8525  limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8526  print_unsafe_warning_to_log(unsafe_type, buf, query);
8527  }
8528  else
8529  {
8530  if (!unsafe_warning_suppression_is_activated)
8531  print_unsafe_warning_to_log(unsafe_type, buf, query);
8532 
8533  if (limit_unsafe_warning_count >=
8534  LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT)
8535  {
      /* Only computed and used once the threshold has been reached. */
8536  now= my_getsystime()/10000000;
8537  if (!unsafe_warning_suppression_is_activated)
8538  {
8539  /*
8540  ACTIVATION:
8541  We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in
8542  less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT, so we activate the
8543  suppression.
8544  */
8545  if ((now-limit_unsafe_suppression_start_time) <=
8546  LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
8547  {
8548  unsafe_warning_suppression_is_activated= true;
8549  DBUG_PRINT("info",("A warning flood has been detected and the limit \
8550 unsafety warning suppression has been activated."));
8551  }
8552  else
8553  {
8554  /*
8555  There has been no flooding so far, therefore we restart the monitoring.
8556  */
8557  limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8558  limit_unsafe_warning_count= 0;
8559  }
8560  }
8561  else
8562  {
8563  /*
8564  Print the suppression note and the unsafe warning.
8565  */
8566  sql_print_information("The following warning was suppressed %d times \
8567 during the last %d seconds in the error log",
8568  limit_unsafe_warning_count,
8569  (int)
8570  (now-limit_unsafe_suppression_start_time));
8571  print_unsafe_warning_to_log(unsafe_type, buf, query);
8572  /*
8573  DEACTIVATION: We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT
8574  warnings in more than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT, so the
8575  suppression should be deactivated.
8576  */
8577  if ((now - limit_unsafe_suppression_start_time) >
8578  LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
8579  {
8580  reset_binlog_unsafe_suppression();
8581  DBUG_PRINT("info",("The limit unsafety warning supression has been \
8582 deactivated"));
8583  }
8584  }
8585  limit_unsafe_warning_count= 0;
8586  }
8587  }
8588  DBUG_VOID_RETURN;
8589 }
8590 
8597 void THD::issue_unsafe_warnings()
8598 {
8599  char buf[MYSQL_ERRMSG_SIZE * 2];
8600  DBUG_ENTER("issue_unsafe_warnings");
8601  /*
8602  Ensure that binlog_unsafe_warning_flags is big enough to hold all
8603  bits. This is actually a constant expression.
8604  */
8605  DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <=
8606  sizeof(binlog_unsafe_warning_flags) * CHAR_BIT);
8607 
8608  uint32 unsafe_type_flags= binlog_unsafe_warning_flags;
8609 
8610  /*
8611  For each unsafe_type, check if the statement is unsafe in this way
8612  and issue a warning.
8613  */
8614  for (int unsafe_type=0;
8615  unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
8616  unsafe_type++)
8617  {
8618  if ((unsafe_type_flags & (1 << unsafe_type)) != 0)
8619  {
8620  push_warning_printf(this, Sql_condition::WARN_LEVEL_NOTE,
8621  ER_BINLOG_UNSAFE_STATEMENT,
8622  ER(ER_BINLOG_UNSAFE_STATEMENT),
8623  ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
8624  if (log_warnings)
8625  {
8626  if (unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT)
8627  do_unsafe_limit_checkout( buf, unsafe_type, query());
8628  else //cases other than LIMIT unsafety
8629  print_unsafe_warning_to_log(unsafe_type, buf, query());
8630  }
8631  }
8632  }
8633  DBUG_VOID_RETURN;
8634 }
8635 
8661 int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
8662  ulong query_len, bool is_trans, bool direct,
8663  bool suppress_use, int errcode)
8664 {
8665  DBUG_ENTER("THD::binlog_query");
8666  DBUG_PRINT("enter", ("qtype: %s query: '%s'",
8667  show_query_type(qtype), query_arg));
8668  DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
8669 
8670  if (get_binlog_local_stmt_filter() == BINLOG_FILTER_SET)
8671  {
8672  /*
8673  The current statement is to be ignored, and not written to
8674  the binlog. Do not call issue_unsafe_warnings().
8675  */
8676  DBUG_RETURN(0);
8677  }
8678 
8679  /*
8680  If we are not in prelocked mode, mysql_unlock_tables() will be
8681  called after this binlog_query(), so we have to flush the pending
8682  rows event with the STMT_END_F set to unlock all tables at the
8683  slave side as well.
8684 
8685  If we are in prelocked mode, the flushing will be done inside the
8686  top-most close_thread_tables().
8687  */
8688  if (this->locked_tables_mode <= LTM_LOCK_TABLES)
8689  if (int error= binlog_flush_pending_rows_event(TRUE, is_trans))
8690  DBUG_RETURN(error);
8691 
8692  /*
8693  Warnings for unsafe statements logged in statement format are
8694  printed in three places instead of in decide_logging_format().
8695  This is because the warnings should be printed only if the statement
8696  is actually logged. When executing decide_logging_format(), we cannot
8697  know for sure if the statement will be logged:
8698 
8699  1 - sp_head::execute_procedure which prints out warnings for calls to
8700  stored procedures.
8701 
8702  2 - sp_head::execute_function which prints out warnings for calls
8703  involving functions.
8704 
8705  3 - THD::binlog_query (here) which prints warning for top level
8706  statements not covered by the two cases above: i.e., if not insided a
8707  procedure and a function.
8708 
8709  Besides, we should not try to print these warnings if it is not
8710  possible to write statements to the binary log as it happens when
8711  the execution is inside a function, or generaly speaking, when
8712  the variables.option_bits & OPTION_BIN_LOG is false.
8713  */
8714  if ((variables.option_bits & OPTION_BIN_LOG) &&
8715  sp_runtime_ctx == NULL && !binlog_evt_union.do_union)
8716  issue_unsafe_warnings();
8717 
8718  switch (qtype) {
8719  /*
8720  ROW_QUERY_TYPE means that the statement may be logged either in
8721  row format or in statement format. If
8722  current_stmt_binlog_format is row, it means that the
8723  statement has already been logged in row format and hence shall
8724  not be logged again.
8725  */
8726  case THD::ROW_QUERY_TYPE:
8727  DBUG_PRINT("debug",
8728  ("is_current_stmt_binlog_format_row: %d",
8729  is_current_stmt_binlog_format_row()));
8730  if (is_current_stmt_binlog_format_row())
8731  DBUG_RETURN(0);
8732  /* Fall through */
8733 
8734  /*
8735  STMT_QUERY_TYPE means that the query must be logged in statement
8736  format; it cannot be logged in row format. This is typically
8737  used by DDL statements. It is an error to use this query type
8738  if current_stmt_binlog_format_row is row.
8739 
8740  @todo Currently there are places that call this method with
8741  STMT_QUERY_TYPE and current_stmt_binlog_format is row. Fix those
8742  places and add assert to ensure correct behavior. /Sven
8743  */
8744  case THD::STMT_QUERY_TYPE:
8745  /*
8746  The MYSQL_LOG::write() function will set the STMT_END_F flag and
8747  flush the pending rows event if necessary.
8748  */
8749  {
8750  Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
8751  suppress_use, errcode);
8752  /*
8753  Binlog table maps will be irrelevant after a Query_log_event
8754  (they are just removed on the slave side) so after the query
8755  log event is written to the binary log, we pretend that no
8756  table maps were written.
8757  */
8758  int error= mysql_bin_log.write_event(&qinfo);
8759  binlog_table_maps= 0;
8760  DBUG_RETURN(error);
8761  }
8762  break;
8763 
8764  case THD::QUERY_TYPE_COUNT:
8765  default:
8766  DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT);
8767  }
8768  DBUG_RETURN(0);
8769 }
8770 
8771 #endif /* !defined(MYSQL_CLIENT) */
8772 
8773 struct st_mysql_storage_engine binlog_storage_engine=
8774 { MYSQL_HANDLERTON_INTERFACE_VERSION };
8775 
/*
  Plugin declaration for the "binlog" pseudo storage engine, which
  represents the binary log as a participant in transactions. Fields are
  positional; the order is assumed to follow struct st_mysql_plugin in
  plugin.h (confirm there before adding or reordering entries).
*/
mysql_declare_plugin(binlog)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,                  /* plugin type */
  &binlog_storage_engine,                       /* type-specific descriptor */
  "binlog",                                     /* plugin name */
  "MySQL AB",                                   /* author */
  "This is a pseudo storage engine to represent the binlog in a transaction",
                                                /* description */
  PLUGIN_LICENSE_GPL,                           /* license */
  binlog_init, /* Plugin Init */
  NULL, /* Plugin Deinit */
  0x0100 /* 1.0 */,
  NULL, /* status variables */
  NULL, /* system variables */
  NULL, /* config options */
  0,    /* presumably the plugin flags field (see plugin.h) */
}
mysql_declare_plugin_end;