MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_injector.cc
1 /* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 #include "sql_priv.h"
17 #include "unireg.h" // REQUIRED by later includes
18 #include "rpl_injector.h"
19 #include "transaction.h"
20 #include "sql_parse.h" // begin_trans, end_trans, COMMIT
21 #include "sql_base.h" // close_thread_tables
22 #include "log_event.h" // Incident_log_event
23 #include "binlog.h" // mysql_bin_log
24 
25 /*
26  injector::transaction - member definitions
27 */
28 
29 /* inline since it's called below */
30 inline
31 injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
32  : m_state(START_STATE), m_thd(thd)
33 {
34  /*
35  Default initialization of m_start_pos (which initializes it to garbage).
36  We need to fill it in using the code below.
37  */
38  LOG_INFO log_info;
39  log->get_current_log(&log_info);
40  /* !!! binlog_pos does not follow RAII !!! */
41  m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
42  m_start_pos.m_file_pos= log_info.pos;
43 
44  if (unlikely(m_start_pos.m_file_name == NULL))
45  {
46  m_thd= NULL;
47  return;
48  }
49 
50  /*
51  Next pos is unknown until after commit of the Binlog transaction
52  */
53  m_next_pos.m_file_name= 0;
54  m_next_pos.m_file_pos= 0;
55 
56  /*
57  Ensure we don't pick up this thd's last written Binlog pos in
58  empty-transaction-commit cases.
59  This is not ideal, as it zaps this information for any other
60  usage (e.g. WL4047)
61  Potential improvement : save the 'old' next pos prior to
62  commit, and restore on error.
63  */
64  m_thd->clear_next_event_pos();
65 
66  trans_begin(m_thd);
67 }
68 
69 injector::transaction::~transaction()
70 {
71  if (!good())
72  return;
73 
74  /* Needed since my_free expects a 'char*' (instead of 'void*'). */
75  char* const start_pos_memory= const_cast<char*>(m_start_pos.m_file_name);
76 
77  if (start_pos_memory)
78  {
79  my_free(start_pos_memory);
80  }
81 
82  char* const next_pos_memory= const_cast<char*>(m_next_pos.m_file_name);
83  if (next_pos_memory)
84  {
85  my_free(next_pos_memory);
86  }
87 }
88 
94 {
95  DBUG_ENTER("injector::transaction::commit()");
96  int error= m_thd->binlog_flush_pending_rows_event(true);
97  /*
98  Cluster replication does not preserve statement or
99  transaction boundaries of the master. Instead, a new
100  transaction on replication slave is started when a new GCI
101  (global checkpoint identifier) is issued, and is committed
102  when the last event of the check point has been received and
103  processed. This ensures consistency of each cluster in
104  cluster replication, and there is no requirement for stronger
105  consistency: MySQL replication is asynchronous with other
106  engines as well.
107 
108  A practical consequence of that is that row level replication
109  stream passed through the injector thread never contains
110  COMMIT events.
111  Here we should preserve the server invariant that there is no
112  outstanding statement transaction when the normal transaction
113  is committed by committing the statement transaction
114  explicitly.
115  */
116  trans_commit_stmt(m_thd);
117  if (!trans_commit(m_thd))
118  {
119  close_thread_tables(m_thd);
120  m_thd->mdl_context.release_transactional_locks();
121  }
122 
123  /* Copy next position out into our next pos member */
124  if ((error == 0) &&
125  (m_thd->binlog_next_event_pos.file_name != NULL) &&
126  ((m_next_pos.m_file_name=
127  my_strdup(m_thd->binlog_next_event_pos.file_name, MYF(0))) != NULL))
128  {
129  m_next_pos.m_file_pos= m_thd->binlog_next_event_pos.pos;
130  }
131  else
132  {
133  /* Error, problem copying etc. */
134  m_next_pos.m_file_name= NULL;
135  m_next_pos.m_file_pos= 0;
136  }
137 
138  DBUG_RETURN(error);
139 }
140 
141 
142 int injector::transaction::rollback()
143 {
144  DBUG_ENTER("injector::transaction::rollback()");
145  trans_rollback_stmt(m_thd);
146  if (!trans_rollback(m_thd))
147  {
148  close_thread_tables(m_thd);
149  if (!m_thd->locked_tables_mode)
150  m_thd->mdl_context.release_transactional_locks();
151  }
152  DBUG_RETURN(0);
153 }
154 
155 
156 int injector::transaction::use_table(server_id_type sid, table tbl)
157 {
158  DBUG_ENTER("injector::transaction::use_table");
159 
160  int error;
161 
162  if ((error= check_state(TABLE_STATE)))
163  DBUG_RETURN(error);
164 
165  server_id_type save_id= m_thd->server_id;
166  m_thd->set_server_id(sid);
167  error= m_thd->binlog_write_table_map(tbl.get_table(),
168  tbl.is_transactional(), FALSE);
169  m_thd->set_server_id(save_id);
170  DBUG_RETURN(error);
171 }
172 
173 
174 int injector::transaction::write_row (server_id_type sid, table tbl,
175  MY_BITMAP const* cols, size_t colcnt,
176  record_type record,
177  const uchar* extra_row_info)
178 {
179  DBUG_ENTER("injector::transaction::write_row(...)");
180 
181  int error= check_state(ROW_STATE);
182  if (error)
183  DBUG_RETURN(error);
184 
185  server_id_type save_id= m_thd->server_id;
186  m_thd->set_server_id(sid);
187  table::save_sets saveset(tbl, cols, cols);
188 
189  error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
190  record, extra_row_info);
191  m_thd->set_server_id(save_id);
192  DBUG_RETURN(error);
193 }
194 
195 int injector::transaction::write_row (server_id_type sid, table tbl,
196  MY_BITMAP const* cols, size_t colcnt,
197  record_type record)
198 {
199  return write_row(sid, tbl, cols, colcnt, record, NULL);
200 }
201 
202 
203 int injector::transaction::delete_row(server_id_type sid, table tbl,
204  MY_BITMAP const* cols, size_t colcnt,
205  record_type record,
206  const uchar* extra_row_info)
207 {
208  DBUG_ENTER("injector::transaction::delete_row(...)");
209 
210  int error= check_state(ROW_STATE);
211  if (error)
212  DBUG_RETURN(error);
213 
214  server_id_type save_id= m_thd->server_id;
215  m_thd->set_server_id(sid);
216  table::save_sets saveset(tbl, cols, cols);
217  error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
218  record, extra_row_info);
219  m_thd->set_server_id(save_id);
220  DBUG_RETURN(error);
221 }
222 
223 int injector::transaction::delete_row(server_id_type sid, table tbl,
224  MY_BITMAP const* cols, size_t colcnt,
225  record_type record)
226 {
227  return delete_row(sid, tbl, cols, colcnt, record, NULL);
228 }
229 
230 
231 int injector::transaction::update_row(server_id_type sid, table tbl,
232  MY_BITMAP const* cols, size_t colcnt,
233  record_type before, record_type after,
234  const uchar* extra_row_info)
235 {
236  DBUG_ENTER("injector::transaction::update_row(...)");
237 
238  int error= check_state(ROW_STATE);
239  if (error)
240  DBUG_RETURN(error);
241 
242  server_id_type save_id= m_thd->server_id;
243  m_thd->set_server_id(sid);
244  // The read- and write sets with autorestore (in the destructor)
245  table::save_sets saveset(tbl, cols, cols);
246 
247  error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
248  before, after, extra_row_info);
249  m_thd->set_server_id(save_id);
250  DBUG_RETURN(error);
251 }
252 
253 int injector::transaction::update_row(server_id_type sid, table tbl,
254  MY_BITMAP const* cols, size_t colcnt,
255  record_type before, record_type after)
256 {
257  return update_row(sid, tbl, cols, colcnt, before, after, NULL);
258 }
259 
260 injector::transaction::binlog_pos injector::transaction::start_pos() const
261 {
262  return m_start_pos;
263 }
264 
265 injector::transaction::binlog_pos injector::transaction::next_pos() const
266 {
267  return m_next_pos;
268 }
269 
270 /*
271  injector - member definitions
272 */
273 
274 /* This constructor is called below */
275 inline injector::injector()
276 {
277 }
278 
279 static injector *s_injector= 0;
280 injector *injector::instance()
281 {
282  if (s_injector == 0)
283  s_injector= new injector;
284  /* "There can be only one [instance]" */
285  return s_injector;
286 }
287 
288 void injector::free_instance()
289 {
290  injector *inj = s_injector;
291 
292  if (inj != 0)
293  {
294  s_injector= 0;
295  delete inj;
296  }
297 }
298 
299 void injector::new_trans(THD *thd, injector::transaction *ptr)
300 {
301  DBUG_ENTER("injector::new_trans(THD *, transaction *)");
302  /*
303  Currently, there is no alternative to using 'mysql_bin_log' since that
304  is hardcoded into the way the handler is using the binary log.
305  */
306  transaction trans(&mysql_bin_log, thd);
307  ptr->swap(trans);
308 
309  DBUG_VOID_RETURN;
310 }
311 
312 int injector::record_incident(THD *thd, Incident incident)
313 {
314  Incident_log_event ev(thd, incident);
315  return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
316 }
317 
318 int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
319 {
320  Incident_log_event ev(thd, incident, message);
321  return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
322 }