MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
log_event_old.cc
1 /* Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 #include "sql_priv.h"
17 #ifndef MYSQL_CLIENT
18 #include "unireg.h"
19 #endif
20 #include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
21 #include "log_event.h"
22 #ifndef MYSQL_CLIENT
23 #include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
24 #include "sql_base.h" // close_tables_for_reopen
25 #include "key.h" // key_copy
26 #include "lock.h" // mysql_unlock_tables
27 #include "sql_parse.h" // mysql_reset_thd_for_next_command
28 #include "rpl_rli.h"
29 #include "rpl_utility.h"
30 #endif
31 #include "log_event_old.h"
32 #include "rpl_record_old.h"
33 #include "transaction.h"
34 
35 #include <algorithm>
36 
37 using std::min;
38 using std::max;
39 
40 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
41 
42 // Old implementation of do_apply_event()
43 int
44 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
45 {
46  DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
47  int error= 0;
48  THD *ev_thd= ev->thd;
49  uchar const *row_start= ev->m_rows_buf;
50 
51  /*
52  If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
53  does not contain any data. In that case, we just remove all tables in the
54  tables_to_lock list, close the thread tables, and return with
55  success.
56  */
57  if ((ev->m_table_id.id() == ~0U || ev->m_table_id.id() == (~0ULL >> 16)) &&
58  ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
59 
60  {
61  /*
62  This one is supposed to be set: just an extra check so that
63  nothing strange has happened.
64  */
65  DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
66 
67  const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
68  ev_thd->clear_error();
69  DBUG_RETURN(0);
70  }
71 
72  /*
73  'ev_thd' has been set by exec_relay_log_event(), just before calling
74  do_apply_event(). We still check here to prevent future coding
75  errors.
76  */
77  DBUG_ASSERT(rli->info_thd == ev_thd);
78 
79  /*
80  If there is no locks taken, this is the first binrow event seen
81  after the table map events. We should then lock all the tables
82  used in the transaction and proceed with execution of the actual
83  event.
84  */
85  if (!ev_thd->lock)
86  {
87  /*
88  Lock_tables() reads the contents of ev_thd->lex, so they must be
89  initialized.
90 
91  We also call the mysql_reset_thd_for_next_command(), since this
92  is the logical start of the next "statement". Note that this
93  call might reset the value of current_stmt_binlog_format, so
94  we need to do any changes to that value after this function.
95  */
96  lex_start(ev_thd);
98 
99  /*
100  This is a row injection, so we flag the "statement" as
101  such. Note that this code is called both when the slave does row
102  injections and when the BINLOG statement is used to do row
103  injections.
104  */
105  ev_thd->lex->set_stmt_row_injection();
106 
107  if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
108  {
109  uint actual_error= ev_thd->get_stmt_da()->sql_errno();
110  if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
111  {
112  /*
113  Error reporting borrowed from Query_log_event with many excessive
114  simplifications (we don't honour --slave-skip-errors)
115  */
116  rli->report(ERROR_LEVEL, actual_error,
117  "Error '%s' on opening tables",
118  (actual_error ? ev_thd->get_stmt_da()->message() :
119  "unexpected success or fatal error"));
120  ev_thd->is_slave_error= 1;
121  }
122  const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
123  DBUG_RETURN(actual_error);
124  }
125 
126  /*
127  When the open and locking succeeded, we check all tables to
128  ensure that they still have the correct type.
129 
130  We can use a down cast here since we know that every table added
131  to the tables_to_lock is a RPL_TABLE_LIST.
132  */
133 
134  {
135  RPL_TABLE_LIST *ptr= rli->tables_to_lock;
136  for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
137  ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
138  {
139  DBUG_ASSERT(ptr->m_tabledef_valid);
140  TABLE *conv_table;
141  if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
142  ptr->table, &conv_table))
143  {
144  ev_thd->is_slave_error= 1;
145  const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
147  }
148  DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
149  " - conv_table: %p",
150  ptr->table->s->db.str,
151  ptr->table->s->table_name.str, conv_table));
152  ptr->m_conv_table= conv_table;
153  }
154  }
155 
156  /*
157  ... and then we add all the tables to the table map and remove
158  them from tables to lock.
159 
160  We also invalidate the query cache for all the tables, since
161  they will now be changed.
162 
163  TODO [/Matz]: Maybe the query cache should not be invalidated
164  here? It might be that a table is not changed, even though it
165  was locked for the statement. We do know that each
166  Old_rows_log_event contain at least one row, so after processing one
167  Old_rows_log_event, we can invalidate the query cache for the
168  associated table.
169  */
170  TABLE_LIST *ptr= rli->tables_to_lock;
171  for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
172  const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
173 #ifdef HAVE_QUERY_CACHE
174  query_cache.invalidate_locked_for_write(rli->tables_to_lock);
175 #endif
176  }
177 
178  TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
179 
180  if (table)
181  {
182  /*
183  table == NULL means that this table should not be replicated
184  (this was set up by Table_map_log_event::do_apply_event()
185  which tested replicate-* rules).
186  */
187 
188  /*
189  It's not needed to set_time() but
190  1) it continues the property that "Time" in SHOW PROCESSLIST shows how
191  much slave is behind
192  2) it will be needed when we allow replication from a table with no
193  TIMESTAMP column to a table with one.
194  So we call set_time(), like in SBR. Presently it changes nothing.
195  */
196  ev_thd->set_time(&ev->when);
197  /*
198  There are a few flags that are replicated with each row event.
199  Make sure to set/clear them before executing the main body of
200  the event.
201  */
202  if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
203  ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
204  else
205  ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
206 
207  if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
208  ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
209  else
210  ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
211  /* A small test to verify that objects have consistent types */
212  DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
213 
214  /*
215  Now we are in a statement and will stay in a statement until we
216  see a STMT_END_F.
217 
218  We set this flag here, before actually applying any rows, in
219  case the SQL thread is stopped and we need to detect that we're
220  inside a statement and halting abruptly might cause problems
221  when restarting.
222  */
223  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
224 
225  error= do_before_row_operations(table);
226  while (error == 0 && row_start < ev->m_rows_end)
227  {
228  uchar const *row_end= NULL;
229  if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
230  break; // We should perform the after-row operation even in
231  // the case of error
232 
233  DBUG_ASSERT(row_end != NULL); // cannot happen
234  DBUG_ASSERT(row_end <= ev->m_rows_end);
235 
236  /* in_use can have been set to NULL in close_tables_for_reopen */
237  THD* old_thd= table->in_use;
238  if (!table->in_use)
239  table->in_use= ev_thd;
240  error= do_exec_row(table);
241  table->in_use = old_thd;
242  switch (error)
243  {
244  /* Some recoverable errors */
245  case HA_ERR_RECORD_CHANGED:
246  case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
247  tuple does not exist */
248  error= 0;
249  case 0:
250  break;
251 
252  default:
253  rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
254  "Error in %s event: row application failed. %s",
255  ev->get_type_str(),
256  ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
257  thd->is_slave_error= 1;
258  break;
259  }
260 
261  row_start= row_end;
262  }
263  DBUG_EXECUTE_IF("stop_slave_middle_group",
264  const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
265  error= do_after_row_operations(table, error);
266  }
267 
268  if (error)
269  { /* error has occured during the transaction */
270  rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
271  "Error in %s event: error during transaction execution "
272  "on table %s.%s. %s",
273  ev->get_type_str(), table->s->db.str,
274  table->s->table_name.str,
275  ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
276 
277  /*
278  If one day we honour --skip-slave-errors in row-based replication, and
279  the error should be skipped, then we would clear mappings, rollback,
280  close tables, but the slave SQL thread would not stop and then may
281  assume the mapping is still available, the tables are still open...
282  So then we should clear mappings/rollback/close here only if this is a
283  STMT_END_F.
284  For now we code, knowing that error is not skippable and so slave SQL
285  thread is certainly going to stop.
286  rollback at the caller along with sbr.
287  */
288  ev_thd->reset_current_stmt_binlog_format_row();
289  const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
290  ev_thd->is_slave_error= 1;
291  DBUG_RETURN(error);
292  }
293 
294  DBUG_RETURN(0);
295 }
296 #endif
297 
298 
299 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
300 
301 /*
302  Check if there are more UNIQUE keys after the given key.
303 */
304 static int
305 last_uniq_key(TABLE *table, uint keyno)
306 {
307  while (++keyno < table->s->keys)
308  if (table->key_info[keyno].flags & HA_NOSAME)
309  return 0;
310  return 1;
311 }
312 
313 
/*
  Compares table->record[0] and table->record[1]

  Returns TRUE if different.
*/
static bool record_compare(TABLE *table)
{
  /*
    Need to set the X bit and the filler bits in both records since
    there are engines that do not set it correctly.

    In addition, since MyISAM checks that one hasn't tampered with the
    record, it is necessary to restore the old bytes into the record
    after doing the comparison.

    TODO[record format ndb]: Remove it once NDB returns correct
    records. Check that the other engines also return correct records.
   */

  bool result= FALSE;
  /* Saved copies of the bytes patched below so they can be restored on exit. */
  uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};

  if (table->s->null_bytes > 0)
  {
    /* Normalize the null-byte area of both record[0] and record[1]. */
    for (int i = 0 ; i < 2 ; ++i)
    {
      /*
        If we have an X bit then we need to take care of it.
      */
      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
      {
        saved_x[i]= table->record[i][0];
        table->record[i][0]|= 1U;
      }

      /*
        If (last_null_bit_pos == 0 && null_bytes > 1), then:

        X bit (if any) + N nullable fields + M Field_bit fields = 8 bits

        Ie, the entire byte is used.
      */
      if (table->s->last_null_bit_pos > 0)
      {
        /* Force the unused filler bits of the last null byte to 1. */
        saved_filler[i]= table->record[i][table->s->null_bytes - 1];
        table->record[i][table->s->null_bytes - 1]|=
          256U - (1U << table->s->last_null_bit_pos);
      }
    }
  }

  /*
    Fixed-length rows (no BLOB and no VARCHAR columns) can be compared
    with one memcmp via the cmp_record() macro.
  */
  if (table->s->blob_fields + table->s->varchar_fields == 0)
  {
    result= cmp_record(table,record[1]);
    goto record_compare_exit;
  }

  /* Compare null bits */
  if (memcmp(table->null_flags,
             table->null_flags+table->s->rec_buff_length,
             table->s->null_bytes))
  {
    result= TRUE;                               // Diff in NULL value
    goto record_compare_exit;
  }

  /* Compare updated fields */
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    /* cmp_binary_offset compares this field between record[0] and record[1]. */
    if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
    {
      result= TRUE;
      goto record_compare_exit;
    }
  }

record_compare_exit:
  /*
    Restore the saved bytes.

    TODO[record format ndb]: Remove this code once NDB returns the
    correct record format.
  */
  if (table->s->null_bytes > 0)
  {
    for (int i = 0 ; i < 2 ; ++i)
    {
      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
        table->record[i][0]= saved_x[i];

      if (table->s->last_null_bit_pos > 0)
        table->record[i][table->s->null_bytes - 1]= saved_filler[i];
    }
  }

  return result;
}
411 
412 
/*
  Copy "extra" columns from record[1] to record[0].

  Copy the extra fields that are not present on the master but are
  present on the slave from record[1] to record[0].  This is used
  after fetching a record that are to be updated, either inside
  replace_record() or as part of executing an update_row().
 */
static int
copy_extra_record_fields(TABLE *table,
                         size_t master_reclength,
                         my_ptrdiff_t master_fields)
{
  DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
  DBUG_PRINT("info", ("Copying to 0x%lx "
                      "from field %lu at offset %lu "
                      "to field %d at offset %lu",
                      (long) table->record[0],
                      (ulong) master_fields, (ulong) master_reclength,
                      table->s->fields, table->s->reclength));
  /*
    Copying the extra fields of the slave that does not exist on
    master into record[0] (which are basically the default values).
  */

  /* The slave has no extra columns compared to the master: nothing to do. */
  if (table->s->fields < (uint) master_fields)
    DBUG_RETURN(0);

  DBUG_ASSERT(master_reclength <= table->s->reclength);
  /* Bulk-copy the byte range occupied by the slave-only columns. */
  if (master_reclength < table->s->reclength)
    memcpy(table->record[0] + master_reclength,
           table->record[1] + master_reclength,
           table->s->reclength - master_reclength);

  /*
    Bit columns are special.  We iterate over all the remaining
    columns and copy the "extra" bits to the new record.  This is
    not a very good solution: it should be refactored on
    opportunity.

    REFACTORING SUGGESTION (Matz).  Introduce a member function
    similar to move_field_offset() called copy_field_offset() to
    copy field values and implement it for all Field subclasses.  Use
    this function to copy data from the found record to the record
    that are going to be inserted.

    The copy_field_offset() function need to be a virtual function,
    which in this case will prevent copying an entire range of
    fields efficiently.
  */
  {
    /* Start at the first field that exists only on the slave. */
    Field **field_ptr= table->field + master_fields;
    for ( ; *field_ptr ; ++field_ptr)
    {
      /*
        Set the null bit according to the values in record[1]
       */
      if ((*field_ptr)->maybe_null() &&
          (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
        (*field_ptr)->set_null();
      else
        (*field_ptr)->set_notnull();

      /*
        Do the extra work for special columns.
       */
      switch ((*field_ptr)->real_type())
      {
      default:
        /* Nothing to do */
        break;

      case MYSQL_TYPE_BIT:
        /*
          The memcpy above only moved whole bytes; bits stored in the
          null-byte area (bit_len > 0) must be transferred separately.
        */
        Field_bit *f= static_cast<Field_bit*>(*field_ptr);
        if (f->bit_len > 0)
        {
          my_ptrdiff_t const offset= table->record[1] - table->record[0];
          uchar const bits=
            get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
          set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
        }
        break;
      }
    }
  }
  DBUG_RETURN(0);                                     // All OK
}
500 
501 
/*
  Replace the provided record in the database.

  SYNOPSIS
    replace_record()
    thd    Thread context for writing the record.
    table  Table to which record should be written.
    master_reclength
           Offset to first column that is not present on the master,
           alternatively the length of the record on the master
           side.

  RETURN VALUE
    Error code on failure, 0 on success.

  DESCRIPTION
    Similar to how it is done in mysql_insert(), we first try to do
    a ha_write_row() and of that fails due to duplicated keys (or
    indices), we do an ha_update_row() or a ha_delete_row() instead.
 */
static int
replace_record(THD *thd, TABLE *table,
               ulong const master_reclength,
               uint const master_fields)
{
  DBUG_ENTER("replace_record");
  DBUG_ASSERT(table != NULL && thd != NULL);

  int error;
  int keynum;
  /* Lazily allocated key buffer, automatically freed on return. */
  auto_afree_ptr<char> key(NULL);

#ifndef DBUG_OFF
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif

  /*
    Retry the write until it succeeds: each iteration resolves one
    duplicate-key conflict by updating or deleting the offending row.
  */
  while ((error= table->file->ha_write_row(table->record[0])))
  {
    /* Lock conflicts are not resolvable here; propagate to the caller. */
    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
    {
      table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
      DBUG_RETURN(error);
    }
    if ((keynum= table->file->get_dup_key(error)) < 0)
    {
      table->file->print_error(error, MYF(0));
      /*
        We failed to retrieve the duplicate key
        - either because the error was not "duplicate key" error
        - or because the information which key is not available
      */
      DBUG_RETURN(error);
    }

    /*
       We need to retrieve the old row into record[1] to be able to
       either update or delete the offending record.  We either:

       - use ha_rnd_pos() with a row-id (available as dupp_row) to the
         offending row, if that is possible (MyISAM and Blackhole), or else

       - use ha_index_read_idx_map() with the key that is duplicated, to
         retrieve the offending row.
     */
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
      if (error)
      {
        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }
    else
    {
      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
      {
        DBUG_RETURN(my_errno);
      }

      if (key.get() == NULL)
      {
        /* Allocate once; sized to hold any unique key of this table. */
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
        if (key.get() == NULL)
          DBUG_RETURN(ENOMEM);
      }

      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
               0);
      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
                                                (const uchar*)key.get(),
                                                HA_WHOLE_KEY,
                                                HA_READ_KEY_EXACT);
      if (error)
      {
        DBUG_PRINT("info", ("ha_index_read_idx_map() returns error %d", error));
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }

    /*
       Now, table->record[1] should contain the offending row.  That
       will enable us to update it or, alternatively, delete it (so
       that we can insert the new row afterwards).

       First we copy the columns into table->record[0] that are not
       present on the master from table->record[1], if there are any.
    */
    copy_extra_record_fields(table, master_reclength, master_fields);

    /*
       REPLACE is defined as either INSERT or DELETE + INSERT.  If
       possible, we can replace it with an UPDATE, but that will not
       work on InnoDB if FOREIGN KEY checks are necessary.

       I (Matz) am not sure of the reason for the last_uniq_key()
       check as, but I'm guessing that it's something along the
       following lines.

       Suppose that we got the duplicate key to be a key that is not
       the last unique key for the table and we perform an update:
       then there might be another key for which the unique check will
       fail, so we're better off just deleting the row and inserting
       the correct row.
     */
    if (last_uniq_key(table, keynum) &&
        !table->file->referenced_by_foreign_key())
    {
      error=table->file->ha_update_row(table->record[1],
                                       table->record[0]);
      /* HA_ERR_RECORD_IS_THE_SAME is harmless: the row already matches. */
      if (error && error != HA_ERR_RECORD_IS_THE_SAME)
        table->file->print_error(error, MYF(0));
      else
        error= 0;
      DBUG_RETURN(error);
    }
    else
    {
      if ((error= table->file->ha_delete_row(table->record[1])))
      {
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
      /* Will retry ha_write_row() with the offending row removed. */
    }
  }

  DBUG_RETURN(error);
}
659 
660 
/*
  Locate the row matching table->record[0] and fetch it into
  table->record[1].

  Three strategies are used, in order of preference:
  1. position()/ha_rnd_pos() when the engine requires a primary key
     for positioning and one exists (fastest, exact);
  2. an index lookup on key number 0 when the table has keys;
  3. a full table scan otherwise.

  Returns 0 on success, a handler error code otherwise.
*/
static int find_and_fetch_row(TABLE *table, uchar *key)
{
  DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
  DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
                       (long) table, (long) key, (long) table->record[1]));

  DBUG_ASSERT(table->in_use != NULL);

  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      Use a more efficient method to fetch the record given by
      table->record[0] if the engine allows it.  We first compute a
      row reference using the position() member function (it will be
      stored in table->file->ref) and the use rnd_pos() to position
      the "cursor" (i.e., record[0] in this case) at the correct row.

      TODO: Add a check that the correct record has been fetched by
      comparing with the original record. Take into account that the
      record on the master and slave can be of different
      length. Something along these lines should work:

      ADD>>>  store_record(table,record[1]);
              int error= table->file->rnd_pos(table->record[0], table->file->ref);
      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
                                 table->s->reclength) == 0);

    */
    table->file->position(table->record[0]);
    int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
    /*
      ha_rnd_pos() returns the record in table->record[0], so we have to
      move it to table->record[1].
     */
    memcpy(table->record[1], table->record[0], table->s->reclength);
    DBUG_RETURN(error);
  }

  /* We need to retrieve all fields */
  /* TODO: Move this out from this function to main loop */
  table->use_all_columns();

  if (table->s->keys > 0)
  {
    int error;
    /* We have a key: search the table using the index */
    if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
    {
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /*
      Don't print debug messages when running valgrind since they can
      trigger false warnings.
     */
#ifndef HAVE_purify
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif

    /*
      We need to set the null bytes to ensure that the filler bit are
      all set when returning.  There are storage engines that just set
      the necessary bits on the bytes and don't set the filler bits
      correctly.
    */
    my_ptrdiff_t const pos=
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
    table->record[1][pos]= 0xFF;
    if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
                                               HA_READ_KEY_EXACT)))
    {
      table->file->print_error(error, MYF(0));
      table->file->ha_index_end();
      DBUG_RETURN(error);
    }

    /*
      Don't print debug messages when running valgrind since they can
      trigger false warnings.
     */
#ifndef HAVE_purify
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif
    /*
      Below is a minor "optimization".  If the key (i.e., key number
      0) has the HA_NOSAME flag set, we know that we have found the
      correct record (since there can be no duplicates); otherwise, we
      have to compare the record with the one found to see if it is
      the correct one.

      CAVEAT! This behaviour is essential for the replication of,
      e.g., the mysql.proc table since the correct record *shall* be
      found using the primary key *only*.  There shall be no
      comparison of non-PK columns to decide if the correct record is
      found.  I can see no scenario where it would be incorrect to
      chose the row to change only using a PK or an UNNI.
    */
    if (table->key_info->flags & HA_NOSAME)
    {
      table->file->ha_index_end();
      DBUG_RETURN(0);
    }

    /* Walk duplicates in the non-unique index until an exact match is found. */
    while (record_compare(table))
    {
      int error;

      /*
        We need to set the null bytes to ensure that the filler bit
        are all set when returning.  There are storage engines that
        just set the necessary bits on the bytes and don't set the
        filler bits correctly.

        TODO[record format ndb]: Remove this code once NDB returns the
        correct record format.
      */
      if (table->s->null_bytes > 0)
      {
        table->record[1][table->s->null_bytes - 1]|=
          256U - (1U << table->s->last_null_bit_pos);
      }

      while ((error= table->file->ha_index_next(table->record[1])))
      {
        /* We just skip records that has already been deleted */
        if (error == HA_ERR_RECORD_DELETED)
          continue;
        table->file->print_error(error, MYF(0));
        table->file->ha_index_end();
        DBUG_RETURN(error);
      }
    }

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    table->file->ha_index_end();
  }
  else
  {
    int restart_count= 0; // Number of times scanning has restarted from top
    int error;

    /* We don't have a key: search the table using ha_rnd_next() */
    if ((error= table->file->ha_rnd_init(1)))
    {
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /* Continue until we find the right record or have made a full loop */
    do
    {
  restart_ha_rnd_next:
      error= table->file->ha_rnd_next(table->record[1]);

      DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
      DBUG_DUMP("record[1]", table->record[1], table->s->reclength);

      switch (error) {
      case 0:
        break;

        /*
          If the record was deleted, we pick the next one without doing
          any comparisons.
        */
      case HA_ERR_RECORD_DELETED:
        goto restart_ha_rnd_next;

      case HA_ERR_END_OF_FILE:
        /* Allow one full wrap-around of the table before giving up. */
        if (++restart_count < 2)
        {
          if ((error= table->file->ha_rnd_init(1)))
          {
            table->file->print_error(error, MYF(0));
            DBUG_RETURN(error);
          }
        }
        break;

      default:
        table->file->print_error(error, MYF(0));
        DBUG_PRINT("info", ("Record not found"));
        (void) table->file->ha_rnd_end();
        DBUG_RETURN(error);
      }
    }
    while (restart_count < 2 && record_compare(table));

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
    table->file->ha_rnd_end();

    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
893 
894 
895 /**********************************************************
896  Row handling primitives for Write_rows_log_event_old
897  **********************************************************/
898 
899 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
900 {
901  int error= 0;
902 
903  /*
904  We are using REPLACE semantics and not INSERT IGNORE semantics
905  when writing rows, that is: new rows replace old rows. We need to
906  inform the storage engine that it should use this behaviour.
907  */
908 
909  /* Tell the storage engine that we are using REPLACE semantics. */
910  thd->lex->duplicates= DUP_REPLACE;
911 
912  /*
913  Pretend we're executing a REPLACE command: this is needed for
914  InnoDB and NDB Cluster since they are not (properly) checking the
915  lex->duplicates flag.
916  */
917  thd->lex->sql_command= SQLCOM_REPLACE;
918  /*
919  Do not raise the error flag in case of hitting to an unique attribute
920  */
921  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
922  /*
923  NDB specific: update from ndb master wrapped as Write_rows
924  */
925  /*
926  so that the event should be applied to replace slave's row
927  */
928  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
929  /*
930  NDB specific: if update from ndb master wrapped as Write_rows
931  does not find the row it's assumed idempotent binlog applying
932  is taking place; don't raise the error.
933  */
934  table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
935  /*
936  TODO: the cluster team (Tomas?) says that it's better if the engine knows
937  how many rows are going to be inserted, then it can allocate needed memory
938  from the start.
939  */
940  table->file->ha_start_bulk_insert(0);
941  return error;
942 }
943 
944 
945 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
946 {
947  int local_error= 0;
948  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
949  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
950  /*
951  reseting the extra with
952  table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
953  fires bug#27077
954  todo: explain or fix
955  */
956  if ((local_error= table->file->ha_end_bulk_insert()))
957  {
958  table->file->print_error(local_error, MYF(0));
959  }
960  return error? error : local_error;
961 }
962 
963 
964 int
965 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
966  Relay_log_info const *rli,
967  TABLE *table,
968  uchar const *row_start,
969  uchar const **row_end)
970 {
971  DBUG_ASSERT(table != NULL);
972  DBUG_ASSERT(row_start && row_end);
973 
974  int error;
975  error= unpack_row_old(const_cast<Relay_log_info*>(rli),
976  table, m_width, table->record[0],
977  row_start, &m_cols, row_end, &m_master_reclength,
978  table->write_set, PRE_GA_WRITE_ROWS_EVENT);
979  bitmap_copy(table->read_set, table->write_set);
980  return error;
981 }
982 
983 
984 int Write_rows_log_event_old::do_exec_row(TABLE *table)
985 {
986  DBUG_ASSERT(table != NULL);
987  int error= replace_record(thd, table, m_master_reclength, m_width);
988  return error;
989 }
990 
991 
992 /**********************************************************
993  Row handling primitives for Delete_rows_log_event_old
994  **********************************************************/
995 
/*
  Allocate the row/key buffers needed while applying a delete-rows
  event.

  If the engine can position on a row via its primary key
  (HA_PRIMARY_KEY_REQUIRED_FOR_POSITION and a primary key exists),
  rows are located by position and no buffers are needed at all.
  Otherwise m_after_image (one record) and, when an index is
  available, m_key (one key) are allocated; m_memory tracks the
  allocation so do_after_row_operations() can free it.

  @param table  Target table.
  @return 0 on success, HA_ERR_OUT_OF_MEM if allocation failed.
*/
int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
{
  DBUG_ASSERT(m_memory == NULL);

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      We don't need to allocate any memory for m_after_image and
      m_key since they are not used.
    */
    return 0;
  }

  int error= 0;

  if (table->s->keys > 0)
  {
    /* Index lookup: allocate after-image and key buffer in one chunk. */
    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
                                       &m_after_image,
                                       (uint) table->s->reclength,
                                       &m_key,
                                       (uint) table->key_info->key_length,
                                       NullS);
  }
  else
  {
    /* No keys: rows will be located by full table scan, no key copy. */
    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
    m_memory= (uchar*)m_after_image;
    m_key= NULL;
  }
  if (!m_memory)
    return HA_ERR_OUT_OF_MEM;

  return error;
}
1032 
1033 
1034 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1035 {
1036  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1037  table->file->ha_index_or_rnd_end();
1038  my_free(m_memory); // Free for multi_malloc
1039  m_memory= NULL;
1040  m_after_image= NULL;
1041  m_key= NULL;
1042 
1043  return error;
1044 }
1045 
1046 
/*
  Unpack the row image identifying the row to delete into
  table->record[0], and extract the search key into m_key when the
  row will be located through an index.

  @param thd_arg   Unused here (interface uniformity).
  @param rli       Relay log info of the applying SQL thread.
  @param table     Target table; read_set selects the columns.
  @param row_start Start of the packed row data.
  @param row_end   [out] Set to the first byte after the consumed row.

  @return 0 on success, otherwise the error from unpack_row_old().
*/
int
Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
                                          Relay_log_info const *rli,
                                          TABLE *table,
                                          uchar const *row_start,
                                          uchar const **row_end)
{
  int error;
  DBUG_ASSERT(row_start && row_end);
  /*
    This assertion actually checks that there is at least as many
    columns on the slave as on the master.
  */
  DBUG_ASSERT(table->s->fields >= m_width);

  error= unpack_row_old(const_cast<Relay_log_info*>(rli),
                        table, m_width, table->record[0],
                        row_start, &m_cols, row_end, &m_master_reclength,
                        table->read_set, PRE_GA_DELETE_ROWS_EVENT);
  /*
    If we will access rows using the random access method, m_key will
    be set to NULL, so we do not need to make a key copy in that case.
  */
  if (m_key)
  {
    KEY *const key_info= table->key_info;

    key_copy(m_key, table->record[0], key_info, 0);
  }

  return error;
}
1079 
1080 
1081 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
1082 {
1083  int error;
1084  DBUG_ASSERT(table != NULL);
1085 
1086  if (!(error= ::find_and_fetch_row(table, m_key)))
1087  {
1088  /*
1089  Now we should have the right row to delete. We are using
1090  record[0] since it is guaranteed to point to a record with the
1091  correct value.
1092  */
1093  error= table->file->ha_delete_row(table->record[0]);
1094  }
1095  return error;
1096 }
1097 
1098 
1099 /**********************************************************
1100  Row handling primitives for Update_rows_log_event_old
1101  **********************************************************/
1102 
1103 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
1104 {
1105  DBUG_ASSERT(m_memory == NULL);
1106 
1107  int error= 0;
1108 
1109  if (table->s->keys > 0)
1110  {
1111  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1112  &m_after_image,
1113  (uint) table->s->reclength,
1114  &m_key,
1115  (uint) table->key_info->key_length,
1116  NullS);
1117  }
1118  else
1119  {
1120  m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1121  m_memory= m_after_image;
1122  m_key= NULL;
1123  }
1124  if (!m_memory)
1125  return HA_ERR_OUT_OF_MEM;
1126 
1127  return error;
1128 }
1129 
1130 
1131 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1132 {
1133  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1134  table->file->ha_index_or_rnd_end();
1135  my_free(m_memory);
1136  m_memory= NULL;
1137  m_after_image= NULL;
1138  m_key= NULL;
1139 
1140  return error;
1141 }
1142 
1143 
1144 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1145  Relay_log_info const *rli,
1146  TABLE *table,
1147  uchar const *row_start,
1148  uchar const **row_end)
1149 {
1150  int error;
1151  DBUG_ASSERT(row_start && row_end);
1152  /*
1153  This assertion actually checks that there is at least as many
1154  columns on the slave as on the master.
1155  */
1156  DBUG_ASSERT(table->s->fields >= m_width);
1157 
1158  /* record[0] is the before image for the update */
1159  error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1160  table, m_width, table->record[0],
1161  row_start, &m_cols, row_end, &m_master_reclength,
1162  table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1163  row_start = *row_end;
1164  /* m_after_image is the after image for the update */
1165  error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1166  table, m_width, m_after_image,
1167  row_start, &m_cols, row_end, &m_master_reclength,
1168  table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1169 
1170  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1171  DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1172 
1173  /*
1174  If we will access rows using the random access method, m_key will
1175  be set to NULL, so we do not need to make a key copy in that case.
1176  */
1177  if (m_key)
1178  {
1179  KEY *const key_info= table->key_info;
1180 
1181  key_copy(m_key, table->record[0], key_info, 0);
1182  }
1183 
1184  return error;
1185 }
1186 
1187 
/*
  Locate the row matching the before image and update it to the after
  image.

  @param table  Target table; before image expected in record[0],
                after image in m_after_image (set up by
                do_prepare_row()).

  @return 0 on success (HA_ERR_RECORD_IS_THE_SAME is treated as
          success), otherwise a handler error code.
*/
int Update_rows_log_event_old::do_exec_row(TABLE *table)
{
  DBUG_ASSERT(table != NULL);

  int error= ::find_and_fetch_row(table, m_key);
  if (error)
    return error;

  /*
    We have to ensure that the new record (i.e., the after image) is
    in record[0] and the old record (i.e., the before image) is in
    record[1].  This since some storage engines require this (for
    example, the partition engine).

    Since find_and_fetch_row() puts the fetched record (i.e., the old
    record) in record[1], we can keep it there.  We put the new record
    (i.e., the after image) into record[0], and copy the fields that
    are on the slave (i.e., in record[1]) into record[0], effectively
    overwriting the default values that where put there by the
    unpack_row() function.
  */
  memcpy(table->record[0], m_after_image, table->s->reclength);
  copy_extra_record_fields(table, m_master_reclength, m_width);

  /*
    Now we have the right row to update.  The old row (the one we're
    looking for) is in record[1] and the new row has is in record[0].
    We also have copied the original values already in the slave's
    database into the after image delivered from the master.
  */
  error= table->file->ha_update_row(table->record[1], table->record[0]);
  /* An update that changes nothing is not an error for replication. */
  if (error == HA_ERR_RECORD_IS_THE_SAME)
    error= 0;

  return error;
}
1224 
1225 #endif
1226 
1227 
1228 /**************************************************************************
1229  Rows_log_event member functions
1230 **************************************************************************/
1231 
1232 #ifndef MYSQL_CLIENT
/*
  Write-side constructor (master side).  Pre-5.1 row events are only
  ever *read* by this server version, never generated, hence the
  assert(0) below; the member initialization is kept so the object is
  in a defined state even in non-debug builds.
*/
Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
                                       MY_BITMAP const *cols,
                                       bool using_trans)
  : Log_event(thd_arg, 0,
              using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
                            Log_event::EVENT_STMT_CACHE,
              Log_event::EVENT_NORMAL_LOGGING),
    m_row_count(0),
    m_table(tbl_arg),
    m_table_id(tid),
    m_width(tbl_arg ? tbl_arg->s->fields : 1),
    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
#ifdef HAVE_REPLICATION
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
#endif
{

  // This constructor should not be reached.
  assert(0);

  /*
    We allow a special form of dummy event when the table, and cols
    are null and the table id is ~0UL.  This is a temporary
    solution, to be able to terminate a started statement in the
    binary log: the extraneous events will be removed in the future.
  */
  DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
              (!tbl_arg && !cols && tid == ~0UL));

  /* Record session flags that must travel with each row event. */
  if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
    set_flags(NO_FOREIGN_KEY_CHECKS_F);
  if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
    set_flags(RELAXED_UNIQUE_CHECKS_F);
  /* if bitmap_init fails, caught in is_valid() */
  if (likely(!bitmap_init(&m_cols,
                          m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
                          m_width,
                          false)))
  {
    /* Cols can be zero if this is a dummy binrows event */
    if (likely(cols != NULL))
    {
      memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
      create_last_word_mask(&m_cols);
    }
  }
  else
  {
    // Needed because bitmap_init() does not set it to null on failure
    m_cols.bitmap= 0;
  }
}
1285 #endif
1286 
1287 
1288 Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
1289  Log_event_type event_type,
1291  *description_event)
1292  : Log_event(buf, description_event),
1293  m_row_count(0),
1294 #ifndef MYSQL_CLIENT
1295  m_table(NULL),
1296 #endif
1297  m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1298 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1299  , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1300 #endif
1301 {
1302  DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1303  uint8 const common_header_len= description_event->common_header_len;
1304  uint8 const post_header_len= description_event->post_header_len[event_type-1];
1305 
1306  DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
1307  "post_header_len: %d",
1308  event_len, common_header_len,
1309  post_header_len));
1310 
1311  const char *post_start= buf + common_header_len;
1312  DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1313  post_start+= RW_MAPID_OFFSET;
1314  if (post_header_len == 6)
1315  {
1316  /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1317  m_table_id= uint4korr(post_start);
1318  post_start+= 4;
1319  }
1320  else
1321  {
1322  m_table_id= (ulong) uint6korr(post_start);
1323  post_start+= RW_FLAGS_OFFSET;
1324  }
1325 
1326  m_flags= uint2korr(post_start);
1327 
1328  uchar const *const var_start=
1329  (const uchar *)buf + common_header_len + post_header_len;
1330  uchar const *const ptr_width= var_start;
1331  uchar *ptr_after_width= (uchar*) ptr_width;
1332  DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1333  m_width = net_field_length(&ptr_after_width);
1334  DBUG_PRINT("debug", ("m_width=%lu", m_width));
1335  /* if bitmap_init fails, catched in is_valid() */
1336  if (likely(!bitmap_init(&m_cols,
1337  m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1338  m_width,
1339  false)))
1340  {
1341  DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1342  memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1343  create_last_word_mask(&m_cols);
1344  ptr_after_width+= (m_width + 7) / 8;
1345  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1346  }
1347  else
1348  {
1349  // Needed because bitmap_init() does not set it to null on failure
1350  m_cols.bitmap= NULL;
1351  DBUG_VOID_RETURN;
1352  }
1353 
1354  const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1355  size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1356  DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
1357  m_table_id.id(), m_flags, m_width, (ulong) data_size));
1358  DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1359 
1360  m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1361  if (likely((bool)m_rows_buf))
1362  {
1363 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1364  m_curr_row= m_rows_buf;
1365 #endif
1366  m_rows_end= m_rows_buf + data_size;
1367  m_rows_cur= m_rows_end;
1368  memcpy(m_rows_buf, ptr_rows_data, data_size);
1369  }
1370  else
1371  m_cols.bitmap= 0; // to not free it
1372 
1373  DBUG_VOID_RETURN;
1374 }
1375 
1376 
/*
  Destructor: release the column bitmap and the packed row buffer.
  When the bitmap fit in the inline m_bitbuf buffer no heap allocation
  took place, so the pointer is cleared before bitmap_free() to avoid
  freeing stack/object storage.
*/
Old_rows_log_event::~Old_rows_log_event()
{
  if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
    m_cols.bitmap= 0; // so no my_free in bitmap_free
  bitmap_free(&m_cols); // To pair with bitmap_init().
  my_free(m_rows_buf);
}
1384 
1385 
1386 int Old_rows_log_event::get_data_size()
1387 {
1388  uchar buf[sizeof(m_width)+1];
1389  uchar *end= net_store_length(buf, (m_width + 7) / 8);
1390 
1391  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1392  return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1393  (m_rows_cur - m_rows_buf););
1394  int data_size= ROWS_HEADER_LEN;
1395  data_size+= no_bytes_in_map(&m_cols);
1396  data_size+= (uint) (end - buf);
1397 
1398  data_size+= (uint) (m_rows_cur - m_rows_buf);
1399  return data_size;
1400 }
1401 
1402 
1403 #ifndef MYSQL_CLIENT
/*
  Append one packed row to the event's row buffer, growing the buffer
  in 1024-byte blocks as needed.

  @param row_data  Packed row bytes to append.
  @param length    Number of bytes in row_data.

  @return 0 on success, HA_ERR_OUT_OF_MEM if the buffer could not be
          grown.
*/
int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
{
  /*
    When the table has a primary key, we would probably want, by default, to
    log only the primary key value instead of the entire "before image". This
    would save binlog space. TODO
  */
  DBUG_ENTER("Old_rows_log_event::do_add_row_data");
  DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data,
                       (ulong) length));
  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
  */
#ifndef HAVE_purify
  DBUG_DUMP("row_data", row_data, min<size_t>(length, 32));
#endif

  DBUG_ASSERT(m_rows_buf <= m_rows_cur);
  DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
  DBUG_ASSERT(m_rows_cur <= m_rows_end);

  /* The cast will always work since m_rows_cur <= m_rows_end */
  if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
  {
    /* Round the new allocation up to a whole number of blocks. */
    size_t const block_size= 1024;
    my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
    my_ptrdiff_t const new_alloc=
        block_size * ((cur_size + length + block_size - 1) / block_size);

    uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
                                             MYF(MY_ALLOW_ZERO_PTR|MY_WME));
    if (unlikely(!new_buf))
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);

    /* If the memory moved, we need to move the pointers */
    if (new_buf != m_rows_buf)
    {
      m_rows_buf= new_buf;
      m_rows_cur= m_rows_buf + cur_size;
    }

    /*
      The end pointer should always be changed to point to the end of
      the allocated memory.
    */
    m_rows_end= m_rows_buf + new_alloc;
  }

  DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
  memcpy(m_rows_cur, row_data, length);
  m_rows_cur+= length;
  m_row_count++;
  DBUG_RETURN(0);
}
1459 #endif
1460 
1461 
1462 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
  Apply this pre-5.1 rows event on the slave.

  Outline:
    1. Dummy events (id == ~0, empty column bitmap) just close the
       thread tables and succeed.
    2. If this is the first row event of the statement (no thd->lock),
       lock all tables queued by the preceding Table_map events, check
       table-definition compatibility, register the tables in the
       table map, and invalidate the query cache for them.
    3. Look up the target table, replicate the session flags carried
       in the event, set up read/write sets, and run the
       do_before/do_exec/do_after row-operation hooks over every row
       in m_rows_buf.  HA_ERR_RECORD_CHANGED / HA_ERR_KEY_NOT_FOUND
       are tolerated for idempotency.
    4. On error, report, clean up the context, and return; on
       STMT_END_F, flush pending row events and commit (or roll back)
       the statement.

  @param rli  Relay log info of the applying SQL thread.
  @return 0 on success, non-zero error code otherwise.
*/
int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
{
  DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
  int error= 0;

  /*
    If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
    does not contain any data.  In that case, we just remove all tables in the
    tables_to_lock list, close the thread tables, and return with
    success.
  */
  if ((m_table_id.id() == ~0U || m_table_id.id() == (~0ULL >> 16)) &&
      m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
  {
    /*
      This one is supposed to be set: just an extra check so that
      nothing strange has happened.
    */
    DBUG_ASSERT(get_flags(STMT_END_F));

    const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
    thd->clear_error();
    DBUG_RETURN(0);
  }

  /*
    'thd' has been set by exec_relay_log_event(), just before calling
    do_apply_event().  We still check here to prevent future coding
    errors.
  */
  DBUG_ASSERT(rli->info_thd == thd);

  /*
    If there is no locks taken, this is the first binrow event seen
    after the table map events.  We should then lock all the tables
    used in the transaction and proceed with execution of the actual
    event.
  */
  if (!thd->lock)
  {
    /*
      lock_tables() reads the contents of thd->lex, so they must be
      initialized.  Contrary to in
      Table_map_log_event::do_apply_event() we don't call
      mysql_init_query() as that may reset the binlog format.
    */
    lex_start(thd);

    if ((error= lock_tables(thd, rli->tables_to_lock,
                            rli->tables_to_lock_count, 0)))
    {
      if (thd->is_slave_error || thd->is_fatal_error)
      {
        /*
          Error reporting borrowed from Query_log_event with many excessive
          simplifications (we don't honour --slave-skip-errors)
        */
        uint actual_error= thd->net.last_errno;
        rli->report(ERROR_LEVEL, actual_error,
                    "Error '%s' in %s event: when locking tables",
                    (actual_error ? thd->net.last_error :
                     "unexpected success or fatal error"),
                    get_type_str());
        thd->is_fatal_error= 1;
      }
      else
      {
        rli->report(ERROR_LEVEL, error,
                    "Error in %s event: when locking tables",
                    get_type_str());
      }
      const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
      DBUG_RETURN(error);
    }

    /*
      When the open and locking succeeded, we check all tables to
      ensure that they still have the correct type.

      We can use a down cast here since we know that every table added
      to the tables_to_lock is a RPL_TABLE_LIST.
    */

    {
      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
      for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
           ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
      {
        TABLE *conv_table;
        if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
                                            ptr->table, &conv_table))
        {
          thd->is_slave_error= 1;
          const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
          DBUG_RETURN(ERR_BAD_TABLE_DEF);
        }
        ptr->m_conv_table= conv_table;
      }
    }

    /*
      ... and then we add all the tables to the table map but keep
      them in the tables to lock list.


      We also invalidate the query cache for all the tables, since
      they will now be changed.

      TODO [/Matz]: Maybe the query cache should not be invalidated
      here?  It might be that a table is not changed, even though it
      was locked for the statement.  We do know that each
      Old_rows_log_event contain at least one row, so after processing one
      Old_rows_log_event, we can invalidate the query cache for the
      associated table.
    */
    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
    {
      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
    }
#ifdef HAVE_QUERY_CACHE
    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
#endif
  }

  TABLE*
    table=
    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);

  if (table)
  {
    /*
      table == NULL means that this table should not be replicated
      (this was set up by Table_map_log_event::do_apply_event()
      which tested replicate-* rules).
    */

    /*
      It's not needed to set_time() but
      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
      much slave is behind
      2) it will be needed when we allow replication from a table with no
      TIMESTAMP column to a table with one.
      So we call set_time(), like in SBR.  Presently it changes nothing.
    */
    thd->set_time(&when);
    /*
      There are a few flags that are replicated with each row event.
      Make sure to set/clear them before executing the main body of
      the event.
    */
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
      thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
    else
      thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;

    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
      thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
    else
      thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
    /* A small test to verify that objects have consistent types */
    DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));

    /*
      Now we are in a statement and will stay in a statement until we
      see a STMT_END_F.

      We set this flag here, before actually applying any rows, in
      case the SQL thread is stopped and we need to detect that we're
      inside a statement and halting abruptly might cause problems
      when restarting.
    */
    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);

    if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
      set_flags(COMPLETE_ROWS_F);

    /*
      Set tables write and read sets.

      Read_set contains all slave columns (in case we are going to fetch
      a complete record from slave)

      Write_set equals the m_cols bitmap sent from master but it can be
      longer if slave has extra columns.
    */

    DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);

    bitmap_set_all(table->read_set);
    bitmap_set_all(table->write_set);
    if (!get_flags(COMPLETE_ROWS_F))
      bitmap_intersect(table->write_set,&m_cols);

    // Do event specific preparations

    error= do_before_row_operations(rli);

    // row processing loop

    while (error == 0 && m_curr_row < m_rows_end)
    {
      /* in_use can have been set to NULL in close_tables_for_reopen */
      THD* old_thd= table->in_use;
      if (!table->in_use)
        table->in_use= thd;

      error= do_exec_row(rli);

      DBUG_PRINT("info", ("error: %d", error));
      DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);

      table->in_use = old_thd;
      switch (error)
      {
      case 0:
        break;

      /* Some recoverable errors */
      case HA_ERR_RECORD_CHANGED:
      case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
                                    tuple does not exist */
        error= 0;
        break;

      default:
        rli->report(ERROR_LEVEL, thd->net.last_errno,
                    "Error in %s event: row application failed. %s",
                    get_type_str(),
                    thd->net.last_error ? thd->net.last_error : "");
        thd->is_slave_error= 1;
        break;
      }

      /*
        If m_curr_row_end was not set during event execution (e.g., because
        of errors) we can't proceed to the next row.  If the error is transient
        (i.e., error==0 at this point) we must call unpack_current_row() to set
        m_curr_row_end.
      */

      DBUG_PRINT("info", ("error: %d", error));
      DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
                          (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));

      if (!m_curr_row_end && !error)
        unpack_current_row(rli);

      // at this moment m_curr_row_end should be set
      DBUG_ASSERT(error || m_curr_row_end != NULL);
      DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
      DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);

      m_curr_row= m_curr_row_end;

    } // row processing loop

    DBUG_EXECUTE_IF("stop_slave_middle_group",
                    const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
    error= do_after_row_operations(rli, error);
  } // if (table)

  if (error)
  { /* error has occured during the transaction */
    /*
      NOTE(review): error can only be non-zero when 'table' was found
      above (all earlier error paths return directly), so dereferencing
      table->s here is safe.
    */
    rli->report(ERROR_LEVEL, thd->net.last_errno,
                "Error in %s event: error during transaction execution "
                "on table %s.%s. %s",
                get_type_str(), table->s->db.str,
                table->s->table_name.str,
                thd->net.last_error ? thd->net.last_error : "");

    /*
      If one day we honour --skip-slave-errors in row-based replication, and
      the error should be skipped, then we would clear mappings, rollback,
      close tables, but the slave SQL thread would not stop and then may
      assume the mapping is still available, the tables are still open...
      So then we should clear mappings/rollback/close here only if this is a
      STMT_END_F.
      For now we code, knowing that error is not skippable and so slave SQL
      thread is certainly going to stop.
      rollback at the caller along with sbr.
    */
    thd->reset_current_stmt_binlog_format_row();
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
    thd->is_slave_error= 1;
    DBUG_RETURN(error);
  }

  /*
    This code would ideally be placed in do_update_pos() instead, but
    since we have no access to table there, we do the setting of
    last_event_start_time here instead.
  */
  if (table && (table->s->primary_key == MAX_KEY) &&
      !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
  {
    /*
      ------------ Temporary fix until WL#2975 is implemented ---------

      This event is not the last one (no STMT_END_F).  If we stop now
      (in case of terminate_slave_thread()), how will we restart?  We
      have to restart from Table_map_log_event, but as this table is
      not transactional, the rows already inserted will still be
      present, and idempotency is not guaranteed (no PK) so we risk
      that repeating leads to double insert.  So we desperately try to
      continue, hope we'll eventually leave this buggy situation (by
      executing the final Old_rows_log_event).  If we are in a hopeless
      wait (reached end of last relay log and nothing gets appended
      there), we timeout after one minute, and notify DBA about the
      problem.  When WL#2975 is implemented, just remove the member
      Relay_log_info::last_event_start_time and all its occurrences.
    */
    const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
  }

  if (get_flags(STMT_END_F))
  {
    /*
      This is the end of a statement or transaction, so close (and
      unlock) the tables we opened when processing the
      Table_map_log_event starting the statement.

      OBSERVER.  This will clear *all* mappings, not only those that
      are open for the table.  There is not good handle for on-close
      actions for tables.

      NOTE.  Even if we have no table ('table' == 0) we still need to be
      here, so that we increase the group relay log position.  If we didn't, we
      could have a group relay log position which lags behind "forever"
      (assume the last master's transaction is ignored by the slave because of
      replicate-ignore rules).
    */
    int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);

    /*
      If this event is not in a transaction, the call below will, if some
      transactional storage engines are involved, commit the statement into
      them and flush the pending event to binlog.
      If this event is in a transaction, the call will do nothing, but a
      Xid_log_event will come next which will, if some transactional engines
      are involved, commit the transaction and flush the pending event to the
      binlog.
      If there was a deadlock the transaction should have been rolled back
      already.  So there should be no need to rollback the transaction.
    */
    DBUG_ASSERT(! thd->transaction_rollback_request);
    if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
      rli->report(ERROR_LEVEL, error,
                  "Error in %s event: commit of row events failed, "
                  "table `%s`.`%s`",
                  get_type_str(), m_table->s->db.str,
                  m_table->s->table_name.str);
    error|= binlog_error;

    /*
      Now what if this is not a transactional engine?  we still need to
      flush the pending event to the binlog; we did it with
      thd->binlog_flush_pending_rows_event().  Note that we imitate
      what is done for real queries: a call to
      ha_autocommit_or_rollback() (sometimes only if involves a
      transactional engine), and a call to be sure to have the pending
      event flushed.
    */

    thd->reset_current_stmt_binlog_format_row();
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
  }

  DBUG_RETURN(error);
}
1832 
1833 
1835 Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
1836 {
1837  /*
1838  If the slave skip counter is 1 and this event does not end a
1839  statement, then we should not start executing on the next event.
1840  Otherwise, we defer the decision to the normal skipping logic.
1841  */
1842  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1844  else
1845  return Log_event::do_shall_skip(rli);
1846 }
1847 
1848 int
1849 Old_rows_log_event::do_update_pos(Relay_log_info *rli)
1850 {
1851  DBUG_ENTER("Old_rows_log_event::do_update_pos");
1852  int error= 0;
1853 
1854  DBUG_PRINT("info", ("flags: %s",
1855  get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1856 
1857  if (get_flags(STMT_END_F))
1858  {
1859  /*
1860  Indicate that a statement is finished.
1861  Step the group log position if we are not in a transaction,
1862  otherwise increase the event log position.
1863  */
1864  rli->stmt_done(log_pos);
1865  /*
1866  Clear any errors in thd->net.last_err*. It is not known if this is
1867  needed or not. It is believed that any errors that may exist in
1868  thd->net.last_err* are allowed. Examples of errors are "key not
1869  found", which is produced in the test case rpl_row_conflicts.test
1870  */
1871  thd->clear_error();
1872  }
1873  else
1874  {
1875  rli->inc_event_relay_log_pos();
1876  }
1877 
1878  DBUG_RETURN(error);
1879 }
1880 
1881 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1882 
1883 
1884 #ifndef MYSQL_CLIENT
/*
  Writing pre-5.1 row events is no longer supported; this server only
  reads them from old masters.  Reaching this method is a bug.

  @return TRUE (failure) -- only relevant in non-debug builds where
          assert() is compiled out.
*/
bool Old_rows_log_event::write_data_header(IO_CACHE *file)
{
  // This method should not be reached.
  assert(0);
  return TRUE;
}
1891 
1892 
1893 bool Old_rows_log_event::write_data_body(IO_CACHE*file)
1894 {
1895  /*
1896  Note that this should be the number of *bits*, not the number of
1897  bytes.
1898  */
1899  uchar sbuf[sizeof(m_width)];
1900  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1901 
1902  // This method should not be reached.
1903  assert(0);
1904 
1905  bool res= false;
1906  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1907  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1908 
1909  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1910  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
1911 
1912  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1913  res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1914  no_bytes_in_map(&m_cols));
1915  DBUG_DUMP("rows", m_rows_buf, data_size);
1916  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
1917 
1918  return res;
1919 
1920 }
1921 #endif
1922 
1923 
1924 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1925 int Old_rows_log_event::pack_info(Protocol *protocol)
1926 {
1927  char buf[256];
1928  char const *const flagstr=
1929  get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1930  size_t bytes= my_snprintf(buf, sizeof(buf),
1931  "table_id: %llu%s", m_table_id.id(), flagstr);
1932  protocol->store(buf, bytes, &my_charset_bin);
1933  return 0;
1934 }
1935 #endif
1936 
1937 
1938 #ifdef MYSQL_CLIENT
1939 void Old_rows_log_event::print_helper(FILE *file,
1940  PRINT_EVENT_INFO *print_event_info,
1941  char const *const name)
1942 {
1943  IO_CACHE *const head= &print_event_info->head_cache;
1944  IO_CACHE *const body= &print_event_info->body_cache;
1945  if (!print_event_info->short_form)
1946  {
1947  bool const last_stmt_event= get_flags(STMT_END_F);
1948  print_header(head, print_event_info, !last_stmt_event);
1949  my_b_printf(head, "\t%s: table id %llu%s\n",
1950  name, m_table_id.id(),
1951  last_stmt_event ? " flags: STMT_END_F" : "");
1952  print_base64(body, print_event_info, !last_stmt_event);
1953  }
1954 }
1955 #endif
1956 
1957 
1958 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1959 
/**
  Write the row in table->record[0] to the slave's table, resolving
  duplicate-key conflicts with REPLACE-like semantics: on a duplicate key
  the offending row is fetched into record[1] and either updated in place
  (when safe) or deleted so the insert can be retried.

  @param rli        Relay log info of the applying slave thread.
  @param overwrite  NOTE(review): not referenced in this body — presumably a
                    leftover of the newer Rows_log_event interface; confirm.

  @return 0 on success, a handler error code otherwise.
*/
int
Old_rows_log_event::write_row(const Relay_log_info *const rli,
                              const bool overwrite)
{
  DBUG_ENTER("write_row");
  DBUG_ASSERT(m_table != NULL && thd != NULL);

  TABLE *table= m_table;  // pointer to event's table
  int error;
  int keynum;
  auto_afree_ptr<char> key(NULL);  // lazily allocated key buffer (alloca-backed)

  /* fill table->record[0] with default values */

  if ((error= prepare_record(table, table->write_set,
                             TRUE /* check if columns have def. values */)))
    DBUG_RETURN(error);

  /* unpack row into table->record[0] */
  error= unpack_current_row(rli); // TODO: how to handle errors?

#ifndef DBUG_OFF
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif

  /*
    Try to write record. If a corresponding record already exists in the table,
    we try to change it using ha_update_row() if possible. Otherwise we delete
    it and repeat the whole process again.

    TODO: Add safety measures against infinite looping.
  */

  while ((error= table->file->ha_write_row(table->record[0])))
  {
    /* Lock errors are fatal to this attempt: report and bail out. */
    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
    {
      table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
      DBUG_RETURN(error);
    }
    if ((keynum= table->file->get_dup_key(error)) < 0)
    {
      DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
      table->file->print_error(error, MYF(0));
      /*
        We failed to retrieve the duplicate key
        - either because the error was not "duplicate key" error
        - or because the information which key is not available
      */
      DBUG_RETURN(error);
    }

    /*
      We need to retrieve the old row into record[1] to be able to
      either update or delete the offending record. We either:

      - use ha_rnd_pos() with a row-id (available as dupp_row) to the
        offending row, if that is possible (MyISAM and Blackhole), or else

      - use ha_index_read_idx_map() with the key that is duplicated, to
        retrieve the offending row.
    */
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
      if (error)
      {
        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
        /* Treat a deleted record as "not found" for consistent reporting. */
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }
    else
    {
      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));

      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
      {
        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
        DBUG_RETURN(my_errno);
      }

      /* Allocate the key buffer once; it is reused on later iterations. */
      if (key.get() == NULL)
      {
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
        if (key.get() == NULL)
        {
          DBUG_PRINT("info",("Can't allocate key buffer"));
          DBUG_RETURN(ENOMEM);
        }
      }

      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
               0);
      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
                                                (const uchar*)key.get(),
                                                HA_WHOLE_KEY,
                                                HA_READ_KEY_EXACT);
      if (error)
      {
        DBUG_PRINT("info",("ha_index_read_idx_map() returns error %d", error));
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }

    /*
      Now, record[1] should contain the offending row. That
      will enable us to update it or, alternatively, delete it (so
      that we can insert the new row afterwards).
    */

    /*
      If row is incomplete we will use the record found to fill
      missing columns.
    */
    if (!get_flags(COMPLETE_ROWS_F))
    {
      restore_record(table,record[1]);
      error= unpack_current_row(rli);
    }

#ifndef DBUG_OFF
    DBUG_PRINT("debug",("preparing for update: before and after image"));
    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
#endif

    /*
      REPLACE is defined as either INSERT or DELETE + INSERT. If
      possible, we can replace it with an UPDATE, but that will not
      work on InnoDB if FOREIGN KEY checks are necessary.

      I (Matz) am not sure of the reason for the last_uniq_key()
      check as, but I'm guessing that it's something along the
      following lines.

      Suppose that we got the duplicate key to be a key that is not
      the last unique key for the table and we perform an update:
      then there might be another key for which the unique check will
      fail, so we're better off just deleting the row and inserting
      the correct row.
    */
    if (last_uniq_key(table, keynum) &&
        !table->file->referenced_by_foreign_key())
    {
      DBUG_PRINT("info",("Updating row using ha_update_row()"));
      error=table->file->ha_update_row(table->record[1],
                                       table->record[0]);
      switch (error) {

      case HA_ERR_RECORD_IS_THE_SAME:
        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
                           " ha_update_row()"));
        error= 0;
        /* Fall through: an identical record counts as a successful update. */

      case 0:
        break;

      default:
        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
        table->file->print_error(error, MYF(0));
      }

      DBUG_RETURN(error);
    }
    else
    {
      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
      if ((error= table->file->ha_delete_row(table->record[1])))
      {
        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
      /* Will retry ha_write_row() with the offending row removed. */
    }
  }

  DBUG_RETURN(error);
}
2182 
2183 
/**
  Locate, in the slave's table, the row described by the event's current
  row image (unpacked into table->record[0]).

  Three strategies are tried, in order of preference:
  1. position-based fetch via rnd_pos_by_record() when the engine supports
     it and the table has a primary key;
  2. an index search on key 0, followed by a scan of duplicates when the
     key is not unique (or has nullable parts that are NULL);
  3. a full table scan, restarted once from the top if needed.

  On success the matching row is left in table->record[0].

  @param rli  Relay log info of the applying slave thread.
  @return 0 on success, a handler error code otherwise.
*/
int Old_rows_log_event::find_row(const Relay_log_info *rli)
{
  DBUG_ENTER("find_row");

  DBUG_ASSERT(m_table && m_table->in_use != NULL);

  TABLE *table= m_table;
  int error;

  /* unpack row - missing fields get default values */

  // TODO: shall we check and report errors here?
  prepare_record(table, table->read_set, FALSE /* don't check errors */);
  error= unpack_current_row(rli);

#ifndef DBUG_OFF
  DBUG_PRINT("info",("looking for the following record"));
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      Use a more efficient method to fetch the record given by
      table->record[0] if the engine allows it. We first compute a
      row reference using the position() member function (it will be
      stored in table->file->ref) and the use rnd_pos() to position
      the "cursor" (i.e., record[0] in this case) at the correct row.

      TODO: Add a check that the correct record has been fetched by
      comparing with the original record. Take into account that the
      record on the master and slave can be of different
      length. Something along these lines should work:

      ADD>>>  store_record(table,record[1]);
              int error= table->file->rnd_pos(table->record[0], table->file->ref);
      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
                                 table->s->reclength) == 0);
    */
    DBUG_PRINT("info",("locating record using primary key (position)"));
    /* NOTE: this inner 'error' intentionally shadows the outer one. */
    int error= table->file->rnd_pos_by_record(table->record[0]);
    if (error)
    {
      DBUG_PRINT("info",("rnd_pos returns error %d",error));
      if (error == HA_ERR_RECORD_DELETED)
        error= HA_ERR_KEY_NOT_FOUND;
      table->file->print_error(error, MYF(0));
    }
    DBUG_RETURN(error);
  }

  // We can't use position() - try other methods.

  /*
    We need to retrieve all fields
    TODO: Move this out from this function to main loop
   */
  table->use_all_columns();

  /*
    Save copy of the record in table->record[1]. It might be needed
    later if linear search is used to find exact match.
   */
  store_record(table,record[1]);

  if (table->s->keys > 0)
  {
    DBUG_PRINT("info",("locating record using primary key (index_read)"));

    /* We have a key: search the table using the index */
    if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
    {
      DBUG_PRINT("info",("ha_index_init returns error %d",error));
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /* Fill key data for the row */

    DBUG_ASSERT(m_key);
    key_copy(m_key, table->record[0], table->key_info, 0);

    /*
      Don't print debug messages when running valgrind since they can
      trigger false warnings.
     */
#ifndef HAVE_purify
    DBUG_DUMP("key data", m_key, table->key_info->key_length);
#endif

    /*
      We need to set the null bytes to ensure that the filler bit are
      all set when returning.  There are storage engines that
      just set the necessary bits on the bytes and don't set the
      filler bits correctly.
    */
    my_ptrdiff_t const pos=
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
    table->record[0][pos]= 0xFF;

    if ((error= table->file->ha_index_read_map(table->record[0], m_key,
                                               HA_WHOLE_KEY,
                                               HA_READ_KEY_EXACT)))
    {
      DBUG_PRINT("info",("no record matching the key found in the table"));
      if (error == HA_ERR_RECORD_DELETED)
        error= HA_ERR_KEY_NOT_FOUND;
      table->file->print_error(error, MYF(0));
      table->file->ha_index_end();
      DBUG_RETURN(error);
    }

    /*
      Don't print debug messages when running valgrind since they can
      trigger false warnings.
     */
#ifndef HAVE_purify
    DBUG_PRINT("info",("found first matching record"));
    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif
    /*
      Below is a minor "optimization".  If the key (i.e., key number
      0) has the HA_NOSAME flag set, we know that we have found the
      correct record (since there can be no duplicates); otherwise, we
      have to compare the record with the one found to see if it is
      the correct one.

      CAVEAT! This behaviour is essential for the replication of,
      e.g., the mysql.proc table since the correct record *shall* be
      found using the primary key *only*.  There shall be no
      comparison of non-PK columns to decide if the correct record is
      found.  I can see no scenario where it would be incorrect to
      chose the row to change only using a PK or an UNNI.
    */
    if (table->key_info->flags & HA_NOSAME)
    {
      /* Unique does not have non nullable part */
      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
      {
        table->file->ha_index_end();
        DBUG_RETURN(0);
      }
      else
      {
        KEY *keyinfo= table->key_info;
        /*
          Unique has nullable part. We need to check if there is any field in the
          BI image that is null and part of UNNI.
        */
        bool null_found= FALSE;
        for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
        {
          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
          Field **f= table->field+fieldnr;
          null_found= (*f)->is_null();
        }

        /* No NULL in the key parts: the unique match is definitive. */
        if (!null_found)
        {
          table->file->ha_index_end();
          DBUG_RETURN(0);
        }

        /* else fall through to index scan */
      }
    }

    /*
      In case key is not unique, we still have to iterate over records found
      and find the one which is identical to the row given. A copy of the
      record we are looking for is stored in record[1].
     */
    DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));

    while (record_compare(table))
    {
      /*
        We need to set the null bytes to ensure that the filler bit
        are all set when returning.  There are storage engines that
        just set the necessary bits on the bytes and don't set the
        filler bits correctly.

        TODO[record format ndb]: Remove this code once NDB returns the
        correct record format.
      */
      if (table->s->null_bytes > 0)
      {
        table->record[0][table->s->null_bytes - 1]|=
          256U - (1U << table->s->last_null_bit_pos);
      }

      while ((error= table->file->ha_index_next(table->record[0])))
      {
        /* We just skip records that has already been deleted */
        if (error == HA_ERR_RECORD_DELETED)
          continue;
        DBUG_PRINT("info",("no record matching the given row found"));
        table->file->print_error(error, MYF(0));
        (void) table->file->ha_index_end();
        DBUG_RETURN(error);
      }
    }

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    table->file->ha_index_end();
  }
  else
  {
    DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));

    int restart_count= 0; // Number of times scanning has restarted from top

    /* We don't have a key: search the table using ha_rnd_next() */
    if ((error= table->file->ha_rnd_init(1)))
    {
      DBUG_PRINT("info",("error initializing table scan"
                         " (ha_rnd_init returns %d)",error));
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /* Continue until we find the right record or have made a full loop */
    do
    {
  restart_ha_rnd_next:
      error= table->file->ha_rnd_next(table->record[0]);

      switch (error) {

      case 0:
        break;

      case HA_ERR_RECORD_DELETED:
        /* Skip physically deleted rows without counting them. */
        goto restart_ha_rnd_next;

      case HA_ERR_END_OF_FILE:
        /* Restart the scan from the top once before giving up. */
        if (++restart_count < 2)
        {
          if ((error= table->file->ha_rnd_init(1)))
          {
            table->file->print_error(error, MYF(0));
            DBUG_RETURN(error);
          }
        }
        break;

      default:
        DBUG_PRINT("info", ("Failed to get next record"
                            " (ha_rnd_next returns %d)",error));
        table->file->print_error(error, MYF(0));
        table->file->ha_rnd_end();
        DBUG_RETURN(error);
      }
    }
    while (restart_count < 2 && record_compare(table));

    /*
      Note: above record_compare will take into accout all record fields
      which might be incorrect in case a partial row was given in the event
     */

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    if (restart_count == 2)
      DBUG_PRINT("info", ("Record not found"));
    else
      DBUG_DUMP("record found", table->record[0], table->s->reclength);
    table->file->ha_rnd_end();

    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
2491 
2492 #endif
2493 
2494 
2495 /**************************************************************************
2496  Write_rows_log_event member functions
2497 **************************************************************************/
2498 
2499 /*
2500  Constructor used to build an event for writing to the binary log.
2501  */
2502 #if !defined(MYSQL_CLIENT)
/*
  Constructor used to build an event for writing to the binary log.
  The old (pre-5.1-GA) write-rows format is never generated by this
  server version, so this constructor must never run.
*/
Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
                                                   TABLE *tbl_arg,
                                                   ulong tid_arg,
                                                   MY_BITMAP const *cols,
                                                   bool is_transactional)
  : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
{

  // This constructor should not be reached.
  assert(0);

}
2515 #endif
2516 
2517 
2518 /*
2519  Constructor used by slave to read the event from the binary log.
2520  */
2521 #ifdef HAVE_REPLICATION
2522 Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
2523  uint event_len,
2525  *description_event)
2526 : Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
2527  description_event)
2528 {
2529 }
2530 #endif
2531 
2532 
2533 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2534 int
2535 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2536 {
2537  int error= 0;
2538 
2539  /*
2540  We are using REPLACE semantics and not INSERT IGNORE semantics
2541  when writing rows, that is: new rows replace old rows. We need to
2542  inform the storage engine that it should use this behaviour.
2543  */
2544 
2545  /* Tell the storage engine that we are using REPLACE semantics. */
2546  thd->lex->duplicates= DUP_REPLACE;
2547 
2548  /*
2549  Pretend we're executing a REPLACE command: this is needed for
2550  InnoDB and NDB Cluster since they are not (properly) checking the
2551  lex->duplicates flag.
2552  */
2553  thd->lex->sql_command= SQLCOM_REPLACE;
2554  /*
2555  Do not raise the error flag in case of hitting to an unique attribute
2556  */
2557  m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2558  /*
2559  NDB specific: update from ndb master wrapped as Write_rows
2560  */
2561  /*
2562  so that the event should be applied to replace slave's row
2563  */
2564  m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2565  /*
2566  NDB specific: if update from ndb master wrapped as Write_rows
2567  does not find the row it's assumed idempotent binlog applying
2568  is taking place; don't raise the error.
2569  */
2570  m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2571  /*
2572  TODO: the cluster team (Tomas?) says that it's better if the engine knows
2573  how many rows are going to be inserted, then it can allocate needed memory
2574  from the start.
2575  */
2576  m_table->file->ha_start_bulk_insert(0);
2577  return error;
2578 }
2579 
2580 
2581 int
2582 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2583  int error)
2584 {
2585  int local_error= 0;
2586  m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2587  m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2588  /*
2589  reseting the extra with
2590  table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2591  fires bug#27077
2592  todo: explain or fix
2593  */
2594  if ((local_error= m_table->file->ha_end_bulk_insert()))
2595  {
2596  m_table->file->print_error(local_error, MYF(0));
2597  }
2598  return error? error : local_error;
2599 }
2600 
2601 
2602 int
2603 Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2604 {
2605  DBUG_ASSERT(m_table != NULL);
2606  int error= write_row(rli, TRUE /* overwrite */);
2607 
2608  if (error && !thd->net.last_errno)
2609  thd->net.last_errno= error;
2610 
2611  return error;
2612 }
2613 
2614 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2615 
2616 
2617 #ifdef MYSQL_CLIENT
/* mysqlbinlog output: delegate to the shared old-row-event printer. */
void Write_rows_log_event_old::print(FILE *file,
                                     PRINT_EVENT_INFO* print_event_info)
{
  Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old");
}
2623 #endif
2624 
2625 
2626 /**************************************************************************
2627  Delete_rows_log_event member functions
2628 **************************************************************************/
2629 
2630 /*
2631  Constructor used to build an event for writing to the binary log.
2632  */
2633 
2634 #ifndef MYSQL_CLIENT
/*
  Constructor used to build an event for writing to the binary log.
  The old (pre-5.1-GA) delete-rows format is never generated by this
  server version, so this constructor must never run.
*/
Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
                                                     TABLE *tbl_arg,
                                                     ulong tid,
                                                     MY_BITMAP const *cols,
                                                     bool is_transactional)
  : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
    m_after_image(NULL), m_memory(NULL)
{

  // This constructor should not be reached.
  assert(0);

}
2648 #endif /* #if !defined(MYSQL_CLIENT) */
2649 
2650 
2651 /*
2652  Constructor used by slave to read the event from the binary log.
2653  */
2654 #ifdef HAVE_REPLICATION
2655 Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2656  uint event_len,
2658  *description_event)
2659  : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2660  description_event),
2661  m_after_image(NULL), m_memory(NULL)
2662 {
2663 }
2664 #endif
2665 
2666 
2667 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2668 
2669 int
2670 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2671 {
2672  if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2673  m_table->s->primary_key < MAX_KEY)
2674  {
2675  /*
2676  We don't need to allocate any memory for m_key since it is not used.
2677  */
2678  return 0;
2679  }
2680 
2681  if (m_table->s->keys > 0)
2682  {
2683  // Allocate buffer for key searches
2684  m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2685  if (!m_key)
2686  return HA_ERR_OUT_OF_MEM;
2687  }
2688  return 0;
2689 }
2690 
2691 
2692 int
2693 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2694  int error)
2695 {
2696  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2697  m_table->file->ha_index_or_rnd_end();
2698  my_free(m_key);
2699  m_key= NULL;
2700 
2701  return error;
2702 }
2703 
2704 
/**
  Apply one row of an old-format delete-rows event: locate the row on
  the slave and delete it.

  @param rli  Relay log info of the applying slave thread.
  @return 0 on success, a handler error code otherwise.
*/
int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
{
  int error;
  DBUG_ASSERT(m_table != NULL);

  if (!(error= find_row(rli)))
  {
    /*
      Delete the record found, located in record[0]
    */
    error= m_table->file->ha_delete_row(m_table->record[0]);
  }
  return error;
}
2719 
2720 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2721 
2722 
2723 #ifdef MYSQL_CLIENT
/* mysqlbinlog output: delegate to the shared old-row-event printer. */
void Delete_rows_log_event_old::print(FILE *file,
                                      PRINT_EVENT_INFO* print_event_info)
{
  Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old");
}
2729 #endif
2730 
2731 
2732 /**************************************************************************
2733  Update_rows_log_event member functions
2734 **************************************************************************/
2735 
2736 /*
2737  Constructor used to build an event for writing to the binary log.
2738  */
2739 #if !defined(MYSQL_CLIENT)
/*
  Constructor used to build an event for writing to the binary log.
  The old (pre-5.1-GA) update-rows format is never generated by this
  server version, so this constructor must never run.
*/
Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
                                                     TABLE *tbl_arg,
                                                     ulong tid,
                                                     MY_BITMAP const *cols,
                                                     bool is_transactional)
  : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
    m_after_image(NULL), m_memory(NULL)
{

  // This constructor should not be reached.
  assert(0);
}
2752 #endif /* !defined(MYSQL_CLIENT) */
2753 
2754 
2755 /*
2756  Constructor used by slave to read the event from the binary log.
2757  */
2758 #ifdef HAVE_REPLICATION
2759 Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2760  uint event_len,
2761  const
2763  *description_event)
2764  : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2765  description_event),
2766  m_after_image(NULL), m_memory(NULL)
2767 {
2768 }
2769 #endif
2770 
2771 
2772 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2773 
2774 int
2775 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2776 {
2777  if (m_table->s->keys > 0)
2778  {
2779  // Allocate buffer for key searches
2780  m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2781  if (!m_key)
2782  return HA_ERR_OUT_OF_MEM;
2783  }
2784 
2785  return 0;
2786 }
2787 
2788 
2789 int
2790 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2791  int error)
2792 {
2793  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2794  m_table->file->ha_index_or_rnd_end();
2795  my_free(m_key); // Free for multi_malloc
2796  m_key= NULL;
2797 
2798  return error;
2799 }
2800 
2801 
/**
  Apply one row-pair of an old-format update-rows event: locate the
  before image (BI) on the slave, unpack the after image (AI), and
  update the row.

  @param rli  Relay log info of the applying slave thread.
  @return 0 on success, a handler error code otherwise.
*/
int
Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
{
  DBUG_ASSERT(m_table != NULL);

  int error= find_row(rli);
  if (error)
  {
    /*
      We need to read the second image in the event of error to be
      able to skip to the next pair of updates
    */
    m_curr_row= m_curr_row_end;
    unpack_current_row(rli);
    return error;
  }

  /*
    This is the situation after locating BI:

    ===|=== before image ====|=== after image ===|===
       ^                     ^
       m_curr_row            m_curr_row_end

    BI found in the table is stored in record[0]. We copy it to record[1]
    and unpack AI to record[0].
  */

  store_record(m_table,record[1]);

  /* Advance to the after image and unpack it into record[0]. */
  m_curr_row= m_curr_row_end;
  error= unpack_current_row(rli); // this also updates m_curr_row_end

  /*
    Now we have the right row to update.  The old row (the one we're
    looking for) is in record[1] and the new row is in record[0].
  */
#ifndef HAVE_purify
  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
  */
  DBUG_PRINT("info",("Updating row in table"));
  DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
  DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif

  error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
  /* An identical record counts as a successful (no-op) update. */
  if (error == HA_ERR_RECORD_IS_THE_SAME)
    error= 0;

  return error;
}
2855 
2856 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2857 
2858 
2859 #ifdef MYSQL_CLIENT
/* mysqlbinlog output: delegate to the shared old-row-event printer. */
void Update_rows_log_event_old::print(FILE *file,
                                      PRINT_EVENT_INFO* print_event_info)
{
  Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old");
}
2865 #endif