19 #include "semisync_master.h" 
   20 #include "sql_class.h"                           
   32   if (repl_semisync.getMasterEnabled())
 
   39     error= repl_semisync.writeTranxInBinlog(log_file,
 
   54   bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
 
   56   if (is_real_trans && param->log_pos)
 
   58     const char *binlog_name= param->log_file;
 
   59     return repl_semisync.commitTrx(binlog_name, param->log_pos);
 
   66   return repl_semi_report_commit(param);
 
   73   bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
 
   78     repl_semisync.add_slave();
 
   84     repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
 
   86   sql_print_information(
"Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
 
   87                         semi_sync_slave ? 
"semi-sync" : 
"asynchronous",
 
   88                         param->server_id, log_file, (
unsigned long)log_pos);
 
   95   bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
 
   97   sql_print_information(
"Stop %s binlog_dump to slave (server_id: %d)",
 
   98                         semi_sync_slave ? 
"semi-sync" : 
"asynchronous",
 
  103     repl_semisync.remove_slave();
 
  109                              unsigned char *header,
 
  110                              unsigned long size, 
unsigned long *len)
 
  112   *len +=  repl_semisync.reserveSyncHeader(header, size);
 
  117                                 unsigned char *packet, 
unsigned long len,
 
  118                                 const char *log_file, my_off_t log_pos)
 
  120   return repl_semisync.updateSyncHeader(packet,
 
  127                                const char *event_buf, 
unsigned long len,
 
  128                                const char * skipped_log_file,
 
  129                                my_off_t skipped_log_pos)
 
  131   if (repl_semisync.is_semi_sync_slave())
 
  133     if(skipped_log_pos>0)
 
  134       repl_semisync.skipSlaveReply(event_buf, param->server_id,
 
  135                                    skipped_log_file, skipped_log_pos);
 
  138       THD *thd= current_thd;
 
  144       (void) repl_semisync.readSlaveReply(&thd->net,
 
  145                                           param->server_id, event_buf);
 
  154   if (repl_semisync.resetMaster())
 
  164 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
 
  169 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
 
  174 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
 
  179 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
 
  181  "Enable semi-synchronous replication master (disabled by default). ",
 
  183   &fix_rpl_semi_sync_master_enabled,    
 
  186 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
 
  188  "The timeout value (in ms) for semi-synchronous replication in the master",
 
  190   fix_rpl_semi_sync_master_timeout,     
 
  193 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
 
  195  "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
 
  200 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
 
  202  "The tracing level for semi-sync replication.",
 
  204   &fix_rpl_semi_sync_master_trace_level, 
 
  207 static SYS_VAR* semi_sync_master_system_vars[]= {
 
  208   MYSQL_SYSVAR(enabled),
 
  209   MYSQL_SYSVAR(timeout),
 
  210   MYSQL_SYSVAR(wait_no_slave),
 
  211   MYSQL_SYSVAR(trace_level),
 
  216 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
 
  221   *(
unsigned long *)ptr= *(
unsigned long *)val;
 
  222   repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
 
  226 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
 
  231   *(
unsigned long *)ptr= *(
unsigned long *)val;
 
  232   repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
 
  236 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
 
  241   *(
char *)ptr= *(
char *)val;
 
  242   if (rpl_semi_sync_master_enabled)
 
  244     if (repl_semisync.enableMaster() != 0)
 
  245       rpl_semi_sync_master_enabled = 
false;
 
  249     if (repl_semisync.disableMaster() != 0)
 
  250       rpl_semi_sync_master_enabled = 
true;
 
  259   repl_semi_report_commit,      
 
  260   repl_semi_report_rollback,    
 
  266   repl_semi_report_binlog_update, 
 
  272   repl_semi_binlog_dump_start,  
 
  273   repl_semi_binlog_dump_end,    
 
  274   repl_semi_reserve_header,     
 
  275   repl_semi_before_send_event,  
 
  276   repl_semi_after_send_event,   
 
  277   repl_semi_reset_master,       
 
  281 #define SHOW_FNAME(name)                        \ 
  282   rpl_semi_sync_master_show_##name 
  284 #define DEF_SHOW_FUNC(name, show_type)                                  \ 
  285   static  int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ 
  287     repl_semisync.setExportStats();                                     \ 
  288     var->type= show_type;                                               \ 
  289     var->value= (char *)&rpl_semi_sync_master_##name;                           \ 
  293 DEF_SHOW_FUNC(status, SHOW_BOOL)
 
  294 DEF_SHOW_FUNC(clients, SHOW_LONG)
 
  295 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
 
  296 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
 
  297 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
 
  298 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
 
  299 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
 
  300 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
 
  301 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
 
  305 static 
SHOW_VAR semi_sync_master_status_vars[]= {
 
  306   {
"Rpl_semi_sync_master_status",
 
  307    (
char*) &SHOW_FNAME(status),
 
  309   {
"Rpl_semi_sync_master_clients",
 
  310    (
char*) &SHOW_FNAME(clients),
 
  312   {
"Rpl_semi_sync_master_yes_tx",
 
  313    (
char*) &rpl_semi_sync_master_yes_transactions,
 
  315   {
"Rpl_semi_sync_master_no_tx",
 
  316    (
char*) &rpl_semi_sync_master_no_transactions,
 
  318   {
"Rpl_semi_sync_master_wait_sessions",
 
  319    (
char*) &SHOW_FNAME(wait_sessions),
 
  321   {
"Rpl_semi_sync_master_no_times",
 
  322    (
char*) &rpl_semi_sync_master_off_times,
 
  324   {
"Rpl_semi_sync_master_timefunc_failures",
 
  325    (
char*) &rpl_semi_sync_master_timefunc_fails,
 
  327   {
"Rpl_semi_sync_master_wait_pos_backtraverse",
 
  328    (
char*) &rpl_semi_sync_master_wait_pos_backtraverse,
 
  330   {
"Rpl_semi_sync_master_tx_wait_time",
 
  331    (
char*) &SHOW_FNAME(trx_wait_time),
 
  333   {
"Rpl_semi_sync_master_tx_waits",
 
  334    (
char*) &SHOW_FNAME(trx_wait_num),
 
  336   {
"Rpl_semi_sync_master_tx_avg_wait_time",
 
  337    (
char*) &SHOW_FNAME(avg_trx_wait_time),
 
  339   {
"Rpl_semi_sync_master_net_wait_time",
 
  340    (
char*) &SHOW_FNAME(net_wait_time),
 
  342   {
"Rpl_semi_sync_master_net_waits",
 
  343    (
char*) &SHOW_FNAME(net_wait_num),
 
  345   {
"Rpl_semi_sync_master_net_avg_wait_time",
 
  346    (
char*) &SHOW_FNAME(avg_net_wait_time),
 
  348   {NULL, NULL, SHOW_LONG},
 
  351 #ifdef HAVE_PSI_INTERFACE 
  352 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
 
  354 static PSI_mutex_info all_semisync_mutexes[]=
 
  356   { &key_ss_mutex_LOCK_binlog_, 
"LOCK_binlog_", 0}
 
  359 PSI_cond_key key_ss_cond_COND_binlog_send_;
 
  361 static PSI_cond_info all_semisync_conds[]=
 
  363   { &key_ss_cond_COND_binlog_send_, 
"COND_binlog_send_", 0}
 
  368 { 0, 
"Waiting for semi-sync ACK from slave", 0};
 
  370 #ifdef HAVE_PSI_INTERFACE 
  373   & stage_waiting_for_semi_sync_ack_from_slave
 
  376 static void init_semisync_psi_keys(
void)
 
  378   const char* category= 
"semisync";
 
  381   count= array_elements(all_semisync_mutexes);
 
  384   count= array_elements(all_semisync_conds);
 
  387   count= array_elements(all_semisync_stages);
 
  392 static int semi_sync_master_plugin_init(
void *p)
 
  394 #ifdef HAVE_PSI_INTERFACE 
  395   init_semisync_psi_keys();
 
  398   if (repl_semisync.initObject())
 
  400   if (register_trans_observer(&trans_observer, p))
 
  402   if (register_binlog_storage_observer(&storage_observer, p))
 
  404   if (register_binlog_transmit_observer(&transmit_observer, p))
 
  409 static int semi_sync_master_plugin_deinit(
void *p)
 
  411   if (unregister_trans_observer(&trans_observer, p))
 
  413     sql_print_error(
"unregister_trans_observer failed");
 
  416   if (unregister_binlog_storage_observer(&storage_observer, p))
 
  418     sql_print_error(
"unregister_binlog_storage_observer failed");
 
  421   if (unregister_binlog_transmit_observer(&transmit_observer, p))
 
  423     sql_print_error(
"unregister_binlog_transmit_observer failed");
 
  426   sql_print_information(
"unregister_replicator OK");
 
  431   MYSQL_REPLICATION_INTERFACE_VERSION
 
  437 mysql_declare_plugin(semi_sync_master)
 
  439   MYSQL_REPLICATION_PLUGIN,
 
  440   &semi_sync_master_plugin,
 
  441   "rpl_semi_sync_master",
 
  443   "Semi-synchronous replication master",
 
  445   semi_sync_master_plugin_init, 
 
  446   semi_sync_master_plugin_deinit, 
 
  448   semi_sync_master_status_vars, 
 
  449   semi_sync_master_system_vars, 
 
  453 mysql_declare_plugin_end;