MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_handler.h
1 /* Copyright (c) 2008, 2010, 2012 Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software Foundation,
14  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
15 
16 #ifndef RPL_HANDLER_H
17 #define RPL_HANDLER_H
18 
19 #include "sql_priv.h"
20 #include "rpl_gtid.h"
21 #include "rpl_mi.h"
22 #include "rpl_rli.h"
23 #include "sql_plugin.h"
24 #include "replication.h"
25 
27 public:
28  void *observer;
29  st_plugin_int *plugin_int;
30  plugin_ref plugin;
31 
32  Observer_info(void *ob, st_plugin_int *p)
33  :observer(ob), plugin_int(p)
34  {
35  plugin= plugin_int_to_ref(plugin_int);
36  }
37 };
38 
39 class Delegate {
40 public:
43 
44  int add_observer(void *observer, st_plugin_int *plugin)
45  {
46  int ret= FALSE;
47  if (!inited)
48  return TRUE;
49  write_lock();
50  Observer_info_iterator iter(observer_info_list);
51  Observer_info *info= iter++;
52  while (info && info->observer != observer)
53  info= iter++;
54  if (!info)
55  {
56  info= new Observer_info(observer, plugin);
57  if (!info || observer_info_list.push_back(info, &memroot))
58  ret= TRUE;
59  }
60  else
61  ret= TRUE;
62  unlock();
63  return ret;
64  }
65 
66  int remove_observer(void *observer, st_plugin_int *plugin)
67  {
68  int ret= FALSE;
69  if (!inited)
70  return TRUE;
71  write_lock();
72  Observer_info_iterator iter(observer_info_list);
73  Observer_info *info= iter++;
74  while (info && info->observer != observer)
75  info= iter++;
76  if (info)
77  {
78  iter.remove();
79  delete info;
80  }
81  else
82  ret= TRUE;
83  unlock();
84  return ret;
85  }
86 
87  inline Observer_info_iterator observer_info_iter()
88  {
89  return Observer_info_iterator(observer_info_list);
90  }
91 
92  inline bool is_empty()
93  {
94  DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
95  return observer_info_list.is_empty();
96  }
97 
98  inline int read_lock()
99  {
100  if (!inited)
101  return TRUE;
102  return rw_rdlock(&lock);
103  }
104 
105  inline int write_lock()
106  {
107  if (!inited)
108  return TRUE;
109  return rw_wrlock(&lock);
110  }
111 
112  inline int unlock()
113  {
114  if (!inited)
115  return TRUE;
116  return rw_unlock(&lock);
117  }
118 
119  inline bool is_inited()
120  {
121  return inited;
122  }
123 
124  Delegate()
125  {
126  inited= FALSE;
127  if (my_rwlock_init(&lock, NULL))
128  return;
129  init_sql_alloc(&memroot, 1024, 0);
130  inited= TRUE;
131  }
132  ~Delegate()
133  {
134  inited= FALSE;
135  rwlock_destroy(&lock);
136  free_root(&memroot, MYF(0));
137  }
138 
139 private:
140  Observer_info_list observer_info_list;
141  rw_lock_t lock;
142  MEM_ROOT memroot;
143  bool inited;
144 };
145 
147  :public Delegate {
148 public:
149  typedef Trans_observer Observer;
150  int before_commit(THD *thd, bool all);
151  int before_rollback(THD *thd, bool all);
152  int after_commit(THD *thd, bool all);
153  int after_rollback(THD *thd, bool all);
154 };
155 
157  :public Delegate {
158 public:
160  int after_flush(THD *thd, const char *log_file,
161  my_off_t log_pos);
162 };
163 
164 #ifdef HAVE_REPLICATION
165 class Binlog_transmit_delegate
166  :public Delegate {
167 public:
168  typedef Binlog_transmit_observer Observer;
169  int transmit_start(THD *thd, ushort flags,
170  const char *log_file, my_off_t log_pos);
171  int transmit_stop(THD *thd, ushort flags);
172  int reserve_header(THD *thd, ushort flags, String *packet);
173  int before_send_event(THD *thd, ushort flags,
174  String *packet, const
175  char *log_file, my_off_t log_pos );
176  int after_send_event(THD *thd, ushort flags,
177  String *packet, const char *skipped_log_file,
178  my_off_t skipped_log_pos);
179  int after_reset_master(THD *thd, ushort flags);
180 };
181 
182 class Binlog_relay_IO_delegate
183  :public Delegate {
184 public:
185  typedef Binlog_relay_IO_observer Observer;
186  int thread_start(THD *thd, Master_info *mi);
187  int thread_stop(THD *thd, Master_info *mi);
188  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
189  int after_read_event(THD *thd, Master_info *mi,
190  const char *packet, ulong len,
191  const char **event_buf, ulong *event_len);
192  int after_queue_event(THD *thd, Master_info *mi,
193  const char *event_buf, ulong event_len,
194  bool synced);
195  int after_reset_slave(THD *thd, Master_info *mi);
196 private:
197  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
198 };
199 #endif /* HAVE_REPLICATION */
200 
201 int delegates_init();
202 void delegates_destroy();
203 
204 extern Trans_delegate *transaction_delegate;
205 extern Binlog_storage_delegate *binlog_storage_delegate;
206 #ifdef HAVE_REPLICATION
207 extern Binlog_transmit_delegate *binlog_transmit_delegate;
208 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
209 #endif /* HAVE_REPLICATION */
210 
211 /*
212  if there is no observers in the delegate, we can return 0
213  immediately.
214 */
215 #define RUN_HOOK(group, hook, args) \
216  (group ##_delegate->is_empty() ? \
217  0 : group ##_delegate->hook args)
218 
219 #endif /* RPL_HANDLER_H */