MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
binlog.h
1 #ifndef BINLOG_H_INCLUDED
2 /* Copyright (c) 2010, 2011, 2012, 2013 Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software Foundation,
15  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
16 
17 #define BINLOG_H_INCLUDED
18 
19 #include "mysqld.h" /* opt_relay_logname */
20 #include "log_event.h"
21 #include "log.h"
22 
23 class Relay_log_info;
24 class Master_info;
25 
27 
32 public:
33  class Mutex_queue {
34  friend class Stage_manager;
35  public:
36  Mutex_queue()
37  : m_first(NULL), m_last(&m_first)
38  {
39  }
40 
41  void init(
42 #ifdef HAVE_PSI_INTERFACE
43  PSI_mutex_key key_LOCK_queue
44 #endif
45  ) {
46  mysql_mutex_init(key_LOCK_queue, &m_lock, MY_MUTEX_INIT_FAST);
47  }
48 
49  void deinit() {
50  mysql_mutex_destroy(&m_lock);
51  }
52 
53  bool is_empty() const {
54  return m_first == NULL;
55  }
56 
58  bool append(THD *first);
59 
65  THD *fetch_and_empty();
66 
67  std::pair<bool,THD*> pop_front();
68 
69  private:
70  void lock() { mysql_mutex_lock(&m_lock); }
71  void unlock() { mysql_mutex_unlock(&m_lock); }
72 
77  THD *m_first;
78 
85  THD **m_last;
86 
88  mysql_mutex_t m_lock;
89  } __attribute__((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
90 
91 public:
93  {
94  }
95 
/**
  Empty destructor.

  The mutexes and condition variable are destroyed by deinit(), not here.
*/
96  ~Stage_manager()
97  {
98  }
99 
/**
  Identifiers of the stages handled by the stage manager.

  Each stage value indexes one queue in m_queue. STAGE_COUNTER is not a
  stage: it is the number of stages, used to size m_queue and to bound the
  loop in deinit().
*/
103  enum StageID {
104  FLUSH_STAGE,
105  SYNC_STAGE,
106  COMMIT_STAGE,
107  STAGE_COUNTER
108  };
109 
/**
  Create the synchronization objects of the stage manager.

  Initializes m_lock_done and m_cond_done, then the mutex of each of the
  three stage queues. In debug builds (DBUG_OFF not defined) it also
  initializes m_cond_preempt.

  The key parameters are only part of the signature when HAVE_PSI_INTERFACE
  is defined.
  NOTE(review): the body names key_LOCK_done and key_COND_done
  unconditionally, so mysql_mutex_init/mysql_cond_init presumably discard
  the key argument when the PSI interface is disabled -- confirm.
*/
110  void init(
111 #ifdef HAVE_PSI_INTERFACE
112  PSI_mutex_key key_LOCK_flush_queue,
113  PSI_mutex_key key_LOCK_sync_queue,
114  PSI_mutex_key key_LOCK_commit_queue,
115  PSI_mutex_key key_LOCK_done,
116  PSI_cond_key key_COND_done
117 #endif
118  )
119  {
120  mysql_mutex_init(key_LOCK_done, &m_lock_done, MY_MUTEX_INIT_FAST);
121  mysql_cond_init(key_COND_done, &m_cond_done, NULL);
122 #ifndef DBUG_OFF
123  /* reuse key_COND_done 'cos a new PSI object would be wasteful in DBUG_ON */
124  mysql_cond_init(key_COND_done, &m_cond_preempt, NULL);
125 #endif
/* One mutex per stage queue, each with its own instrumentation key. */
126  m_queue[FLUSH_STAGE].init(
127 #ifdef HAVE_PSI_INTERFACE
128  key_LOCK_flush_queue
129 #endif
130  );
131  m_queue[SYNC_STAGE].init(
132 #ifdef HAVE_PSI_INTERFACE
133  key_LOCK_sync_queue
134 #endif
135  );
136  m_queue[COMMIT_STAGE].init(
137 #ifdef HAVE_PSI_INTERFACE
138  key_LOCK_commit_queue
139 #endif
140  );
141  }
142 
143  void deinit()
144  {
145  for (size_t i = 0 ; i < STAGE_COUNTER ; ++i)
146  m_queue[i].deinit();
147  mysql_cond_destroy(&m_cond_done);
148  mysql_mutex_destroy(&m_lock_done);
149  }
150 
173  bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex);
174 
175  std::pair<bool,THD*> pop_front(StageID stage)
176  {
177  return m_queue[stage].pop_front();
178  }
179 
180 #ifndef DBUG_OFF
181 
189  void clear_preempt_status(THD *head);
190 #endif
191 
197  THD *fetch_queue_for(StageID stage) {
198  DBUG_PRINT("debug", ("Fetching queue for stage %d", stage));
199  return m_queue[stage].fetch_and_empty();
200  }
201 
202  void signal_done(THD *queue) {
203  mysql_mutex_lock(&m_lock_done);
204  for (THD *thd= queue ; thd ; thd = thd->next_to_commit)
205  thd->transaction.flags.pending= false;
206  mysql_mutex_unlock(&m_lock_done);
207  mysql_cond_broadcast(&m_cond_done);
208  }
209 
210 private:
/* One queue per stage, indexed by StageID; set up in init(). */
218  Mutex_queue m_queue[STAGE_COUNTER];
219 
/* Broadcast by signal_done() once the pending flags have been cleared. */
221  mysql_cond_t m_cond_done;
222 
/* Held by signal_done() while it clears the pending flags. */
224  mysql_mutex_t m_lock_done;
225 #ifndef DBUG_OFF
226 
/* Debug builds only. NOTE(review): its use is not visible in this header. */
227  bool leader_await_preempt_status;
228 
/* Debug builds only; initialized in init() with the key of m_cond_done. */
230  mysql_cond_t m_cond_preempt;
231 #endif
232 };
233 
234 
235 class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
236 {
237  private:
#ifdef HAVE_PSI_INTERFACE
  /*
    Instrumentation keys stored by set_psi_keys(). Each member holds the key
    passed for the object named in its suffix.
  */

  /** Key passed as key_LOCK_index. */
  PSI_mutex_key m_key_LOCK_index;

  /**
    Key passed as key_COND_done. It is a condition-variable key (both
    set_psi_keys() and Stage_manager::init() take it as PSI_cond_key), so
    it is declared with that type rather than PSI_mutex_key.
  */
  PSI_cond_key m_key_COND_done;

  /** Keys passed for the stage queues and the done mutex. */
  PSI_mutex_key m_key_LOCK_commit_queue;
  PSI_mutex_key m_key_LOCK_done;
  PSI_mutex_key m_key_LOCK_flush_queue;
  PSI_mutex_key m_key_LOCK_sync_queue;
  /** Key passed as key_LOCK_commit. */
  PSI_mutex_key m_key_LOCK_commit;
  /** Key passed as key_LOCK_sync. */
  PSI_mutex_key m_key_LOCK_sync;
  /** Key passed as key_LOCK_xids. */
  PSI_mutex_key m_key_LOCK_xids;
  /** Key passed as key_update_cond. */
  PSI_cond_key m_key_update_cond;
  /** Key passed as key_prep_xids_cond. */
  PSI_cond_key m_key_prep_xids_cond;
  /** Key passed as key_file_log. */
  PSI_file_key m_key_file_log;
  /** Key passed as key_file_log_index. */
  PSI_file_key m_key_file_log_index;
#endif
263  /* POSIX thread objects are inited by init_pthread_objects() */
/* Taken and released by lock_index() / unlock_index(). */
264  mysql_mutex_t LOCK_index;
265  mysql_mutex_t LOCK_commit;
266  mysql_mutex_t LOCK_sync;
/* Held by dec_prep_xids() while it signals m_prep_xids_cond. */
267  mysql_mutex_t LOCK_xids;
/* Exposed to callers through get_log_cond(). */
268  mysql_cond_t update_cond;
/*
  Running byte count: increased by add_bytes_written(), cleared by
  reset_bytes_written() and moved out by harvest_bytes_written().
*/
269  ulonglong bytes_written;
/* Exposed to callers through get_index_file(). */
270  IO_CACHE index_file;
/* Returned by get_index_fname(). */
271  char index_file_name[FN_REFLEN];
272  /*
273  crash_safe_index_file is temp file used for guaranteeing
274  index file crash safe when master server restarts.
275  */
276  IO_CACHE crash_safe_index_file;
277  char crash_safe_index_file_name[FN_REFLEN];
278  /*
279  purge_file is a temp file used in purge_logs so that the index file
280  can be updated before deleting files from disk, yielding better crash
281  recovery. It is created on demand the first time purge_logs is called
282  and then reused for subsequent calls. It is cleaned up in cleanup().
283  */
284  IO_CACHE purge_index_file;
285  char purge_index_file_name[FN_REFLEN];
286  /*
287  The max size before rotation (usable only if log_type == LOG_BIN: binary
288  logs and relay logs).
289  For a binlog, max_size should be max_binlog_size.
290  For a relay log, it should be max_relay_log_size if this is non-zero,
291  max_binlog_size otherwise.
292  max_size is set in init(), and dynamically changed (when one does SET
293  GLOBAL MAX_BINLOG_SIZE|MAX_RELAY_LOG_SIZE) by fix_max_binlog_size and
294  fix_max_relay_log_size).
295  */
296  ulong max_size;
297 
298  // current file sequence number for load data infile binary logging
299  uint file_id;
/* Returned by get_open_count(). */
300  uint open_count; // For replication
301  int readers_count;
302 
303  /* pointer to the sync period variable, for binlog this will be
304  sync_binlog_period, for relay log this will be
305  sync_relay_log_period
306  */
/* Dereferenced by get_sync_period(). */
307  uint *sync_period_ptr;
308  uint sync_counter;
309 
/* Wraps every atomic access to m_prep_xids (see inc/dec/get_prep_xids). */
310  my_atomic_rwlock_t m_prep_xids_lock;
/* Signalled by dec_prep_xids() when the counter drops from 1 to 0. */
311  mysql_cond_t m_prep_xids_cond;
/* Counter incremented by inc_prep_xids() and decremented by dec_prep_xids(). */
312  volatile int32 m_prep_xids;
313 
/**
  Increment the prepared XID counter and flag the session.

  Adds 1 to m_prep_xids inside the m_prep_xids_lock write section, then
  sets thd->transaction.flags.xid_written to true.

  @param thd Session whose xid_written flag is set.
*/
317  void inc_prep_xids(THD *thd) {
318  DBUG_ENTER("MYSQL_BIN_LOG::inc_prep_xids");
319  my_atomic_rwlock_wrlock(&m_prep_xids_lock);
/*
  The previous value is only needed for the trace below, so 'result' exists
  only in debug builds.
  NOTE(review): this relies on DBUG_PRINT expanding to nothing when
  DBUG_OFF is defined, otherwise 'result' would be undeclared -- confirm.
*/
320 #ifndef DBUG_OFF
321  int result= my_atomic_add32(&m_prep_xids, 1);
322 #else
323  (void) my_atomic_add32(&m_prep_xids, 1);
324 #endif
/* 'result' is the value before the addition, hence the + 1. */
325  DBUG_PRINT("debug", ("m_prep_xids: %d", result + 1));
326  my_atomic_rwlock_wrunlock(&m_prep_xids_lock);
327  thd->transaction.flags.xid_written= true;
328  DBUG_VOID_RETURN;
329  }
330 
336  void dec_prep_xids(THD *thd) {
337  DBUG_ENTER("MYSQL_BIN_LOG::dec_prep_xids");
338  my_atomic_rwlock_wrlock(&m_prep_xids_lock);
339  int32 result= my_atomic_add32(&m_prep_xids, -1);
340  DBUG_PRINT("debug", ("m_prep_xids: %d", result - 1));
341  my_atomic_rwlock_wrunlock(&m_prep_xids_lock);
342  thd->transaction.flags.xid_written= false;
343  /* If the old value was 1, it is zero now. */
344  if (result == 1)
345  {
346  mysql_mutex_lock(&LOCK_xids);
347  mysql_cond_signal(&m_prep_xids_cond);
348  mysql_mutex_unlock(&LOCK_xids);
349  }
350  DBUG_VOID_RETURN;
351  }
352 
353  int32 get_prep_xids() {
354  my_atomic_rwlock_rdlock(&m_prep_xids_lock);
355  int32 result= my_atomic_load32(&m_prep_xids);
356  my_atomic_rwlock_rdunlock(&m_prep_xids_lock);
357  return result;
358  }
359 
360  inline uint get_sync_period()
361  {
362  return *sync_period_ptr;
363  }
364 
365  int write_to_file(IO_CACHE *cache);
366  /*
367  This is used to start writing to a new log file. The difference from
368  new_file() is locking. new_file_without_locking() does not acquire
369  LOCK_log.
370  */
371  int new_file_without_locking(Format_description_log_event *extra_description_event);
372  int new_file_impl(bool need_lock, Format_description_log_event *extra_description_event);
373 
375  Stage_manager stage_manager;
376  void do_flush(THD *thd);
377 
378 public:
380  using MYSQL_LOG::is_open;
381 
382  /* This is relay log */
383  bool is_relay_log;
384  ulong signal_cnt; // update of the counter is checked by heartbeat
385  uint8 checksum_alg_reset; // to contain a new value when binlog is rotated
386  /*
387  Holds the last seen in Relay-Log FD's checksum alg value.
388  The initial value comes from the slave's local FD that heads
389  the very first Relay-Log file. In the following the value may change
390  with each received master's FD_m.
391  Besides being used to verify the events that the IO thread receives
392  (except the 1st fake Rotate, see @c Master_info:: checksum_alg_before_fd),
393  the value specifies if/how to compute checksum for slave's local events
394  and the first fake Rotate (R_f^1) coming from the master.
395  R_f^1 needs logging checksum-compatibly with the RL's heading FD_s.
396 
397  Legends for the checksum related comments:
398 
399  FD - Format-Description event,
400  R - Rotate event
401  R_f - the fake Rotate event
402  E - an arbitrary event
403 
404  The underscore indexes for any event
405  `_s' indicates the event is generated by Slave
406  `_m' - by Master
407 
408  Two special underscore indexes of FD:
409  FD_q - Format Description event for queuing (relay-logging)
410  FD_e - Format Description event for executing (relay-logging)
411 
412  Upper indexes:
413  E^n - n:th event in a sequence
414 
415  RL - Relay Log
416  (A) - checksum algorithm descriptor value
417  FD.(A) - the value of (A) in FD
418  */
419  uint8 relay_log_checksum_alg;
420 
421  MYSQL_BIN_LOG(uint *sync_period);
422  /*
423  note that there's no destructor ~MYSQL_BIN_LOG() !
424  The reason is that we don't want it to be automatically called
425  on exit() - but only during the correct shutdown process
426  */
427 
428 #ifdef HAVE_PSI_INTERFACE
429  void set_psi_keys(PSI_mutex_key key_LOCK_index,
430  PSI_mutex_key key_LOCK_commit,
431  PSI_mutex_key key_LOCK_commit_queue,
432  PSI_mutex_key key_LOCK_done,
433  PSI_mutex_key key_LOCK_flush_queue,
434  PSI_mutex_key key_LOCK_log,
435  PSI_mutex_key key_LOCK_sync,
436  PSI_mutex_key key_LOCK_sync_queue,
437  PSI_mutex_key key_LOCK_xids,
438  PSI_cond_key key_COND_done,
439  PSI_cond_key key_update_cond,
440  PSI_cond_key key_prep_xids_cond,
441  PSI_file_key key_file_log,
442  PSI_file_key key_file_log_index)
443  {
444  m_key_COND_done= key_COND_done;
445 
446  m_key_LOCK_commit_queue= key_LOCK_commit_queue;
447  m_key_LOCK_done= key_LOCK_done;
448  m_key_LOCK_flush_queue= key_LOCK_flush_queue;
449  m_key_LOCK_sync_queue= key_LOCK_sync_queue;
450 
451  m_key_LOCK_index= key_LOCK_index;
452  m_key_LOCK_log= key_LOCK_log;
453  m_key_LOCK_commit= key_LOCK_commit;
454  m_key_LOCK_sync= key_LOCK_sync;
455  m_key_LOCK_xids= key_LOCK_xids;
456  m_key_update_cond= key_update_cond;
457  m_key_prep_xids_cond= key_prep_xids_cond;
458  m_key_file_log= key_file_log;
459  m_key_file_log_index= key_file_log_index;
460  }
461 #endif
462 
472  bool find_first_log_not_in_gtid_set(char *binlog_file_name,
473  const Gtid_set *gtid_set,
474  const char **errmsg);
475 
491  bool init_gtid_sets(Gtid_set *gtid_set, Gtid_set *lost_groups,
492  bool verify_checksum, bool need_lock);
493 
494  void set_previous_gtid_set(Gtid_set *previous_gtid_set_param)
495  {
496  previous_gtid_set= previous_gtid_set_param;
497  }
498 private:
499  Gtid_set* previous_gtid_set;
500 
501  int open(const char *opt_name) { return open_binlog(opt_name); }
502  bool change_stage(THD *thd, Stage_manager::StageID stage,
503  THD* queue, mysql_mutex_t *leave,
504  mysql_mutex_t *enter);
505  std::pair<int,my_off_t> flush_thread_caches(THD *thd);
506  int flush_cache_to_file(my_off_t *flush_end_pos);
507  int finish_commit(THD *thd);
508  std::pair<bool, bool> sync_binlog_file(bool force);
509  void process_commit_stage_queue(THD *thd, THD *queue);
510  void process_after_commit_stage_queue(THD *thd, THD *first);
511  int process_flush_stage_queue(my_off_t *total_bytes_var, bool *rotate_var,
512  THD **out_queue_var);
513  int ordered_commit(THD *thd, bool all, bool skip_commit = false);
514 public:
515  int open_binlog(const char *opt_name);
516  void close();
517  enum_result commit(THD *thd, bool all);
518  int rollback(THD *thd, bool all);
519  int prepare(THD *thd, bool all);
521  my_off_t *valid_pos);
523 #if !defined(MYSQL_CLIENT)
524 
525  void update_thd_next_event_pos(THD *thd);
526  int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
527  bool is_transactional);
528 
529 #endif /* !defined(MYSQL_CLIENT) */
530  void add_bytes_written(ulonglong inc)
531  {
532  bytes_written += inc;
533  }
534  void reset_bytes_written()
535  {
536  bytes_written = 0;
537  }
538  void harvest_bytes_written(ulonglong* counter)
539  {
540 #ifndef DBUG_OFF
541  char buf1[22],buf2[22];
542 #endif
543  DBUG_ENTER("harvest_bytes_written");
544  (*counter)+=bytes_written;
545  DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1),
546  llstr(bytes_written,buf2)));
547  bytes_written=0;
548  DBUG_VOID_RETURN;
549  }
550  void set_max_size(ulong max_size_arg);
551  void signal_update();
552  int wait_for_update_relay_log(THD* thd, const struct timespec * timeout);
553  int wait_for_update_bin_log(THD* thd, const struct timespec * timeout);
554 public:
555  void init_pthread_objects();
556  void cleanup();
575  bool open_binlog(const char *log_name,
576  const char *new_name,
577  enum cache_type io_cache_type_arg,
578  ulong max_size,
579  bool null_created,
580  bool need_lock_index, bool need_sid_lock,
581  Format_description_log_event *extra_description_event);
582  bool open_index_file(const char *index_file_name_arg,
583  const char *log_name, bool need_lock_index);
584  /* Use this to start writing a new log file */
585  int new_file(Format_description_log_event *extra_description_event);
586 
587  bool write_event(Log_event* event_info);
588  bool write_cache(THD *thd, class binlog_cache_data *binlog_cache_data);
589  int do_write_cache(IO_CACHE *cache);
590 
591  void set_write_error(THD *thd, bool is_transactional);
592  bool check_write_error(THD *thd);
593  bool write_incident(THD *thd, bool need_lock_log,
594  bool do_flush_and_sync= true);
595  bool write_incident(Incident_log_event *ev, bool need_lock_log,
596  bool do_flush_and_sync= true);
597 
598  void start_union_events(THD *thd, query_id_t query_id_param);
599  void stop_union_events(THD *thd);
600  bool is_query_in_union(THD *thd, query_id_t query_id_param);
601 
602 #ifdef HAVE_REPLICATION
603  bool append_buffer(const char* buf, uint len, Master_info *mi);
604  bool append_event(Log_event* ev, Master_info *mi);
605 private:
606  bool after_append_to_relay_log(Master_info *mi);
607 #endif // ifdef HAVE_REPLICATION
608 public:
609 
610  void make_log_name(char* buf, const char* log_ident);
611  bool is_active(const char* log_file_name);
612  int remove_logs_from_index(LOG_INFO* linfo, bool need_update_threads);
613  int rotate(bool force_rotate, bool* check_purge);
614  void purge();
615  int rotate_and_purge(bool force_rotate);
630  bool flush_and_sync(const bool force= false);
631  int purge_logs(const char *to_log, bool included,
632  bool need_lock_index, bool need_update_threads,
633  ulonglong *decrease_log_space, bool auto_purge);
634  int purge_logs_before_date(time_t purge_time, bool auto_purge);
635  int purge_first_log(Relay_log_info* rli, bool included);
636  int set_crash_safe_index_file_name(const char *base_file_name);
639  int add_log_to_index(uchar* log_file_name, int name_len,
640  bool need_lock_index);
641  int move_crash_safe_index_file_to_index_file(bool need_lock_index);
642  int set_purge_index_file_name(const char *base_file_name);
643  int open_purge_index_file(bool destroy);
644  bool is_inited_purge_index_file();
645  int close_purge_index_file();
646  int clean_purge_index_file();
647  int sync_purge_index_file();
648  int register_purge_index_entry(const char* entry);
649  int register_create_index_entry(const char* entry);
650  int purge_index_entry(THD *thd, ulonglong *decrease_log_space,
651  bool need_lock_index);
652  bool reset_logs(THD* thd);
653  void close(uint exiting);
654 
655  // iterating through the log index file
656  int find_log_pos(LOG_INFO* linfo, const char* log_name,
657  bool need_lock_index);
658  int find_next_log(LOG_INFO* linfo, bool need_lock_index);
659  int get_current_log(LOG_INFO* linfo);
660  int raw_get_current_log(LOG_INFO* linfo);
661  uint next_file_id();
662  inline char* get_index_fname() { return index_file_name;}
663  inline char* get_log_fname() { return log_file_name; }
664  inline char* get_name() { return name; }
665  inline mysql_mutex_t* get_log_lock() { return &LOCK_log; }
666  inline mysql_cond_t* get_log_cond() { return &update_cond; }
667  inline IO_CACHE* get_log_file() { return &log_file; }
668 
669  inline void lock_index() { mysql_mutex_lock(&LOCK_index);}
670  inline void unlock_index() { mysql_mutex_unlock(&LOCK_index);}
671  inline IO_CACHE *get_index_file() { return &index_file;}
672  inline uint32 get_open_count() { return open_count; }
673 };
674 
675 typedef struct st_load_file_info
676 {
677  THD* thd;
678  my_off_t last_pos_in_file;
679  bool wrote_create_file, log_delayed;
681 
682 extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
683 
684 bool trans_has_updated_trans_table(const THD* thd);
685 bool stmt_has_updated_trans_table(const THD *thd);
686 bool ending_trans(THD* thd, const bool all);
687 bool ending_single_stmt_trans(THD* thd, const bool all);
688 bool trans_cannot_safely_rollback(const THD* thd);
689 bool stmt_cannot_safely_rollback(const THD* thd);
690 
691 int log_loaded_block(IO_CACHE* file);
692 
696 File open_binlog_file(IO_CACHE *log, const char *log_file_name,
697  const char **errmsg);
698 int check_binlog_magic(IO_CACHE* log, const char** errmsg);
699 bool purge_master_logs(THD* thd, const char* to_log);
700 bool purge_master_logs_before_date(THD* thd, time_t purge_time);
701 bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log);
702 bool mysql_show_binlog_events(THD* thd);
703 void check_binlog_cache_size(THD *thd);
704 void check_binlog_stmt_cache_size(THD *thd);
705 bool binlog_enabled();
706 void register_binlog_handler(THD *thd, bool trx);
708 
709 extern const char *log_bin_index;
710 extern const char *log_bin_basename;
711 extern bool opt_binlog_order_commits;
712 
727 inline bool normalize_binlog_name(char *to, const char *from, bool is_relay_log)
728 {
729  DBUG_ENTER("normalize_binlog_name");
730  bool error= false;
731  char buff[FN_REFLEN];
732  char *ptr= (char*) from;
733  char *opt_name= is_relay_log ? opt_relay_logname : opt_bin_logname;
734 
735  DBUG_ASSERT(from);
736 
737  /* opt_name is not null and not empty and from is a relative path */
738  if (opt_name && opt_name[0] && from && !test_if_hard_path(from))
739  {
740  // take the path from opt_name
741  // take the filename from from
742  char log_dirpart[FN_REFLEN], log_dirname[FN_REFLEN];
743  size_t log_dirpart_len, log_dirname_len;
744  dirname_part(log_dirpart, opt_name, &log_dirpart_len);
745  dirname_part(log_dirname, from, &log_dirname_len);
746 
747  /* log may be empty => relay-log or log-bin did not
748  hold paths, just filename pattern */
749  if (log_dirpart_len > 0)
750  {
751  /* create the new path name */
752  if(fn_format(buff, from+log_dirname_len, log_dirpart, "",
753  MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH)) == NULL)
754  {
755  error= true;
756  goto end;
757  }
758 
759  ptr= buff;
760  }
761  }
762 
763  DBUG_ASSERT(ptr);
764 
765  if (ptr)
766  strmake(to, ptr, strlen(ptr));
767 
768 end:
769  DBUG_RETURN(error);
770 }
771 #endif /* BINLOG_H_INCLUDED */