MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_utility.cc
1 /* Copyright (c) 2006, 2013, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 #include "rpl_utility.h"
17 
18 #ifndef MYSQL_CLIENT
19 #include "unireg.h" // REQUIRED by later includes
20 #include "rpl_rli.h"
21 #include "sql_tmp_table.h" // tmp tables
22 #include "rpl_rli.h"
23 #include "log_event.h"
24 
25 #include <algorithm>
26 
27 using std::min;
28 using std::max;
29 
30 
/**
  Three-way comparison of two sizes.

  @param a Left-hand operand.
  @param b Right-hand operand.

  @retval -1 a is smaller than b.
  @retval  0 a and b are equal.
  @retval  1 a is greater than b.
*/
static int compare(size_t a, size_t b)
{
  if (a < b)
    return -1;
  if (b < a)
    return 1;
  return 0;
}
43 
44 
50 static uint32 uint_max(int bits) {
51  return (((1UL << (bits - 1)) - 1) << 1) | 1;
52 }
53 
54 
62 static uint32
63 max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
64 {
65  DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
66  DBUG_ASSERT(metadata >> 16 == 0);
67 
68  switch (sql_type) {
69  case MYSQL_TYPE_NEWDECIMAL:
70  return metadata >> 8;
71 
72  case MYSQL_TYPE_FLOAT:
73  return 12;
74 
75  case MYSQL_TYPE_DOUBLE:
76  return 22;
77 
78  case MYSQL_TYPE_SET:
79  case MYSQL_TYPE_ENUM:
80  return metadata & 0x00ff;
81 
82  case MYSQL_TYPE_STRING:
83  {
84  uchar type= metadata >> 8;
85  if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM)
86  return metadata & 0xff;
87  else
88  /* This is taken from Field_string::unpack. */
89  return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
90  }
91 
92  case MYSQL_TYPE_YEAR:
93  case MYSQL_TYPE_TINY:
94  return 4;
95 
96  case MYSQL_TYPE_SHORT:
97  return 6;
98 
99  case MYSQL_TYPE_INT24:
100  return 9;
101 
102  case MYSQL_TYPE_LONG:
103  return 11;
104 
105 #ifdef HAVE_LONG_LONG
106  case MYSQL_TYPE_LONGLONG:
107  return 20;
108 
109 #endif
110  case MYSQL_TYPE_NULL:
111  return 0;
112 
113  case MYSQL_TYPE_NEWDATE:
114  return 3;
115 
116  case MYSQL_TYPE_DATE:
117  case MYSQL_TYPE_TIME:
118  case MYSQL_TYPE_TIME2:
119  return 3;
120 
121  case MYSQL_TYPE_TIMESTAMP:
122  case MYSQL_TYPE_TIMESTAMP2:
123  return 4;
124 
125  case MYSQL_TYPE_DATETIME:
126  case MYSQL_TYPE_DATETIME2:
127  return 8;
128 
129  case MYSQL_TYPE_BIT:
130  /*
131  Decode the size of the bit field from the master.
132  */
133  DBUG_ASSERT((metadata & 0xff) <= 7);
134  return 8 * (metadata >> 8U) + (metadata & 0x00ff);
135 
136  case MYSQL_TYPE_VAR_STRING:
137  case MYSQL_TYPE_VARCHAR:
138  return metadata;
139 
140  /*
141  The actual length for these types does not really matter since
142  they are used to calc_pack_length, which ignores the given
143  length for these types.
144 
145  Since we want this to be accurate for other uses, we return the
146  maximum size in bytes of these BLOBs.
147  */
148 
149  case MYSQL_TYPE_TINY_BLOB:
150  return uint_max(1 * 8);
151 
152  case MYSQL_TYPE_MEDIUM_BLOB:
153  return uint_max(3 * 8);
154 
155  case MYSQL_TYPE_BLOB:
156  /*
157  For the blob type, Field::real_type() lies and say that all
158  blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
159  at the length instead to decide what the max display size is.
160  */
161  return uint_max(metadata * 8);
162 
163  case MYSQL_TYPE_LONG_BLOB:
164  case MYSQL_TYPE_GEOMETRY:
165  return uint_max(4 * 8);
166 
167  default:
168  return ~(uint32) 0;
169  }
170 }
171 
172 
173 /*
174  Compare the pack lengths of a source field (on the master) and a
175  target field (on the slave).
176 
177  @param field Target field.
178  @param type Source field type.
179  @param metadata Source field metadata.
180 
181  @retval -1 The length of the source field is smaller than the target field.
182  @retval 0 The length of the source and target fields are the same.
183  @retval 1 The length of the source field is greater than the target field.
184  */
185 int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
186 {
187  DBUG_ENTER("compare_lengths");
188  size_t const source_length=
189  max_display_length_for_field(source_type, metadata);
190  size_t const target_length= field->max_display_length();
191  DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
192  " target_length: %lu, target_type: %u",
193  (unsigned long) source_length, source_type,
194  (unsigned long) target_length, field->real_type()));
195  int result= compare(source_length, target_length);
196  DBUG_PRINT("result", ("%d", result));
197  DBUG_RETURN(result);
198 }
199 
200 
201 /*********************************************************************
202  * table_def member definitions *
203  *********************************************************************/
204 
205 /*
206  This function returns the field size in raw bytes based on the type
207  and the encoded field data from the master's raw data.
208 */
209 uint32 table_def::calc_field_size(uint col, uchar *master_data) const
210 {
211  uint32 length;
212 
213  switch (type(col)) {
214  case MYSQL_TYPE_NEWDECIMAL:
215  length= my_decimal_get_binary_size(m_field_metadata[col] >> 8,
216  m_field_metadata[col] & 0xff);
217  break;
218  case MYSQL_TYPE_DECIMAL:
219  case MYSQL_TYPE_FLOAT:
220  case MYSQL_TYPE_DOUBLE:
221  length= m_field_metadata[col];
222  break;
223  /*
224  The cases for SET and ENUM are include for completeness, however
225  both are mapped to type MYSQL_TYPE_STRING and their real types
226  are encoded in the field metadata.
227  */
228  case MYSQL_TYPE_SET:
229  case MYSQL_TYPE_ENUM:
230  case MYSQL_TYPE_STRING:
231  {
232  uchar type= m_field_metadata[col] >> 8U;
233  if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
234  length= m_field_metadata[col] & 0x00ff;
235  else
236  {
237  /*
238  We are reading the actual size from the master_data record
239  because this field has the actual lengh stored in the first
240  one or two bytes.
241  */
242  length= max_display_length_for_field(MYSQL_TYPE_STRING, m_field_metadata[col]) > 255 ? 2 : 1;
243 
244  /* As in Field_string::unpack */
245  length+= ((length == 1) ? *master_data : uint2korr(master_data));
246  }
247  break;
248  }
249  case MYSQL_TYPE_YEAR:
250  case MYSQL_TYPE_TINY:
251  length= 1;
252  break;
253  case MYSQL_TYPE_SHORT:
254  length= 2;
255  break;
256  case MYSQL_TYPE_INT24:
257  length= 3;
258  break;
259  case MYSQL_TYPE_LONG:
260  length= 4;
261  break;
262 #ifdef HAVE_LONG_LONG
263  case MYSQL_TYPE_LONGLONG:
264  length= 8;
265  break;
266 #endif
267  case MYSQL_TYPE_NULL:
268  length= 0;
269  break;
270  case MYSQL_TYPE_NEWDATE:
271  length= 3;
272  break;
273  case MYSQL_TYPE_DATE:
274  case MYSQL_TYPE_TIME:
275  length= 3;
276  break;
277  case MYSQL_TYPE_TIME2:
278  length= my_time_binary_length(m_field_metadata[col]);
279  break;
280  case MYSQL_TYPE_TIMESTAMP:
281  length= 4;
282  break;
283  case MYSQL_TYPE_TIMESTAMP2:
284  length= my_timestamp_binary_length(m_field_metadata[col]);
285  break;
286  case MYSQL_TYPE_DATETIME:
287  length= 8;
288  break;
289  case MYSQL_TYPE_DATETIME2:
290  length= my_datetime_binary_length(m_field_metadata[col]);
291  break;
292  case MYSQL_TYPE_BIT:
293  {
294  /*
295  Decode the size of the bit field from the master.
296  from_len is the length in bytes from the master
297  from_bit_len is the number of extra bits stored in the master record
298  If from_bit_len is not 0, add 1 to the length to account for accurate
299  number of bytes needed.
300  */
301  uint from_len= (m_field_metadata[col] >> 8U) & 0x00ff;
302  uint from_bit_len= m_field_metadata[col] & 0x00ff;
303  DBUG_ASSERT(from_bit_len <= 7);
304  length= from_len + ((from_bit_len > 0) ? 1 : 0);
305  break;
306  }
307  case MYSQL_TYPE_VARCHAR:
308  {
309  length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
310  length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
311  break;
312  }
313  case MYSQL_TYPE_TINY_BLOB:
314  case MYSQL_TYPE_MEDIUM_BLOB:
315  case MYSQL_TYPE_LONG_BLOB:
316  case MYSQL_TYPE_BLOB:
317  case MYSQL_TYPE_GEOMETRY:
318  {
319 #if 1
320  /*
321  BUG#29549:
322  This is currently broken for NDB, which is using big-endian
323  order when packing length of BLOB. Once they have decided how to
324  fix the issue, we can enable the code below to make sure to
325  always read the length in little-endian order.
326  */
327  Field_blob fb(m_field_metadata[col]);
328  length= fb.get_packed_size(master_data, TRUE);
329 #else
330  /*
331  Compute the length of the data. We cannot use get_length() here
332  since it is dependent on the specific table (and also checks the
333  packlength using the internal 'table' pointer) and replication
334  is using a fixed format for storing data in the binlog.
335  */
336  switch (m_field_metadata[col]) {
337  case 1:
338  length= *master_data;
339  break;
340  case 2:
341  length= uint2korr(master_data);
342  break;
343  case 3:
344  length= uint3korr(master_data);
345  break;
346  case 4:
347  length= uint4korr(master_data);
348  break;
349  default:
350  DBUG_ASSERT(0); // Should not come here
351  break;
352  }
353 
354  length+= m_field_metadata[col];
355 #endif
356  break;
357  }
358  default:
359  length= ~(uint32) 0;
360  }
361  return length;
362 }
363 
364 
367 static void show_sql_type(enum_field_types type, uint16 metadata, String *str,
368  const CHARSET_INFO *field_cs)
369 {
370  DBUG_ENTER("show_sql_type");
371  DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
372 
373  switch (type)
374  {
375  case MYSQL_TYPE_TINY:
376  str->set_ascii(STRING_WITH_LEN("tinyint"));
377  break;
378 
379  case MYSQL_TYPE_SHORT:
380  str->set_ascii(STRING_WITH_LEN("smallint"));
381  break;
382 
383  case MYSQL_TYPE_LONG:
384  str->set_ascii(STRING_WITH_LEN("int"));
385  break;
386 
387  case MYSQL_TYPE_FLOAT:
388  str->set_ascii(STRING_WITH_LEN("float"));
389  break;
390 
391  case MYSQL_TYPE_DOUBLE:
392  str->set_ascii(STRING_WITH_LEN("double"));
393  break;
394 
395  case MYSQL_TYPE_NULL:
396  str->set_ascii(STRING_WITH_LEN("null"));
397  break;
398 
399  case MYSQL_TYPE_TIMESTAMP:
400  case MYSQL_TYPE_TIMESTAMP2:
401  str->set_ascii(STRING_WITH_LEN("timestamp"));
402  break;
403 
404  case MYSQL_TYPE_LONGLONG:
405  str->set_ascii(STRING_WITH_LEN("bigint"));
406  break;
407 
408  case MYSQL_TYPE_INT24:
409  str->set_ascii(STRING_WITH_LEN("mediumint"));
410  break;
411 
412  case MYSQL_TYPE_NEWDATE:
413  case MYSQL_TYPE_DATE:
414  str->set_ascii(STRING_WITH_LEN("date"));
415  break;
416 
417  case MYSQL_TYPE_TIME:
418  case MYSQL_TYPE_TIME2:
419  str->set_ascii(STRING_WITH_LEN("time"));
420  break;
421 
422  case MYSQL_TYPE_DATETIME:
423  case MYSQL_TYPE_DATETIME2:
424  str->set_ascii(STRING_WITH_LEN("datetime"));
425  break;
426 
427  case MYSQL_TYPE_YEAR:
428  str->set_ascii(STRING_WITH_LEN("year"));
429  break;
430 
431  case MYSQL_TYPE_VAR_STRING:
432  case MYSQL_TYPE_VARCHAR:
433  {
434  const CHARSET_INFO *cs= str->charset();
435  uint32 length=
436  cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
437  "varchar(%u)", metadata);
438  str->length(length);
439  }
440  break;
441 
442  case MYSQL_TYPE_BIT:
443  {
444  const CHARSET_INFO *cs= str->charset();
445  int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF);
446  uint32 length=
447  cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
448  "bit(%d)", bit_length);
449  str->length(length);
450  }
451  break;
452 
453  case MYSQL_TYPE_DECIMAL:
454  {
455  const CHARSET_INFO *cs= str->charset();
456  uint32 length=
457  cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
458  "decimal(%d,?)", metadata);
459  str->length(length);
460  }
461  break;
462 
463  case MYSQL_TYPE_NEWDECIMAL:
464  {
465  const CHARSET_INFO *cs= str->charset();
466  uint32 length=
467  cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
468  "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
469  str->length(length);
470  }
471  break;
472 
473  case MYSQL_TYPE_ENUM:
474  str->set_ascii(STRING_WITH_LEN("enum"));
475  break;
476 
477  case MYSQL_TYPE_SET:
478  str->set_ascii(STRING_WITH_LEN("set"));
479  break;
480 
481  case MYSQL_TYPE_BLOB:
482  /*
483  Field::real_type() lies regarding the actual type of a BLOB, so
484  it is necessary to check the pack length to figure out what kind
485  of blob it really is.
486  */
487  switch (get_blob_type_from_length(metadata))
488  {
489  case MYSQL_TYPE_TINY_BLOB:
490  str->set_ascii(STRING_WITH_LEN("tinyblob"));
491  break;
492 
493  case MYSQL_TYPE_MEDIUM_BLOB:
494  str->set_ascii(STRING_WITH_LEN("mediumblob"));
495  break;
496 
497  case MYSQL_TYPE_LONG_BLOB:
498  str->set_ascii(STRING_WITH_LEN("longblob"));
499  break;
500 
501  case MYSQL_TYPE_BLOB:
502  str->set_ascii(STRING_WITH_LEN("blob"));
503  break;
504 
505  default:
506  DBUG_ASSERT(0);
507  break;
508  }
509  break;
510 
511  case MYSQL_TYPE_STRING:
512  {
513  /*
514  This is taken from Field_string::unpack.
515  */
516  const CHARSET_INFO *cs= str->charset();
517  uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
518  uint32 length=
519  cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
520  "char(%d)", bytes / field_cs->mbmaxlen);
521  str->length(length);
522  }
523  break;
524 
525  case MYSQL_TYPE_GEOMETRY:
526  str->set_ascii(STRING_WITH_LEN("geometry"));
527  break;
528 
529  default:
530  str->set_ascii(STRING_WITH_LEN("<unknown type>"));
531  }
532  DBUG_VOID_RETURN;
533 }
534 
535 
543 bool is_conversion_ok(int order, Relay_log_info *rli)
544 {
545  DBUG_ENTER("is_conversion_ok");
546  bool allow_non_lossy, allow_lossy;
547 
548  allow_non_lossy = slave_type_conversions_options &
549  (ULL(1) << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
550  allow_lossy= slave_type_conversions_options &
551  (ULL(1) << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
552 
553  DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
554  allow_non_lossy ? " ALL_NON_LOSSY" : "",
555  allow_lossy ? " ALL_LOSSY" : ""));
556  if (order < 0 && !allow_non_lossy)
557  {
558  /* !!! Add error message saying that non-lossy conversions need to be allowed. */
559  DBUG_RETURN(false);
560  }
561 
562  if (order > 0 && !allow_lossy)
563  {
564  /* !!! Add error message saying that lossy conversions need to be allowed. */
565  DBUG_RETURN(false);
566  }
567 
568  DBUG_RETURN(true);
569 }
570 
571 
603 static bool
604 can_convert_field_to(Field *field,
605  enum_field_types source_type, uint16 metadata,
606  Relay_log_info *rli, uint16 mflags,
607  int *order_var)
608 {
609  DBUG_ENTER("can_convert_field_to");
610 #ifndef DBUG_OFF
611  char field_type_buf[MAX_FIELD_WIDTH];
612  String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
613  field->sql_type(field_type);
614  DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
615  field_type.c_ptr_safe(), field->real_type(), source_type, metadata));
616 #endif
617  /*
618  If the real type is the same, we need to check the metadata to
619  decide if conversions are allowed.
620  */
621  if (field->real_type() == source_type)
622  {
623  if (metadata == 0) // Metadata can only be zero if no metadata was provided
624  {
625  /*
626  If there is no metadata, we either have an old event where no
627  metadata were supplied, or a type that does not require any
628  metadata. In either case, conversion can be done but no
629  conversion table is necessary.
630  */
631  DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
632  *order_var= 0;
633  DBUG_RETURN(true);
634  }
635 
636  DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
637  if (field->compatible_field_size(metadata, rli, mflags, order_var))
638  DBUG_RETURN(is_conversion_ok(*order_var, rli));
639  else
640  DBUG_RETURN(false);
641  }
642  else if (metadata == 0 &&
643  ((field->real_type() == MYSQL_TYPE_TIMESTAMP2 &&
644  source_type == MYSQL_TYPE_TIMESTAMP) ||
645  (field->real_type() == MYSQL_TYPE_TIME2 &&
646  source_type == MYSQL_TYPE_TIME) ||
647  (field->real_type() == MYSQL_TYPE_DATETIME2 &&
648  source_type == MYSQL_TYPE_DATETIME)))
649  {
650  /*
651  TS-TODO: conversion from FSP1>FSP2.
652  Can do non-lossy conversion
653  from old TIME, TIMESTAMP, DATETIME
654  to new TIME(0), TIMESTAMP(0), DATETIME(0).
655  */
656  *order_var= -1;
657  DBUG_RETURN(true);
658  }
659  else if (!slave_type_conversions_options)
660  DBUG_RETURN(false);
661 
662  /*
663  Here, from and to will always be different. Since the types are
664  different, we cannot use the compatible_field_size() function, but
665  have to rely on hard-coded max-sizes for fields.
666  */
667 
668  DBUG_PRINT("debug", ("Base types are different, checking conversion"));
669  switch (source_type) // Source type (on master)
670  {
671  case MYSQL_TYPE_DECIMAL:
672  case MYSQL_TYPE_NEWDECIMAL:
673  case MYSQL_TYPE_FLOAT:
674  case MYSQL_TYPE_DOUBLE:
675  switch (field->real_type())
676  {
677  case MYSQL_TYPE_NEWDECIMAL:
678  /*
679  Then the other type is either FLOAT, DOUBLE, or old style
680  DECIMAL, so we require lossy conversion.
681  */
682  *order_var= 1;
683  DBUG_RETURN(is_conversion_ok(*order_var, rli));
684 
685  case MYSQL_TYPE_DECIMAL:
686  case MYSQL_TYPE_FLOAT:
687  case MYSQL_TYPE_DOUBLE:
688  {
689  if (source_type == MYSQL_TYPE_NEWDECIMAL ||
690  source_type == MYSQL_TYPE_DECIMAL)
691  *order_var = 1; // Always require lossy conversions
692  else
693  *order_var= compare_lengths(field, source_type, metadata);
694  DBUG_ASSERT(*order_var != 0);
695  DBUG_RETURN(is_conversion_ok(*order_var, rli));
696  }
697 
698  default:
699  DBUG_RETURN(false);
700  }
701  break;
702 
703  /*
704  The length comparison check will do the correct job of comparing
705  the field lengths (in bytes) of two integer types.
706  */
707  case MYSQL_TYPE_TINY:
708  case MYSQL_TYPE_SHORT:
709  case MYSQL_TYPE_INT24:
710  case MYSQL_TYPE_LONG:
711  case MYSQL_TYPE_LONGLONG:
712  switch (field->real_type())
713  {
714  case MYSQL_TYPE_TINY:
715  case MYSQL_TYPE_SHORT:
716  case MYSQL_TYPE_INT24:
717  case MYSQL_TYPE_LONG:
718  case MYSQL_TYPE_LONGLONG:
719  *order_var= compare_lengths(field, source_type, metadata);
720  DBUG_ASSERT(*order_var != 0);
721  DBUG_RETURN(is_conversion_ok(*order_var, rli));
722 
723  default:
724  DBUG_RETURN(false);
725  }
726  break;
727 
728  /*
729  Since source and target type is different, and it is not possible
730  to convert bit types to anything else, this will return false.
731  */
732  case MYSQL_TYPE_BIT:
733  DBUG_RETURN(false);
734 
735  /*
736  If all conversions are disabled, it is not allowed to convert
737  between these types. Since the TEXT vs. BINARY is distinguished by
738  the charset, and the charset is not replicated, we cannot
739  currently distinguish between , e.g., TEXT and BLOB.
740  */
741  case MYSQL_TYPE_TINY_BLOB:
742  case MYSQL_TYPE_MEDIUM_BLOB:
743  case MYSQL_TYPE_LONG_BLOB:
744  case MYSQL_TYPE_BLOB:
745  case MYSQL_TYPE_STRING:
746  case MYSQL_TYPE_VAR_STRING:
747  case MYSQL_TYPE_VARCHAR:
748  switch (field->real_type())
749  {
750  case MYSQL_TYPE_TINY_BLOB:
751  case MYSQL_TYPE_MEDIUM_BLOB:
752  case MYSQL_TYPE_LONG_BLOB:
753  case MYSQL_TYPE_BLOB:
754  case MYSQL_TYPE_STRING:
755  case MYSQL_TYPE_VAR_STRING:
756  case MYSQL_TYPE_VARCHAR:
757  *order_var= compare_lengths(field, source_type, metadata);
758  /*
759  Here we know that the types are different, so if the order
760  gives that they do not require any conversion, we still need
761  to have non-lossy conversion enabled to allow conversion
762  between different (string) types of the same length.
763  */
764  if (*order_var == 0)
765  *order_var= -1;
766  DBUG_RETURN(is_conversion_ok(*order_var, rli));
767 
768  default:
769  DBUG_RETURN(false);
770  }
771  break;
772 
773  case MYSQL_TYPE_GEOMETRY:
774  case MYSQL_TYPE_TIMESTAMP:
775  case MYSQL_TYPE_DATE:
776  case MYSQL_TYPE_TIME:
777  case MYSQL_TYPE_DATETIME:
778  case MYSQL_TYPE_YEAR:
779  case MYSQL_TYPE_NEWDATE:
780  case MYSQL_TYPE_NULL:
781  case MYSQL_TYPE_ENUM:
782  case MYSQL_TYPE_SET:
783  case MYSQL_TYPE_TIMESTAMP2:
784  case MYSQL_TYPE_DATETIME2:
785  case MYSQL_TYPE_TIME2:
786  DBUG_RETURN(false);
787  }
788  DBUG_RETURN(false); // To keep GCC happy
789 }
790 
791 
/*
  Check the leading columns of the slave table against this table
  definition and, when at least one column needs a conversion, hand back a
  conversion table through conv_table_var.

  For each checked column can_convert_field_to() is consulted. If it
  refuses, an ER_SLAVE_CONVERSION_FAILED error is reported through rli and
  the function returns false. Otherwise, on the first column whose order is
  non-zero, a conversion table is created with create_conversion_table();
  entries for columns that need no conversion are set to NULL in that
  table's field array.

  Returns true on success (with *conv_table_var set to the conversion table,
  or NULL if none was needed) and false if a column cannot be converted or
  the conversion table could not be created.

  NOTE(review): the declarator line between "bool" and the parameter line
  below (listing line 820) is missing from this copy of the file. The body
  uses 'thd' and 'rli', so the lost line presumably carries the function
  name and those two parameters -- restore it from the upstream source.
*/
819 bool
821  TABLE *table, TABLE **conv_table_var)
822  const
823 {
824  /*
825  We only check the initial columns for the tables.
826  */
827  uint const cols_to_check= min<ulong>(table->s->fields, size());
828  TABLE *tmp_table= NULL;
829 
830  for (uint col= 0 ; col < cols_to_check ; ++col)
831  {
832  Field *const field= table->field[col];
833  int order;
834  if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
835  {
836  DBUG_PRINT("debug", ("Checking column %d -"
837  " field '%s' can be converted - order: %d",
838  col, field->field_name, order));
839  DBUG_ASSERT(order >= -1 && order <= 1);
840 
841  /*
842  If order is not 0, a conversion is required, so we need to set
843  up the conversion table.
844  */
845  if (order != 0 && tmp_table == NULL)
846  {
847  /*
848  This will create the full table with all fields. This is
849  necessary to get the correct field lengths for the record.
850  */
851  tmp_table= create_conversion_table(thd, rli, table);
852  if (tmp_table == NULL)
853  return false;
854  /*
855  Clear all fields up to, but not including, this column.
856  */
857  for (unsigned int i= 0; i < col; ++i)
858  tmp_table->field[i]= NULL;
859  }
860 
/* Once a conversion table exists, mark columns that need no conversion. */
861  if (order == 0 && tmp_table != NULL)
862  tmp_table->field[col]= NULL;
863  }
864  else
865  {
866  DBUG_PRINT("debug", ("Checking column %d -"
867  " field '%s' can not be converted",
868  col, field->field_name));
869  DBUG_ASSERT(col < size() && col < table->s->fields);
870  DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
871  const char *db_name= table->s->db.str;
872  const char *tbl_name= table->s->table_name.str;
873  char source_buf[MAX_FIELD_WIDTH];
874  char target_buf[MAX_FIELD_WIDTH];
875  String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
876  String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
/* Render both type names so the error names the offending conversion. */
877  show_sql_type(type(col), field_metadata(col), &source_type, field->charset());
878  field->sql_type(target_type);
879  rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED,
880  ER(ER_SLAVE_CONVERSION_FAILED),
881  col, db_name, tbl_name,
882  source_type.c_ptr_safe(), target_type.c_ptr_safe());
883  return false;
884  }
885  }
886 
/* Debug builds only: trace every column that kept a conversion field. */
887 #ifndef DBUG_OFF
888  if (tmp_table)
889  {
890  for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
891  if (tmp_table->field[col])
892  {
893  char source_buf[MAX_FIELD_WIDTH];
894  char target_buf[MAX_FIELD_WIDTH];
895  String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
896  String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
897  tmp_table->field[col]->sql_type(source_type);
898  table->field[col]->sql_type(target_type);
899  DBUG_PRINT("debug", ("Field %s - conversion required."
900  " Source type: '%s', Target type: '%s'",
901  tmp_table->field[col]->field_name,
902  source_type.c_ptr_safe(), target_type.c_ptr_safe()));
903  }
904  }
905 #endif
906 
907  *conv_table_var= tmp_table;
908  return true;
909 }
910 
921 TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const
922 {
923  DBUG_ENTER("table_def::create_conversion_table");
924 
925  List<Create_field> field_list;
926  TABLE *conv_table= NULL;
927  /*
928  At slave, columns may differ. So we should create
929  min(columns@master, columns@slave) columns in the
930  conversion table.
931  */
932  uint const cols_to_create= min<ulong>(target_table->s->fields, size());
933 
934  // Default value : treat all values signed
935  bool unsigned_flag= FALSE;
936 
937  // Check if slave_type_conversions contains ALL_UNSIGNED
938  unsigned_flag= slave_type_conversions_options &
939  (ULL(1) << SLAVE_TYPE_CONVERSIONS_ALL_UNSIGNED);
940 
941  // Check if slave_type_conversions contains ALL_SIGNED
942  unsigned_flag= unsigned_flag && !(slave_type_conversions_options &
943  (ULL(1) << SLAVE_TYPE_CONVERSIONS_ALL_SIGNED));
944 
945  for (uint col= 0 ; col < cols_to_create; ++col)
946  {
947  Create_field *field_def=
948  (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field));
949  if (field_list.push_back(field_def))
950  DBUG_RETURN(NULL);
951 
952  uint decimals= 0;
953  TYPELIB* interval= NULL;
954  uint pack_length= 0;
955  uint32 max_length=
956  max_display_length_for_field(type(col), field_metadata(col));
957 
958  switch(type(col))
959  {
960  int precision;
961  case MYSQL_TYPE_ENUM:
962  case MYSQL_TYPE_SET:
963  interval= static_cast<Field_enum*>(target_table->field[col])->typelib;
964  pack_length= field_metadata(col) & 0x00ff;
965  break;
966 
967  case MYSQL_TYPE_NEWDECIMAL:
968  /*
969  The display length of a DECIMAL type is not the same as the
970  length that should be supplied to make_field, so we correct
971  the length here.
972  */
973  precision= field_metadata(col) >> 8;
974  decimals= field_metadata(col) & 0x00ff;
975  max_length=
976  my_decimal_precision_to_length(precision, decimals, FALSE);
977  break;
978 
979  case MYSQL_TYPE_DECIMAL:
980  sql_print_error("In RBR mode, Slave received incompatible DECIMAL field "
981  "(old-style decimal field) from Master while creating "
982  "conversion table. Please consider changing datatype on "
983  "Master to new style decimal by executing ALTER command for"
984  " column Name: %s.%s.%s.",
985  target_table->s->db.str,
986  target_table->s->table_name.str,
987  target_table->field[col]->field_name);
988  goto err;
989 
990  case MYSQL_TYPE_TINY_BLOB:
991  case MYSQL_TYPE_MEDIUM_BLOB:
992  case MYSQL_TYPE_LONG_BLOB:
993  case MYSQL_TYPE_BLOB:
994  case MYSQL_TYPE_GEOMETRY:
995  pack_length= field_metadata(col) & 0x00ff;
996  break;
997 
998  default:
999  break;
1000  }
1001 
1002  DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
1003  " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
1004  binlog_type(col), target_table->field[col]->field_name,
1005  max_length, decimals, TRUE, unsigned_flag, pack_length));
1006  field_def->init_for_tmp_table(type(col),
1007  max_length,
1008  decimals,
1009  TRUE, // maybe_null
1010  unsigned_flag, // unsigned_flag
1011  pack_length);
1012  field_def->charset= target_table->field[col]->charset();
1013  field_def->interval= interval;
1014  }
1015 
1016  conv_table= create_virtual_tmp_table(thd, field_list);
1017 
1018 err:
1019  if (conv_table == NULL)
1020  rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION,
1021  ER(ER_SLAVE_CANT_CREATE_CONVERSION),
1022  target_table->s->db.str,
1023  target_table->s->table_name.str);
1024  DBUG_RETURN(conv_table);
1025 }
1026 
1027 #endif /* MYSQL_CLIENT */
1028 
1029 table_def::table_def(unsigned char *types, ulong size,
1030  uchar *field_metadata, int metadata_size,
1031  uchar *null_bitmap, uint16 flags)
1032  : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
1033  m_field_metadata(0), m_null_bits(0), m_flags(flags),
1034  m_memory(NULL)
1035 {
1036  m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
1037  &m_type, size,
1038  &m_field_metadata,
1039  size * sizeof(uint16),
1040  &m_null_bits, (size + 7) / 8,
1041  NULL);
1042 
1043  memset(m_field_metadata, 0, size * sizeof(uint16));
1044 
1045  if (m_type)
1046  memcpy(m_type, types, size);
1047  else
1048  m_size= 0;
1049  /*
1050  Extract the data from the table map into the field metadata array
1051  iff there is field metadata. The variable metadata_size will be
1052  0 if we are replicating from an older version server since no field
1053  metadata was written to the table map. This can also happen if
1054  there were no fields in the master that needed extra metadata.
1055  */
1056  if (m_size && metadata_size)
1057  {
1058  int index= 0;
1059  for (unsigned int i= 0; i < m_size; i++)
1060  {
1061  switch (binlog_type(i)) {
1062  case MYSQL_TYPE_TINY_BLOB:
1063  case MYSQL_TYPE_BLOB:
1064  case MYSQL_TYPE_MEDIUM_BLOB:
1065  case MYSQL_TYPE_LONG_BLOB:
1066  case MYSQL_TYPE_DOUBLE:
1067  case MYSQL_TYPE_FLOAT:
1068  case MYSQL_TYPE_GEOMETRY:
1069  {
1070  /*
1071  These types store a single byte.
1072  */
1073  m_field_metadata[i]= field_metadata[index];
1074  index++;
1075  break;
1076  }
1077  case MYSQL_TYPE_SET:
1078  case MYSQL_TYPE_ENUM:
1079  case MYSQL_TYPE_STRING:
1080  {
1081  uint16 x= field_metadata[index++] << 8U; // real_type
1082  x+= field_metadata[index++]; // pack or field length
1083  m_field_metadata[i]= x;
1084  break;
1085  }
1086  case MYSQL_TYPE_BIT:
1087  {
1088  uint16 x= field_metadata[index++];
1089  x = x + (field_metadata[index++] << 8U);
1090  m_field_metadata[i]= x;
1091  break;
1092  }
1093  case MYSQL_TYPE_VARCHAR:
1094  {
1095  /*
1096  These types store two bytes.
1097  */
1098  char *ptr= (char *)&field_metadata[index];
1099  m_field_metadata[i]= uint2korr(ptr);
1100  index= index + 2;
1101  break;
1102  }
1103  case MYSQL_TYPE_NEWDECIMAL:
1104  {
1105  uint16 x= field_metadata[index++] << 8U; // precision
1106  x+= field_metadata[index++]; // decimals
1107  m_field_metadata[i]= x;
1108  break;
1109  }
1110  case MYSQL_TYPE_TIME2:
1111  case MYSQL_TYPE_DATETIME2:
1112  case MYSQL_TYPE_TIMESTAMP2:
1113  m_field_metadata[i]= field_metadata[index++];
1114  break;
1115  default:
1116  m_field_metadata[i]= 0;
1117  break;
1118  }
1119  }
1120  }
1121  if (m_size && null_bitmap)
1122  memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
1123 }
1124 
1125 
1126 table_def::~table_def()
1127 {
1128  my_free(m_memory);
1129 #ifndef DBUG_OFF
1130  m_type= 0;
1131  m_size= 0;
1132 #endif
1133 }
1134 
1142 bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg)
1143 {
1144  bool res= FALSE;
1145  uint16 flags= 0; // to store in FD's buffer flags orig value
1146 
1147  if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
1148  {
1149  ha_checksum incoming;
1150  ha_checksum computed;
1151 
1152  if (event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
1153  {
1154 #ifndef DBUG_OFF
1155  int8 fd_alg= event_buf[event_len - BINLOG_CHECKSUM_LEN -
1156  BINLOG_CHECKSUM_ALG_DESC_LEN];
1157 #endif
1158  /*
1159  FD event is checksummed and therefore verified w/o the binlog-in-use flag
1160  */
1161  flags= uint2korr(event_buf + FLAGS_OFFSET);
1162  if (flags & LOG_EVENT_BINLOG_IN_USE_F)
1163  event_buf[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1164  /*
1165  The only algorithm currently is CRC32. Zero indicates
1166  the binlog file is checksum-free *except* the FD-event.
1167  */
1168  DBUG_ASSERT(fd_alg == BINLOG_CHECKSUM_ALG_CRC32 || fd_alg == 0);
1169  DBUG_ASSERT(alg == BINLOG_CHECKSUM_ALG_CRC32);
1170  /*
1171  Complile time guard to watch over the max number of alg
1172  */
1173  compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
1174  }
1175  incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
1176  computed= my_checksum(0L, NULL, 0);
1177  /* checksum the event content but the checksum part itself */
1178  computed= my_checksum(computed, (const uchar*) event_buf,
1179  event_len - BINLOG_CHECKSUM_LEN);
1180  if (flags != 0)
1181  {
1182  /* restoring the orig value of flags of FD */
1183  DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
1184  event_buf[FLAGS_OFFSET]= flags;
1185  }
1186  res= !(computed == incoming);
1187  }
1188  return DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, res);
1189 }
1190 
1191 #ifndef MYSQL_CLIENT
1192 
1193 #define HASH_ROWS_POS_SEARCH_INVALID -1
1194 
1199 static uchar*
1200 hash_slave_rows_get_key(const uchar *record,
1201  size_t *length,
1202  my_bool not_used __attribute__((unused)))
1203 {
1204  DBUG_ENTER("get_key");
1205 
1206  HASH_ROW_ENTRY *entry=(HASH_ROW_ENTRY *) record;
1207  HASH_ROW_PREAMBLE *preamble= entry->preamble;
1208  *length= preamble->length;
1209 
1210  DBUG_RETURN((uchar*) &preamble->hash_value);
1211 }
1212 
1213 static void
1214 hash_slave_rows_free_entry(HASH_ROW_ENTRY *entry)
1215 {
1216  DBUG_ENTER("free_entry");
1217  if (entry)
1218  {
1219  if (entry->preamble)
1220  my_free(entry->preamble);
1221  if (entry->positions)
1222  my_free(entry->positions);
1223  my_free(entry);
1224  }
1225  DBUG_VOID_RETURN;
1226 }
1227 
1229 {
1230  return (m_hash.records == 0);
1231 }
1232 
1238 {
1239  if (my_hash_init(&m_hash,
1240  &my_charset_bin, /* the charater set information */
1241  16 /* TODO */, /* growth size */
1242  0, /* key offset */
1243  0, /* key length */
1244  hash_slave_rows_get_key, /* get function pointer */
1245  (my_hash_free_key) hash_slave_rows_free_entry, /* freefunction pointer */
1246  MYF(0))) /* flags */
1247  return true;
1248  return false;
1249 }
1250 
1252 {
1253  if (my_hash_inited(&m_hash))
1254  my_hash_free(&m_hash);
1255 
1256  return 0;
1257 }
1258 
1260 {
1261  return m_hash.records;
1262 }
1263 
1265 {
1266  return make_entry(NULL, NULL);
1267 }
1268 
1269 HASH_ROW_ENTRY* Hash_slave_rows::make_entry(const uchar* bi_start, const uchar* bi_ends)
1270 {
1271  DBUG_ENTER("Hash_slave_rows::make_entry");
1272 
1273  HASH_ROW_ENTRY *entry= (HASH_ROW_ENTRY*) my_malloc(sizeof(HASH_ROW_ENTRY), MYF(0));
1274  HASH_ROW_PREAMBLE *preamble= (HASH_ROW_PREAMBLE *) my_malloc(sizeof(HASH_ROW_PREAMBLE), MYF(0));
1275  HASH_ROW_POS *pos= (HASH_ROW_POS *) my_malloc(sizeof(HASH_ROW_POS), MYF(0));
1276 
1277  if (!entry || !preamble || !pos)
1278  goto err;
1279 
1283  preamble->hash_value= 0;
1284  preamble->length= sizeof(my_hash_value_type);
1285  preamble->search_state= HASH_ROWS_POS_SEARCH_INVALID;
1286  preamble->is_search_state_inited= false;
1287 
1291  pos->bi_start= (const uchar *) bi_start;
1292  pos->bi_ends= (const uchar *) bi_ends;
1293 
1297  entry->preamble= preamble;
1298  entry->positions= pos;
1299 
1300  DBUG_RETURN(entry);
1301 
1302 err:
1303  if (entry)
1304  my_free(entry);
1305  if (preamble)
1306  my_free(entry);
1307  if (pos)
1308  my_free(pos);
1309  DBUG_RETURN(NULL);
1310 }
1311 
1312 bool
1314  MY_BITMAP *cols,
1315  HASH_ROW_ENTRY* entry)
1316 {
1317 
1318  DBUG_ENTER("Hash_slave_rows::put");
1319 
1320  HASH_ROW_PREAMBLE* preamble= entry->preamble;
1321 
1328  preamble->hash_value= make_hash_key(table, cols);
1329 
1330  my_hash_insert(&m_hash, (uchar *) entry);
1331  DBUG_PRINT("debug", ("Added record to hash with key=%u", preamble->hash_value));
1332  DBUG_RETURN(false);
1333 }
1334 
1337 {
1338  DBUG_ENTER("Hash_slave_rows::get");
1339  HASH_SEARCH_STATE state;
1340  my_hash_value_type key;
1341  HASH_ROW_ENTRY *entry= NULL;
1342 
1343  key= make_hash_key(table, cols);
1344 
1345  DBUG_PRINT("debug", ("Looking for record with key=%u in the hash.", key));
1346 
1347  entry= (HASH_ROW_ENTRY*) my_hash_first(&m_hash,
1348  (const uchar*) &key,
1349  sizeof(my_hash_value_type),
1350  &state);
1351  if (entry)
1352  {
1353  DBUG_PRINT("debug", ("Found record with key=%u in the hash.", key));
1354 
1359  entry->preamble->search_state= state;
1360  entry->preamble->is_search_state_inited= true;
1361  }
1362 
1363  DBUG_RETURN(entry);
1364 }
1365 
1367 {
1368  DBUG_ENTER("Hash_slave_rows::next");
1369  DBUG_ASSERT(*entry);
1370 
1371  if (*entry == NULL)
1372  DBUG_RETURN(true);
1373 
1374  HASH_ROW_PREAMBLE *preamble= (*entry)->preamble;
1375 
1376  if (!preamble->is_search_state_inited)
1377  DBUG_RETURN(true);
1378 
1379  my_hash_value_type key= preamble->hash_value;
1380  HASH_SEARCH_STATE state= preamble->search_state;
1381 
1382  /*
1383  Invalidate search for current preamble, because it is going to be
1384  used in the search below (and search state is used in a
1385  one-time-only basis).
1386  */
1387  preamble->search_state= HASH_ROWS_POS_SEARCH_INVALID;
1388  preamble->is_search_state_inited= false;
1389 
1390  DBUG_PRINT("debug", ("Looking for record with key=%u in the hash (next).", key));
1391 
1395  *entry= (HASH_ROW_ENTRY*) my_hash_next(&m_hash,
1396  (const uchar*) &key,
1397  sizeof(my_hash_value_type),
1398  &state);
1399  if (*entry)
1400  {
1401  DBUG_PRINT("debug", ("Found record with key=%u in the hash (next).", key));
1402  preamble= (*entry)->preamble;
1403 
1407  preamble->search_state= state;
1408  preamble->is_search_state_inited= true;
1409  }
1410 
1411  DBUG_RETURN(false);
1412 }
1413 
1414 bool
1416 {
1417  DBUG_ENTER("Hash_slave_rows::del");
1418  DBUG_ASSERT(entry);
1419 
1420  if (my_hash_delete(&m_hash, (uchar *) entry))
1421  DBUG_RETURN(true);
1422  DBUG_RETURN(false);
1423 }
1424 
1425 my_hash_value_type
1426 Hash_slave_rows::make_hash_key(TABLE *table, MY_BITMAP *cols)
1427 {
1428  DBUG_ENTER("Hash_slave_rows::make_hash_key");
1429  ha_checksum crc= 0L;
1430 
1431  uchar *record= table->record[0];
1432  uchar saved_x= 0, saved_filler= 0;
1433 
1434  if (table->s->null_bytes > 0)
1435  {
1436  /*
1437  If we have an X bit then we need to take care of it.
1438  */
1439  if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1440  {
1441  saved_x= record[0];
1442  record[0]|= 1U;
1443  }
1444 
1445  /*
1446  If (last_null_bit_pos == 0 && null_bytes > 1), then:
1447  X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
1448  Ie, the entire byte is used.
1449  */
1450  if (table->s->last_null_bit_pos > 0)
1451  {
1452  saved_filler= record[table->s->null_bytes - 1];
1453  record[table->s->null_bytes - 1]|=
1454  256U - (1U << table->s->last_null_bit_pos);
1455  }
1456  }
1457 
1458  /*
1459  We can only checksum the bytes if all fields have been signaled
1460  in the before image. Otherwise, unpack_row will not have set the
1461  null_flags correctly (because it only unpacks those fields and
1462  their flags that were actually in the before image).
1463 
1464  @c record_compare, as it also skips null_flags if the read_set
1465  was not marked completely.
1466  */
1467  if (bitmap_is_set_all(cols))
1468  {
1469  crc= my_checksum(crc, table->null_flags, table->s->null_bytes);
1470  DBUG_PRINT("debug", ("make_hash_entry: hash after null_flags: %u", crc));
1471  }
1472 
1473  for (Field **ptr=table->field ;
1474  *ptr && ((*ptr)->field_index < cols->n_bits);
1475  ptr++)
1476  {
1477  Field *f= (*ptr);
1478 
1479  /*
1480  Field is set in the read_set and is isn't NULL.
1481  */
1482  if (bitmap_is_set(cols, f->field_index) && !f->is_null())
1483  {
1484  /*
1485  BLOB and VARCHAR have pointers in their field, we must convert
1486  to string; GEOMETRY is implemented on top of BLOB.
1487  BIT may store its data among NULL bits, convert as well.
1488  */
1489  switch (f->type()) {
1490  case MYSQL_TYPE_BLOB:
1491  case MYSQL_TYPE_VARCHAR:
1492  case MYSQL_TYPE_GEOMETRY:
1493  case MYSQL_TYPE_BIT:
1494  {
1495  String tmp;
1496  f->val_str(&tmp);
1497  crc= my_checksum(crc, (uchar*) tmp.ptr(), tmp.length());
1498  break;
1499  }
1500  default:
1501  crc= my_checksum(crc, f->ptr, f->data_length());
1502  break;
1503  }
1504 #ifndef DBUG_OFF
1505  String tmp;
1506  f->val_str(&tmp);
1507  DBUG_PRINT("debug", ("make_hash_entry: hash after field %s=%s: %u", f->field_name, tmp.c_ptr_safe(), crc));
1508 #endif
1509  }
1510  }
1511 
1512  /*
1513  Restore the saved bytes.
1514 
1515  TODO[record format ndb]: Remove this code once NDB returns the
1516  correct record format.
1517  */
1518  if (table->s->null_bytes > 0)
1519  {
1520  if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1521  record[0]= saved_x;
1522 
1523  if (table->s->last_null_bit_pos)
1524  record[table->s->null_bytes - 1]= saved_filler;
1525  }
1526 
1527  DBUG_PRINT("debug", ("Created key=%u", crc));
1528  DBUG_RETURN(crc);
1529 }
1530 
1531 
1532 #endif
1533 
1534 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1535 
1536 Deferred_log_events::Deferred_log_events(Relay_log_info *rli)
1537 {
1538  my_init_dynamic_array(&array, sizeof(Log_event *), 32, 16);
1539 }
1540 
1541 Deferred_log_events::~Deferred_log_events()
1542 {
1543  delete_dynamic(&array);
1544 }
1545 
1546 int Deferred_log_events::add(Log_event *ev)
1547 {
1548  insert_dynamic(&array, (uchar*) &ev);
1549  ev->worker= NULL; // to mark event busy avoiding deletion
1550  return 0;
1551 }
1552 
1553 bool Deferred_log_events::is_empty()
1554 {
1555  return array.elements == 0;
1556 }
1557 
1558 bool Deferred_log_events::execute(Relay_log_info *rli)
1559 {
1560  bool res= false;
1561 
1562  DBUG_ASSERT(rli->deferred_events_collecting);
1563 
1564  rli->deferred_events_collecting= false;
1565  for (uint i= 0; !res && i < array.elements; i++)
1566  {
1567  Log_event *ev= (* (Log_event **)
1568  dynamic_array_ptr(&array, i));
1569  res= ev->apply_event(rli);
1570  }
1571  rli->deferred_events_collecting= true;
1572  return res;
1573 }
1574 
1575 void Deferred_log_events::rewind()
1576 {
1577  /*
1578  Reset preceeding Query log event events which execution was
1579  deferred because of slave side filtering.
1580  */
1581  if (!is_empty())
1582  {
1583  for (uint i= 0; i < array.elements; i++)
1584  {
1585  Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1586  delete ev;
1587  }
1588  if (array.elements > array.max_element)
1589  freeze_size(&array);
1590  reset_dynamic(&array);
1591  }
1592 }
1593 
1594 #endif