MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
semisync_master.cc
1 /* Copyright (C) 2007 Google Inc.
2  Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3  Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 
18 
19 #include "semisync_master.h"
20 
21 #define TIME_THOUSAND 1000 /* 10^3: scale factor used by this file's time conversions */
22 #define TIME_MILLION 1000000 /* 10^6: scale factor used by this file's time conversions */
23 #define TIME_BILLION 1000000000 /* 10^9: commitTrx() normalizes tv_nsec against it */
24 
25 /* Whether semi-synchronous replication is enabled; initObject() reads it to
      choose between enableMaster() and disableMaster(). */
26 char rpl_semi_sync_master_enabled;
27 unsigned long rpl_semi_sync_master_timeout; /* handed to setWaitTimeout() by initObject() */
28 unsigned long rpl_semi_sync_master_trace_level; /* handed to setTraceLevel() by initObject() */
29 char rpl_semi_sync_master_status = 0;
30 unsigned long rpl_semi_sync_master_yes_transactions = 0; /* commitTrx(): bumped when semi-sync is on at exit */
31 unsigned long rpl_semi_sync_master_no_transactions = 0; /* commitTrx(): bumped when semi-sync is off at exit */
32 unsigned long rpl_semi_sync_master_off_times = 0; /* bumped by every switch_off() call */
33 unsigned long rpl_semi_sync_master_timefunc_fails = 0; /* bumped when getWaitTime() returns a negative value */
34 unsigned long rpl_semi_sync_master_wait_timeouts = 0; /* commitTrx(): bumped when the timed wait returns nonzero */
35 unsigned long rpl_semi_sync_master_wait_sessions = 0; /* commitTrx(): raised before and lowered after each timed wait */
36 unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0; /* commitTrx(): bumped when the wait position is moved back */
37 unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
38 unsigned long long rpl_semi_sync_master_trx_wait_num = 0; /* commitTrx(): number of waits whose duration was measured */
39 unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
40 unsigned long long rpl_semi_sync_master_net_wait_num = 0;
41 unsigned long rpl_semi_sync_master_clients = 0; /* raised by add_slave(), lowered by remove_slave() */
42 unsigned long long rpl_semi_sync_master_net_wait_time = 0;
43 unsigned long long rpl_semi_sync_master_trx_wait_time = 0; /* commitTrx(): sum of the measured getWaitTime() results */
44 char rpl_semi_sync_master_wait_no_slave = 1; /* when zero, remove_slave() may switch semi-sync off once no clients remain */
45 
46 
47 static int getWaitTime(const struct timespec& start_ts); /* forward declaration; callers treat a negative result as a failed measurement */
48 
49 static unsigned long long timespec_to_usec(const struct timespec *ts)
50 {
51 #ifndef __WIN__
52  return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
53 #else
54  return ts->tv.i64 / 10;
55 #endif /* __WIN__ */
56 }
57 
58 /*******************************************************************************
59  *
60  * <ActiveTranx> class : manage all active transaction nodes
61  *
62  ******************************************************************************/
63 
64 ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
65  unsigned long trace_level)
66  : Trace(trace_level), allocator_(max_connections),
67  num_entries_(max_connections << 1), /* Transaction hash table size
68  * is set to double the size
69  * of max_connections */
70  lock_(lock)
71 {
72  /* No transactions are in the list initially. */
73  trx_front_ = NULL;
74  trx_rear_ = NULL;
75 
76  /* Create the hash table to find a transaction's ending event. */
77  trx_htb_ = new TranxNode *[num_entries_];
78  for (int idx = 0; idx < num_entries_; ++idx)
79  trx_htb_[idx] = NULL;
80 
81  sql_print_information("Semi-sync replication initialized for transactions.");
82 }
83 
84 ActiveTranx::~ActiveTranx()
85 {
86  delete [] trx_htb_;
87  trx_htb_ = NULL;
88  num_entries_ = 0;
89 }
90 
91 unsigned int ActiveTranx::calc_hash(const unsigned char *key,
92  unsigned int length)
93 {
94  unsigned int nr = 1, nr2 = 4;
95 
96  /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
97  while (length--)
98  {
99  nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
100  nr2 += 3;
101  }
102  return((unsigned int) nr);
103 }
104 
105 unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
106  my_off_t log_file_pos)
107 {
108  unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
109  strlen(log_file_name));
110  unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
111  sizeof(log_file_pos));
112 
113  return (hash1 + hash2) % num_entries_;
114 }
115 
116 int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
117  const char *log_file_name2, my_off_t log_file_pos2)
118 {
119  int cmp = strcmp(log_file_name1, log_file_name2);
120 
121  if (cmp != 0)
122  return cmp;
123 
124  if (log_file_pos1 > log_file_pos2)
125  return 1;
126  else if (log_file_pos1 < log_file_pos2)
127  return -1;
128  return 0;
129 }
130 
131 int ActiveTranx::insert_tranx_node(const char *log_file_name,
132  my_off_t log_file_pos)
133 {
134  const char *kWho = "ActiveTranx:insert_tranx_node";
135  TranxNode *ins_node;
136  int result = 0;
137  unsigned int hash_val;
138 
139  function_enter(kWho);
140 
141  ins_node = allocator_.allocate_node();
142  if (!ins_node)
143  {
144  sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
145  kWho, log_file_name, (unsigned long)log_file_pos);
146  result = -1;
147  goto l_end;
148  }
149 
150  /* insert the binlog position in the active transaction list. */
151  strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
152  ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
153  ins_node->log_pos_ = log_file_pos;
154 
155  if (!trx_front_)
156  {
157  /* The list is empty. */
158  trx_front_ = trx_rear_ = ins_node;
159  }
160  else
161  {
162  int cmp = compare(ins_node, trx_rear_);
163  if (cmp > 0)
164  {
165  /* Compare with the tail first. If the transaction happens later in
166  * binlog, then make it the new tail.
167  */
168  trx_rear_->next_ = ins_node;
169  trx_rear_ = ins_node;
170  }
171  else
172  {
173  /* Otherwise, it is an error because the transaction should hold the
174  * mysql_bin_log.LOCK_log when appending events.
175  */
176  sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
177  "new node (%s, %lu)", kWho,
178  trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
179  ins_node->log_name_, (unsigned long)ins_node->log_pos_);
180  result = -1;
181  goto l_end;
182  }
183  }
184 
185  hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
186  ins_node->hash_next_ = trx_htb_[hash_val];
187  trx_htb_[hash_val] = ins_node;
188 
189  if (trace_level_ & kTraceDetail)
190  sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
191  ins_node->log_name_, (unsigned long)ins_node->log_pos_,
192  hash_val);
193 
194  l_end:
195  return function_exit(kWho, result);
196 }
197 
198 bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
199  my_off_t log_file_pos)
200 {
201  const char *kWho = "ActiveTranx::is_tranx_end_pos";
202  function_enter(kWho);
203 
204  unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
205  TranxNode *entry = trx_htb_[hash_val];
206 
207  while (entry != NULL)
208  {
209  if (compare(entry, log_file_name, log_file_pos) == 0)
210  break;
211 
212  entry = entry->hash_next_;
213  }
214 
215  if (trace_level_ & kTraceDetail)
216  sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
217  log_file_name, (unsigned long)log_file_pos, hash_val);
218 
219  function_exit(kWho, (entry != NULL));
220  return (entry != NULL);
221 }
222 
223 int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
224  my_off_t log_file_pos)
225 {
226  const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
227  TranxNode *new_front;
228 
229  function_enter(kWho);
230 
231  if (log_file_name != NULL)
232  {
233  new_front = trx_front_;
234 
235  while (new_front)
236  {
237  if (compare(new_front, log_file_name, log_file_pos) > 0)
238  break;
239  new_front = new_front->next_;
240  }
241  }
242  else
243  {
244  /* If log_file_name is NULL, clear everything. */
245  new_front = NULL;
246  }
247 
248  if (new_front == NULL)
249  {
250  /* No active transaction nodes after the call. */
251 
252  /* Clear the hash table. */
253  memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
254  allocator_.free_all_nodes();
255 
256  /* Clear the active transaction list. */
257  if (trx_front_ != NULL)
258  {
259  trx_front_ = NULL;
260  trx_rear_ = NULL;
261  }
262 
263  if (trace_level_ & kTraceDetail)
264  sql_print_information("%s: cleared all nodes", kWho);
265  }
266  else if (new_front != trx_front_)
267  {
268  TranxNode *curr_node, *next_node;
269 
270  /* Delete all transaction nodes before the confirmation point. */
271  int n_frees = 0;
272  curr_node = trx_front_;
273  while (curr_node != new_front)
274  {
275  next_node = curr_node->next_;
276  n_frees++;
277 
278  /* Remove the node from the hash table. */
279  unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
280  TranxNode **hash_ptr = &(trx_htb_[hash_val]);
281  while ((*hash_ptr) != NULL)
282  {
283  if ((*hash_ptr) == curr_node)
284  {
285  (*hash_ptr) = curr_node->hash_next_;
286  break;
287  }
288  hash_ptr = &((*hash_ptr)->hash_next_);
289  }
290 
291  curr_node = next_node;
292  }
293 
294  trx_front_ = new_front;
295  allocator_.free_nodes_before(trx_front_);
296 
297  if (trace_level_ & kTraceDetail)
298  sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
299  kWho, n_frees,
300  trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
301  }
302 
303  return function_exit(kWho, 0);
304 }
305 
306 
307 /*******************************************************************************
308  *
309  * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
310  * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
311  *
312  * The most important functions during semi-sync replication are listed below:
313  *
314  * Master:
315  * . reportReplyBinlog(): called by the binlog dump thread when it receives
316  * the slave's status information.
317  * . updateSyncHeader(): based on transaction waiting information, decide
318  * whether to request the slave to reply.
319  * . writeTranxInBinlog(): called by the transaction thread when it finishes
320  * writing all transaction events in binlog.
321  * . commitTrx(): transaction thread wait for the slave reply.
322  *
323  * Slave:
324  * . slaveReadSyncHeader(): read the semi-sync header from the master, get the
325  * sync status and get the payload for events.
326  * . slaveReply(): reply to the master about the replication progress.
327  *
328  ******************************************************************************/
329 
330 ReplSemiSyncMaster::ReplSemiSyncMaster()
331  : active_tranxs_(NULL),
332  init_done_(false),
333  reply_file_name_inited_(false),
334  reply_file_pos_(0L),
335  wait_file_name_inited_(false),
336  wait_file_pos_(0),
337  master_enabled_(false),
338  wait_timeout_(0L),
339  state_(0)
340 {
341  strcpy(reply_file_name_, "");
342  strcpy(wait_file_name_, "");
343 }
344 
345 int ReplSemiSyncMaster::initObject()
346 {
347  int result;
348  const char *kWho = "ReplSemiSyncMaster::initObject";
349 
350  if (init_done_)
351  {
352  fprintf(stderr, "%s called twice\n", kWho);
353  return 1;
354  }
355  init_done_ = true;
356 
357  /* References to the parameter works after set_options(). */
358  setWaitTimeout(rpl_semi_sync_master_timeout);
359  setTraceLevel(rpl_semi_sync_master_trace_level);
360 
361  /* Mutex initialization can only be done after MY_INIT(). */
362  mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
363  &LOCK_binlog_, MY_MUTEX_INIT_FAST);
364  mysql_cond_init(key_ss_cond_COND_binlog_send_,
365  &COND_binlog_send_, NULL);
366 
367  if (rpl_semi_sync_master_enabled)
368  result = enableMaster();
369  else
370  result = disableMaster();
371 
372  return result;
373 }
374 
375 int ReplSemiSyncMaster::enableMaster()
376 {
377  int result = 0;
378 
379  /* Must have the lock when we do enable of disable. */
380  lock();
381 
382  if (!getMasterEnabled())
383  {
384  active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
385  if (active_tranxs_ != NULL)
386  {
387  commit_file_name_inited_ = false;
388  reply_file_name_inited_ = false;
389  wait_file_name_inited_ = false;
390 
391  set_master_enabled(true);
392  state_ = true;
393  sql_print_information("Semi-sync replication enabled on the master.");
394  }
395  else
396  {
397  sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
398  result = -1;
399  }
400  }
401 
402  unlock();
403 
404  return result;
405 }
406 
407 int ReplSemiSyncMaster::disableMaster()
408 {
409  /* Must have the lock when we do enable of disable. */
410  lock();
411 
412  if (getMasterEnabled())
413  {
414  /* Switch off the semi-sync first so that waiting transaction will be
415  * waken up.
416  */
417  switch_off();
418 
419  assert(active_tranxs_ != NULL);
420  delete active_tranxs_;
421  active_tranxs_ = NULL;
422 
423  reply_file_name_inited_ = false;
424  wait_file_name_inited_ = false;
425  commit_file_name_inited_ = false;
426 
427  set_master_enabled(false);
428  sql_print_information("Semi-sync replication disabled on the master.");
429  }
430 
431  unlock();
432 
433  return 0;
434 }
435 
436 ReplSemiSyncMaster::~ReplSemiSyncMaster()
437 {
438  if (init_done_)
439  {
440  mysql_mutex_destroy(&LOCK_binlog_);
441  mysql_cond_destroy(&COND_binlog_send_);
442  }
443 
444  delete active_tranxs_;
445 }
446 
447 void ReplSemiSyncMaster::lock()
448 {
449  mysql_mutex_lock(&LOCK_binlog_);
450 }
451 
452 void ReplSemiSyncMaster::unlock()
453 {
454  mysql_mutex_unlock(&LOCK_binlog_);
455 }
456 
457 void ReplSemiSyncMaster::cond_broadcast()
458 {
459  mysql_cond_broadcast(&COND_binlog_send_);
460 }
461 
462 int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
463 {
464  const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
465  int wait_res;
466 
467  function_enter(kWho);
468  wait_res= mysql_cond_timedwait(&COND_binlog_send_,
469  &LOCK_binlog_, wait_time);
470  return function_exit(kWho, wait_res);
471 }
472 
473 void ReplSemiSyncMaster::add_slave()
474 {
475  lock();
476  rpl_semi_sync_master_clients++;
477  unlock();
478 }
479 
480 void ReplSemiSyncMaster::remove_slave()
481 {
482  lock();
483  rpl_semi_sync_master_clients--;
484 
485  /* If user has chosen not to wait if no semi-sync slave available
486  and the last semi-sync slave exits, turn off semi-sync on master
487  immediately.
488  */
489  if (!rpl_semi_sync_master_wait_no_slave &&
490  rpl_semi_sync_master_clients == 0)
491  switch_off();
492  unlock();
493 }
494 
495 bool ReplSemiSyncMaster::is_semi_sync_slave()
496 {
497  int null_value;
498  long long val= 0;
499  get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
500  return val;
501 }
502 
503 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
504  const char *log_file_name,
505  my_off_t log_file_pos,
506  bool skipped_event)
507 {
508  const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
509  int cmp;
510  bool can_release_threads = false;
511  bool need_copy_send_pos = true;
512 
513  if (!(getMasterEnabled()))
514  return 0;
515 
516  function_enter(kWho);
517 
518  lock();
519 
520  /* This is the real check inside the mutex. */
521  if (!getMasterEnabled())
522  goto l_end;
523 
524  if (!is_on())
525  /* We check to see whether we can switch semi-sync ON. */
526  try_switch_on(server_id, log_file_name, log_file_pos);
527 
528  /* The position should increase monotonically, if there is only one
529  * thread sending the binlog to the slave.
530  * In reality, to improve the transaction availability, we allow multiple
531  * sync replication slaves. So, if any one of them get the transaction,
532  * the transaction session in the primary can move forward.
533  */
534  if (reply_file_name_inited_)
535  {
536  cmp = ActiveTranx::compare(log_file_name, log_file_pos,
537  reply_file_name_, reply_file_pos_);
538 
539  /* If the requested position is behind the sending binlog position,
540  * would not adjust sending binlog position.
541  * We based on the assumption that there are multiple semi-sync slave,
542  * and at least one of them shou/ld be up to date.
543  * If all semi-sync slaves are behind, at least initially, the primary
544  * can find the situation after the waiting timeout. After that, some
545  * slaves should catch up quickly.
546  */
547  if (cmp < 0)
548  {
549  /* If the position is behind, do not copy it. */
550  need_copy_send_pos = false;
551  }
552  }
553 
554  if (need_copy_send_pos)
555  {
556  strcpy(reply_file_name_, log_file_name);
557  reply_file_pos_ = log_file_pos;
558  reply_file_name_inited_ = true;
559 
560  /* Remove all active transaction nodes before this point. */
561  assert(active_tranxs_ != NULL);
562  active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
563 
564  if (trace_level_ & kTraceDetail)
565  {
566  if(!skipped_event)
567  sql_print_information("%s: Got reply at (%s, %lu)", kWho,
568  log_file_name, (unsigned long)log_file_pos);
569  else
570  sql_print_information("%s: Transaction skipped at (%s, %lu)", kWho,
571  log_file_name, (unsigned long)log_file_pos);
572  }
573  }
574 
575  if (rpl_semi_sync_master_wait_sessions > 0)
576  {
577  /* Let us check if some of the waiting threads doing a trx
578  * commit can now proceed.
579  */
580  cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
581  wait_file_name_, wait_file_pos_);
582  if (cmp >= 0)
583  {
584  /* Yes, at least one waiting thread can now proceed:
585  * let us release all waiting threads with a broadcast
586  */
587  can_release_threads = true;
588  wait_file_name_inited_ = false;
589  }
590  }
591 
592  l_end:
593  unlock();
594 
595  if (can_release_threads)
596  {
597  if (trace_level_ & kTraceDetail)
598  sql_print_information("%s: signal all waiting threads.", kWho);
599 
600  cond_broadcast();
601  }
602 
603  return function_exit(kWho, 0);
604 }
605 
606 int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
607  my_off_t trx_wait_binlog_pos)
608 {
609  const char *kWho = "ReplSemiSyncMaster::commitTrx";
610 
611  function_enter(kWho);
612 
613  if (getMasterEnabled() && trx_wait_binlog_name)
614  {
615  struct timespec start_ts;
616  struct timespec abstime;
617  int wait_result;
618  PSI_stage_info old_stage;
619 
620  set_timespec(start_ts, 0);
621 
622  /* Acquire the mutex. */
623  lock();
624 
625  /* This must be called after acquired the lock */
626  THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
627  & stage_waiting_for_semi_sync_ack_from_slave,
628  & old_stage);
629 
630  /* This is the real check inside the mutex. */
631  if (!getMasterEnabled() || !is_on())
632  goto l_end;
633 
634  if (trace_level_ & kTraceDetail)
635  {
636  sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
637  trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
638  (int)is_on());
639  }
640 
641  /* Calcuate the waiting period. */
642 #ifdef __WIN__
643  abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
644  abstime.max_timeout_msec= (long)wait_timeout_;
645 #else
646  abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
647  abstime.tv_nsec = start_ts.tv_nsec +
648  (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
649  if (abstime.tv_nsec >= TIME_BILLION)
650  {
651  abstime.tv_sec++;
652  abstime.tv_nsec -= TIME_BILLION;
653  }
654 #endif /* __WIN__ */
655 
656  while (is_on())
657  {
658  if (reply_file_name_inited_)
659  {
660  int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
661  trx_wait_binlog_name, trx_wait_binlog_pos);
662  if (cmp >= 0)
663  {
664  /* We have already sent the relevant binlog to the slave: no need to
665  * wait here.
666  */
667  if (trace_level_ & kTraceDetail)
668  sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
669  kWho, reply_file_name_, (unsigned long)reply_file_pos_);
670  break;
671  }
672  }
673 
674  /* Let us update the info about the minimum binlog position of waiting
675  * threads.
676  */
677  if (wait_file_name_inited_)
678  {
679  int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
680  wait_file_name_, wait_file_pos_);
681  if (cmp <= 0)
682  {
683  /* This thd has a lower position, let's update the minimum info. */
684  strcpy(wait_file_name_, trx_wait_binlog_name);
685  wait_file_pos_ = trx_wait_binlog_pos;
686 
687  rpl_semi_sync_master_wait_pos_backtraverse++;
688  if (trace_level_ & kTraceDetail)
689  sql_print_information("%s: move back wait position (%s, %lu),",
690  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
691  }
692  }
693  else
694  {
695  strcpy(wait_file_name_, trx_wait_binlog_name);
696  wait_file_pos_ = trx_wait_binlog_pos;
697  wait_file_name_inited_ = true;
698 
699  if (trace_level_ & kTraceDetail)
700  sql_print_information("%s: init wait position (%s, %lu),",
701  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
702  }
703 
704  /* In semi-synchronous replication, we wait until the binlog-dump
705  * thread has received the reply on the relevant binlog segment from the
706  * replication slave.
707  *
708  * Let us suspend this thread to wait on the condition;
709  * when replication has progressed far enough, we will release
710  * these waiting threads.
711  */
712  rpl_semi_sync_master_wait_sessions++;
713 
714  if (trace_level_ & kTraceDetail)
715  sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
716  kWho, wait_timeout_,
717  wait_file_name_, (unsigned long)wait_file_pos_);
718 
719  wait_result = cond_timewait(&abstime);
720  rpl_semi_sync_master_wait_sessions--;
721 
722  if (wait_result != 0)
723  {
724  /* This is a real wait timeout. */
725  sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
726  "semi-sync up to file %s, position %lu.",
727  trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
728  reply_file_name_, (unsigned long)reply_file_pos_);
729  rpl_semi_sync_master_wait_timeouts++;
730 
731  /* switch semi-sync off */
732  switch_off();
733  }
734  else
735  {
736  int wait_time;
737 
738  wait_time = getWaitTime(start_ts);
739  if (wait_time < 0)
740  {
741  if (trace_level_ & kTraceGeneral)
742  {
743  sql_print_information("Assessment of waiting time for commitTrx "
744  "failed at wait position (%s, %lu)",
745  trx_wait_binlog_name,
746  (unsigned long)trx_wait_binlog_pos);
747  }
748  rpl_semi_sync_master_timefunc_fails++;
749  }
750  else
751  {
752  rpl_semi_sync_master_trx_wait_num++;
753  rpl_semi_sync_master_trx_wait_time += wait_time;
754  }
755  }
756  }
757 
758  l_end:
759  /*
760  At this point, the binlog file and position of this transaction
761  must have been removed from ActiveTranx.
762  */
763  assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
764  trx_wait_binlog_pos));
765 
766  /* Update the status counter. */
767  if (is_on())
768  rpl_semi_sync_master_yes_transactions++;
769  else
770  rpl_semi_sync_master_no_transactions++;
771 
772  /* The lock held will be released by thd_exit_cond, so no need to
773  call unlock() here */
774  THD_EXIT_COND(NULL, & old_stage);
775  }
776 
777  return function_exit(kWho, 0);
778 }
779 
780 /* Indicate that semi-sync replication is OFF now.
781  *
782  * What should we do when it is disabled? The problem is that we want
783  * the semi-sync replication enabled again when the slave catches up
784  * later. But, it is not that easy to detect that the slave has caught
785  * up. This is caused by the fact that MySQL's replication protocol is
786  * asynchronous, meaning that if the master does not use the semi-sync
787  * protocol, the slave would not send anything to the master.
788  * Still, if the master is sending (N+1)-th event, we assume that it is
789  * an indicator that the slave has received N-th event and earlier ones.
790  *
791  * If semi-sync is disabled, all transactions still update the wait
792  * position with the last position in binlog. But no transactions will
793  * wait for confirmations and the active transaction list would not be
794  * maintained. In binlog dump thread, updateSyncHeader() checks whether
795  * the current sending event catches up with last wait position. If it
796  * does match, semi-sync will be switched on again.
797  */
798 int ReplSemiSyncMaster::switch_off()
799 {
800  const char *kWho = "ReplSemiSyncMaster::switch_off";
801  int result;
802 
803  function_enter(kWho);
804  state_ = false;
805 
806  /* Clear the active transaction list. */
807  assert(active_tranxs_ != NULL);
808  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
809 
810  rpl_semi_sync_master_off_times++;
811  wait_file_name_inited_ = false;
812  reply_file_name_inited_ = false;
813  sql_print_information("Semi-sync replication switched OFF.");
814  cond_broadcast(); /* wake up all waiting threads */
815 
816  return function_exit(kWho, result);
817 }
818 
819 int ReplSemiSyncMaster::try_switch_on(int server_id,
820  const char *log_file_name,
821  my_off_t log_file_pos)
822 {
823  const char *kWho = "ReplSemiSyncMaster::try_switch_on";
824  bool semi_sync_on = false;
825 
826  function_enter(kWho);
827 
828  /* If the current sending event's position is larger than or equal to the
829  * 'largest' commit transaction binlog position, the slave is already
830  * catching up now and we can switch semi-sync on here.
831  * If commit_file_name_inited_ indicates there are no recent transactions,
832  * we can enable semi-sync immediately.
833  */
834  if (commit_file_name_inited_)
835  {
836  int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
837  commit_file_name_, commit_file_pos_);
838  semi_sync_on = (cmp >= 0);
839  }
840  else
841  {
842  semi_sync_on = true;
843  }
844 
845  if (semi_sync_on)
846  {
847  /* Switch semi-sync replication on. */
848  state_ = true;
849 
850  sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
851  "at (%s, %lu)",
852  server_id, log_file_name,
853  (unsigned long)log_file_pos);
854  }
855 
856  return function_exit(kWho, 0);
857 }
858 
859 int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
860  unsigned long size)
861 {
862  const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
863  function_enter(kWho);
864 
865  int hlen=0;
866  if (!is_semi_sync_slave())
867  {
868  hlen= 0;
869  }
870  else
871  {
872  /* No enough space for the extra header, disable semi-sync master */
873  if (sizeof(kSyncHeader) > size)
874  {
875  sql_print_warning("No enough space in the packet "
876  "for semi-sync extra header, "
877  "semi-sync replication disabled");
878  disableMaster();
879  return 0;
880  }
881 
882  /* Set the magic number and the sync status. By default, no sync
883  * is required.
884  */
885  memcpy(header, kSyncHeader, sizeof(kSyncHeader));
886  hlen= sizeof(kSyncHeader);
887  }
888  return function_exit(kWho, hlen);
889 }
890 
891 int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
892  const char *log_file_name,
893  my_off_t log_file_pos,
894  uint32 server_id)
895 {
896  const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
897  int cmp = 0;
898  bool sync = false;
899 
900  /* If the semi-sync master is not enabled, or the slave is not a semi-sync
901  * target, do not request replies from the slave.
902  */
903  if (!getMasterEnabled() || !is_semi_sync_slave())
904  {
905  sync = false;
906  return 0;
907  }
908 
909  function_enter(kWho);
910 
911  lock();
912 
913  /* This is the real check inside the mutex. */
914  if (!getMasterEnabled())
915  {
916  sync = false;
917  goto l_end;
918  }
919 
920  if (is_on())
921  {
922  /* semi-sync is ON */
923  sync = false; /* No sync unless a transaction is involved. */
924 
925  if (reply_file_name_inited_)
926  {
927  cmp = ActiveTranx::compare(log_file_name, log_file_pos,
928  reply_file_name_, reply_file_pos_);
929  if (cmp <= 0)
930  {
931  /* If we have already got the reply for the event, then we do
932  * not need to sync the transaction again.
933  */
934  goto l_end;
935  }
936  }
937 
938  if (wait_file_name_inited_)
939  {
940  cmp = ActiveTranx::compare(log_file_name, log_file_pos,
941  wait_file_name_, wait_file_pos_);
942  }
943  else
944  {
945  cmp = 1;
946  }
947 
948  /* If we are already waiting for some transaction replies which
949  * are later in binlog, do not wait for this one event.
950  */
951  if (cmp >= 0)
952  {
953  /*
954  * We only wait if the event is a transaction's ending event.
955  */
956  assert(active_tranxs_ != NULL);
957  sync = active_tranxs_->is_tranx_end_pos(log_file_name,
958  log_file_pos);
959  }
960  }
961  else
962  {
963  if (commit_file_name_inited_)
964  {
965  int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
966  commit_file_name_, commit_file_pos_);
967  sync = (cmp >= 0);
968  }
969  else
970  {
971  sync = true;
972  }
973  }
974 
975  if (trace_level_ & kTraceDetail)
976  sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
977  kWho, server_id, log_file_name,
978  (unsigned long)log_file_pos, sync, (int)is_on());
979 
980  l_end:
981  unlock();
982 
983  /* We do not need to clear sync flag because we set it to 0 when we
984  * reserve the packet header.
985  */
986  if (sync)
987  {
988  (packet)[2] = kPacketFlagSync;
989  }
990 
991  return function_exit(kWho, 0);
992 }
993 
994 int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
995  my_off_t log_file_pos)
996 {
997  const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
998  int result = 0;
999 
1000  function_enter(kWho);
1001 
1002  lock();
1003 
1004  /* This is the real check inside the mutex. */
1005  if (!getMasterEnabled())
1006  goto l_end;
1007 
1008  /* Update the 'largest' transaction commit position seen so far even
1009  * though semi-sync is switched off.
1010  * It is much better that we update commit_file_* here, instead of
1011  * inside commitTrx(). This is mostly because updateSyncHeader()
1012  * will watch for commit_file_* to decide whether to switch semi-sync
1013  * on. The detailed reason is explained in function updateSyncHeader().
1014  */
1015  if (commit_file_name_inited_)
1016  {
1017  int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1018  commit_file_name_, commit_file_pos_);
1019  if (cmp > 0)
1020  {
1021  /* This is a larger position, let's update the maximum info. */
1022  strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1023  commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1024  commit_file_pos_ = log_file_pos;
1025  }
1026  }
1027  else
1028  {
1029  strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1030  commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1031  commit_file_pos_ = log_file_pos;
1032  commit_file_name_inited_ = true;
1033  }
1034 
1035  if (is_on())
1036  {
1037  assert(active_tranxs_ != NULL);
1038  if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
1039  {
1040  /*
1041  if insert tranx_node failed, print a warning message
1042  and turn off semi-sync
1043  */
1044  sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1045  log_file_name, (ulong)log_file_pos);
1046  switch_off();
1047  }
1048  }
1049 
1050  l_end:
1051  unlock();
1052 
1053  return function_exit(kWho, result);
1054 }
1055 
1056 int ReplSemiSyncMaster::skipSlaveReply(const char *event_buf,
1057  uint32 server_id,
1058  const char* skipped_log_file,
1059  my_off_t skipped_log_pos)
1060 {
1061  const char *kWho = "ReplSemiSyncMaster::skipSlaveReply";
1062 
1063  function_enter(kWho);
1064 
1065  assert((unsigned char)event_buf[1] == kPacketMagicNum);
1066  if ((unsigned char)event_buf[2] != kPacketFlagSync)
1067  {
1068  /* current event would not require a reply anyway */
1069  goto l_end;
1070  }
1071 
1072  reportReplyBinlog(server_id, skipped_log_file,
1073  skipped_log_pos, true);
1074 
1075  l_end:
1076  return function_exit(kWho, 0);
1077 }
1078 
1079 int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
1080  const char *event_buf)
1081 {
1082  const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
1083  const unsigned char *packet;
1084  char log_file_name[FN_REFLEN];
1085  my_off_t log_file_pos;
1086  ulong log_file_len = 0;
1087  ulong packet_len;
1088  int result = -1;
1089 
1090  struct timespec start_ts= { 0, 0 };
1091  ulong trc_level = trace_level_;
1092 
1093  function_enter(kWho);
1094 
1095  assert((unsigned char)event_buf[1] == kPacketMagicNum);
1096  if ((unsigned char)event_buf[2] != kPacketFlagSync)
1097  {
1098  /* current event does not require reply */
1099  result = 0;
1100  goto l_end;
1101  }
1102 
1103  if (trc_level & kTraceNetWait)
1104  set_timespec(start_ts, 0);
1105 
1106  /* We flush to make sure that the current event is sent to the network,
1107  * instead of being buffered in the TCP/IP stack.
1108  */
1109  if (net_flush(net))
1110  {
1111  sql_print_error("Semi-sync master failed on net_flush() "
1112  "before waiting for slave reply");
1113  goto l_end;
1114  }
1115 
1116  net_clear(net, 0);
1117  if (trc_level & kTraceDetail)
1118  sql_print_information("%s: Wait for replica's reply", kWho);
1119 
1120  /* Wait for the network here. Though binlog dump thread can indefinitely wait
1121  * here, transactions would not wait indefintely.
1122  * Transactions wait on binlog replies detected by binlog dump threads. If
1123  * binlog dump threads wait too long, transactions will timeout and continue.
1124  */
1125  packet_len = my_net_read(net);
1126 
1127  if (trc_level & kTraceNetWait)
1128  {
1129  int wait_time = getWaitTime(start_ts);
1130  if (wait_time < 0)
1131  {
1132  sql_print_information("Assessment of waiting time for "
1133  "readSlaveReply failed.");
1134  rpl_semi_sync_master_timefunc_fails++;
1135  }
1136  else
1137  {
1138  rpl_semi_sync_master_net_wait_num++;
1139  rpl_semi_sync_master_net_wait_time += wait_time;
1140  }
1141  }
1142 
1143  if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
1144  {
1145  if (packet_len == packet_error)
1146  sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
1147  net->last_error, net->last_errno);
1148  else
1149  sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
1150  net->last_error, net->last_errno);
1151  goto l_end;
1152  }
1153 
1154  packet = net->read_pos;
1155  if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
1156  {
1157  sql_print_error("Read semi-sync reply magic number error");
1158  goto l_end;
1159  }
1160 
1161  log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
1162  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
1163  if (log_file_len >= FN_REFLEN)
1164  {
1165  sql_print_error("Read semi-sync reply binlog file length too large");
1166  goto l_end;
1167  }
1168  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
1169  log_file_name[log_file_len] = 0;
1170 
1171  if (trc_level & kTraceDetail)
1172  sql_print_information("%s: Got reply (%s, %lu)",
1173  kWho, log_file_name, (ulong)log_file_pos);
1174 
1175  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
1176 
1177  l_end:
1178  return function_exit(kWho, result);
1179 }
1180 
1181 
1182 int ReplSemiSyncMaster::resetMaster()
1183 {
1184  const char *kWho = "ReplSemiSyncMaster::resetMaster";
1185  int result = 0;
1186 
1187  function_enter(kWho);
1188 
1189 
1190  lock();
1191 
1192  state_ = getMasterEnabled()? 1 : 0;
1193 
1194  wait_file_name_inited_ = false;
1195  reply_file_name_inited_ = false;
1196  commit_file_name_inited_ = false;
1197 
1198  rpl_semi_sync_master_yes_transactions = 0;
1199  rpl_semi_sync_master_no_transactions = 0;
1200  rpl_semi_sync_master_off_times = 0;
1201  rpl_semi_sync_master_timefunc_fails = 0;
1202  rpl_semi_sync_master_wait_sessions = 0;
1203  rpl_semi_sync_master_wait_pos_backtraverse = 0;
1204  rpl_semi_sync_master_trx_wait_num = 0;
1205  rpl_semi_sync_master_trx_wait_time = 0;
1206  rpl_semi_sync_master_net_wait_num = 0;
1207  rpl_semi_sync_master_net_wait_time = 0;
1208 
1209  unlock();
1210 
1211  return function_exit(kWho, result);
1212 }
1213 
1214 void ReplSemiSyncMaster::setExportStats()
1215 {
1216  lock();
1217 
1218  rpl_semi_sync_master_status = state_;
1219  rpl_semi_sync_master_avg_trx_wait_time=
1220  ((rpl_semi_sync_master_trx_wait_num) ?
1221  (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
1222  ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1223  rpl_semi_sync_master_avg_net_wait_time=
1224  ((rpl_semi_sync_master_net_wait_num) ?
1225  (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
1226  ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1227 
1228  unlock();
1229 }
1230 
1231 /* Get the waiting time given the wait's staring time.
1232  *
1233  * Return:
1234  * >= 0: the waiting time in microsecons(us)
1235  * < 0: error in get time or time back traverse
1236  */
1237 static int getWaitTime(const struct timespec& start_ts)
1238 {
1239  unsigned long long start_usecs, end_usecs;
1240  struct timespec end_ts;
1241 
1242  /* Starting time in microseconds(us). */
1243  start_usecs = timespec_to_usec(&start_ts);
1244 
1245  /* Get the wait time interval. */
1246  set_timespec(end_ts, 0);
1247 
1248  /* Ending time in microseconds(us). */
1249  end_usecs = timespec_to_usec(&end_ts);
1250 
1251  if (end_usecs < start_usecs)
1252  return -1;
1253 
1254  return (int)(end_usecs - start_usecs);
1255 }