MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
row0log.cc
Go to the documentation of this file.
1 /*****************************************************************************
2 
3 Copyright (c) 2011, 2013, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************/
26 #include "row0log.h"
27 
28 #ifdef UNIV_NONINL
29 #include "row0log.ic"
30 #endif
31 
32 #include "row0row.h"
33 #include "row0ins.h"
34 #include "row0upd.h"
35 #include "row0merge.h"
36 #include "row0ext.h"
37 #include "data0data.h"
38 #include "que0que.h"
39 #include "handler0alter.h"
40 
41 #include<map>
42 
/* Op codes for records in the online table-rebuild modification log.
Only ROW_T_INSERT survives in this extraction; enumerators on the
missing source lines 48-51 (presumably ROW_T_UPDATE / ROW_T_DELETE,
used below) were lost in the page dump -- confirm against the
original row0log.cc. */
45 enum row_tab_op {
47  ROW_T_INSERT = 0x41,
52 };
53 
/* Op codes for records in the online secondary-index-creation log.
Only ROW_OP_INSERT is visible; the enumerator on missing source
lines 58-59 (ROW_OP_DELETE, referenced in the logging function
below) was lost in the page dump. */
55 enum row_op {
57  ROW_OP_INSERT = 0x61,
60 };
61 
62 #ifdef UNIV_DEBUG
63 
64 # define ROW_LOG_APPLY_PRINT
65 #endif /* UNIV_DEBUG */
66 
67 #ifdef ROW_LOG_APPLY_PRINT
68 
69 static bool row_log_apply_print;
70 #endif /* ROW_LOG_APPLY_PRINT */
71 
73 #define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
74 
/* One buffer (head or tail) of an online-DDL modification log.
NOTE(review): the member on missing source lines 78-79 and 83-85
(the small spill buffer `buf`, referenced as log->tail.buf below)
was lost in the extraction. */
76 struct row_log_buf_t {
77  byte* block; /* log block of srv_sort_buf_size bytes */
80  ulint blocks; /* number of blocks already written to the log file */
81  ulint bytes; /* bytes of the current incomplete block in `block` */
82  ulonglong total; /* logical byte position: total bytes logged so far */
86 };
87 
/* Interior of class row_log_table_blob_t (the class-header line,
source line ~88, is missing from this extraction).  Tracks the
free/reallocate state of one BLOB page during online table rebuild,
as a position (byte offset) in the modification log.  A page whose
offset is BLOB_FREED has been freed and must not be dereferenced
when applying log records written before a later reallocation. */
90 public:
93 #ifdef UNIV_DEBUG
/* Debug constructor: records where in the log the page was freed.
The non-debug constructor (missing source line 98) takes no
argument; both start in the "freed" state. */
94  row_log_table_blob_t(ulonglong offset_arg) :
95  old_offset (0), free_offset (offset_arg),
96  offset (BLOB_FREED) {}
97 #else /* UNIV_DEBUG */
99  offset (BLOB_FREED) {}
100 #endif /* UNIV_DEBUG */
101 
/* Mark the BLOB page freed at log offset offset_arg.  Requires the
page to be currently allocated (offset != BLOB_FREED). */
104 #ifdef UNIV_DEBUG
105  void blob_free(ulonglong offset_arg)
106 #else /* UNIV_DEBUG */
107  void blob_free()
108 #endif /* UNIV_DEBUG */
109  {
110  ut_ad(offset < offset_arg);
111  ut_ad(offset != BLOB_FREED);
112  ut_d(old_offset = offset);
113  ut_d(free_offset = offset_arg);
114  offset = BLOB_FREED;
115  }
/* Mark the BLOB page (re)allocated at log offset offset_arg.
Allocation must not precede the recorded free position. */
118  void blob_alloc(ulonglong offset_arg) {
119  ut_ad(free_offset <= offset_arg);
120  ut_d(old_offset = offset);
121  offset = offset_arg;
122  }
/* Return whether the BLOB page was freed as of the log record
ending at offset_arg: true while offset_arg precedes the (re)alloc
position, including the BLOB_FREED sentinel state. */
126  bool is_freed(ulonglong offset_arg) const {
127  /* This is supposed to be the offset at the end of the
128  current log record. */
129  ut_ad(offset_arg > 0);
130  /* We should never get anywhere close the magic value. */
131  ut_ad(offset_arg < BLOB_FREED);
132  return(offset_arg < offset);
133  }
134 private:
/* Sentinel offset meaning "currently freed". */
136  static const ulonglong BLOB_FREED = ~0ULL;
137 #ifdef UNIV_DEBUG
138 
139  ulonglong old_offset; /* previous value of `offset` (debug trace) */
141  ulonglong free_offset; /* log offset of the most recent free (debug) */
142 #endif /* UNIV_DEBUG */
143 
/* Log offset of the latest allocation, or BLOB_FREED. */
144  ulonglong offset;
145 };
146 
154 typedef std::map<ulint, row_log_table_blob_t> page_no_map;
155 
/* Online-DDL modification log descriptor.  NOTE(review): this
extraction dropped most members (source lines 170-195): at least
`mutex`, `table`, `same_pk`'s siblings, `error`, `max_trx`, and the
`tail`/`head` row_log_buf_t members are referenced by the functions
below but not visible here. */
168 struct row_log_t {
169  int fd; /* file descriptor of the temporary log file */
172  page_no_map* blobs; /* page_no -> BLOB free/alloc state, or NULL */
179  bool same_pk; /* whether the PRIMARY KEY is unchanged in the rebuild */
181  const dtuple_t* add_cols; /* default values of added columns, or NULL */
183  const ulint* col_map; /* old column no -> new column no mapping */
196  ulint size;
197 };
198 
199 /******************************************************/
/* Logs an insert (trx_id != 0) or delete (trx_id == 0) operation on
a secondary index that is being created online.  The function-name
line (source line ~203) is missing from this extraction; this is
presumably row_log_online_op() -- confirm against the original.
Caller holds the index lock in S or X mode (see UNIV_SYNC_DEBUG
assertion).  On log-file overflow the index is marked corrupted. */
201 UNIV_INTERN
202 void
204 /*==============*/
206  const dtuple_t* tuple,
207  trx_id_t trx_id)
209 {
210  byte* b;
211  ulint extra_size;
212  ulint size;
213  ulint mrec_size;
214  ulint avail_size;
215  row_log_t* log;
216 
217  ut_ad(dtuple_validate(tuple));
219 #ifdef UNIV_SYNC_DEBUG
220  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
221  || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
222 #endif /* UNIV_SYNC_DEBUG */
223 
224  if (dict_index_is_corrupted(index)) {
225  return;
226  }
227 
229 
230  /* Compute the size of the record. This differs from
231  row_merge_buf_encode(), because here we do not encode
232  extra_size+1 (and reserve 0 as the end-of-chunk marker). */
233 
/* NOTE(review): the call assigning `size` (source line 234,
presumably rec_get_converted_size_temp) is missing from this dump. */
235  index, tuple->fields, tuple->n_fields, &extra_size);
236  ut_ad(size >= extra_size);
237  ut_ad(size <= sizeof log->tail.buf);
238 
/* 1 byte op code + 1 (or 2, if extra_size >= 0x80) bytes of
extra_size + encoded record + trx_id for inserts only. */
239  mrec_size = ROW_LOG_HEADER_SIZE
240  + (extra_size >= 0x80) + size
241  + (trx_id ? DATA_TRX_ID_LEN : 0);
242 
243  log = index->online_log;
244  mutex_enter(&log->mutex);
245 
246  if (trx_id > log->max_trx) {
247  log->max_trx = trx_id;
248  }
249 
250  UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
251 
253  avail_size = srv_sort_buf_size - log->tail.bytes;
254 
/* If the record does not fit in the current block, assemble it in
the small spill buffer first; it is split across blocks below. */
255  if (mrec_size > avail_size) {
256  b = log->tail.buf;
257  } else {
258  b = log->tail.block + log->tail.bytes;
259  }
260 
261  if (trx_id != 0) {
262  *b++ = ROW_OP_INSERT;
263  trx_write_trx_id(b, trx_id);
264  b += DATA_TRX_ID_LEN;
265  } else {
266  *b++ = ROW_OP_DELETE;
267  }
268 
269  if (extra_size < 0x80) {
270  *b++ = (byte) extra_size;
271  } else {
272  ut_ad(extra_size < 0x8000);
273  *b++ = (byte) (0x80 | (extra_size >> 8));
274  *b++ = (byte) extra_size;
275  }
276 
/* NOTE(review): the record-encoding call (source line 277,
presumably rec_convert_dtuple_to_temp) is missing from this dump. */
278  b + extra_size, index, tuple->fields, tuple->n_fields);
279  b += size;
280 
/* ">=": mrec_size == avail_size exactly fills the block, which must
also be flushed to the log file now. */
281  if (mrec_size >= avail_size) {
282  const os_offset_t byte_offset
283  = (os_offset_t) log->tail.blocks
285  ibool ret;
286 
287  if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
288  goto write_failed;
289  }
290 
291  if (mrec_size == avail_size) {
292  ut_ad(b == &log->tail.block[srv_sort_buf_size]);
293  } else {
294  ut_ad(b == log->tail.buf + mrec_size);
295  memcpy(log->tail.block + log->tail.bytes,
296  log->tail.buf, avail_size);
297  }
298  UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
299  ret = os_file_write(
300  "(modification log)",
301  OS_FILE_FROM_FD(log->fd),
302  log->tail.block, byte_offset, srv_sort_buf_size);
303  log->tail.blocks++;
304  if (!ret) {
305 write_failed:
306  /* We set the flag directly instead of invoking
307  dict_set_corrupted_index_cache_only(index) here,
308  because the index is not "public" yet. */
309  index->type |= DICT_CORRUPT;
310  }
311  UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
/* Carry the remainder of the spilled record into the new block. */
312  memcpy(log->tail.block, log->tail.buf + avail_size,
313  mrec_size - avail_size);
314  log->tail.bytes = mrec_size - avail_size;
315  } else {
316  log->tail.bytes += mrec_size;
317  ut_ad(b == log->tail.block + log->tail.bytes);
318  }
319 
320  UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
321  mutex_exit(&log->mutex);
322 }
323 
324 /******************************************************/
/* Returns the latest error of the online table-rebuild log for a
clustered index.  The function-name line (source line ~329) is
missing; presumably row_log_table_get_error(). */
327 UNIV_INTERN
328 dberr_t
330 /*====================*/
331  const dict_index_t* index)
333 {
334  ut_ad(dict_index_is_clust(index));
336  return(index->online_log->error);
337 }
338 
339 /******************************************************/
/* Reserves space of `size` bytes in the table-rebuild log buffer.
Returns a pointer to write the record at (the spill buffer if the
record does not fit in the current block), or NULL on a previously
recorded log error.  IMPORTANT: on success, log->mutex is left held
and must be released by row_log_table_close(); on NULL return the
mutex has already been released. */
342 static __attribute__((nonnull, warn_unused_result))
343 byte*
344 row_log_table_open(
345 /*===============*/
346  row_log_t* log,
347  ulint size,
348  ulint* avail)
349 {
350  mutex_enter(&log->mutex);
351 
352  UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
353 
354  if (log->error != DB_SUCCESS) {
355  mutex_exit(&log->mutex);
356  return(NULL);
357  }
358 
359  ut_ad(log->tail.bytes < srv_sort_buf_size);
360  *avail = srv_sort_buf_size - log->tail.bytes;
361 
362  if (size > *avail) {
363  return(log->tail.buf);
364  } else {
365  return(log->tail.block + log->tail.bytes);
366  }
367 }
368 
369 /******************************************************/
/* Finishes the record started by row_log_table_open(): flushes a
completed block to the log file (splitting a spilled record across
blocks), advances tail.total, and releases log->mutex.  On log-file
overflow or write failure, records DB_ONLINE_LOG_TOO_BIG in
log->error.  The `b` parameter exists only in debug builds, to
assert that exactly `size` bytes were written. */
371 static __attribute__((nonnull))
372 void
373 row_log_table_close_func(
374 /*=====================*/
375  row_log_t* log,
376 #ifdef UNIV_DEBUG
377  const byte* b,
378 #endif /* UNIV_DEBUG */
379  ulint size,
380  ulint avail)
381 {
382  ut_ad(mutex_own(&log->mutex));
383 
384  if (size >= avail) {
385  const os_offset_t byte_offset
386  = (os_offset_t) log->tail.blocks
388  ibool ret;
389 
390  if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
391  goto write_failed;
392  }
393 
394  if (size == avail) {
395  ut_ad(b == &log->tail.block[srv_sort_buf_size]);
396  } else {
397  ut_ad(b == log->tail.buf + size);
398  memcpy(log->tail.block + log->tail.bytes,
399  log->tail.buf, avail);
400  }
401  UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
402  ret = os_file_write(
403  "(modification log)",
404  OS_FILE_FROM_FD(log->fd),
405  log->tail.block, byte_offset, srv_sort_buf_size);
406  log->tail.blocks++;
407  if (!ret) {
408 write_failed:
409  log->error = DB_ONLINE_LOG_TOO_BIG;
410  }
411  UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
412  memcpy(log->tail.block, log->tail.buf + avail, size - avail);
413  log->tail.bytes = size - avail;
414  } else {
415  log->tail.bytes += size;
416  ut_ad(b == log->tail.block + log->tail.bytes);
417  }
418 
419  log->tail.total += size;
420  UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
421  mutex_exit(&log->mutex);
422 }
423 
/* Wrapper macro dropping the debug-only `b` argument in release
builds. */
424 #ifdef UNIV_DEBUG
425 # define row_log_table_close(log, b, size, avail) \
426  row_log_table_close_func(log, b, size, avail)
427 #else /* UNIV_DEBUG */
428 # define row_log_table_close(log, b, size, avail) \
429  row_log_table_close_func(log, size, avail)
430 #endif /* UNIV_DEBUG */
431 
432 /******************************************************/
/* Logs a ROW_T_DELETE during online table rebuild: records the old
PRIMARY KEY (plus DB_TRX_ID) of the deleted clustered-index record,
and, when BLOBs may be purged, a cache of off-page column prefixes.
The function-name line (source line ~437) is missing; presumably
row_log_table_delete(). */
435 UNIV_INTERN
436 void
438 /*=================*/
439  const rec_t* rec,
443  const ulint* offsets,
444  bool purge,
445  trx_id_t trx_id)
447 {
448  ulint old_pk_extra_size;
449  ulint old_pk_size;
450  ulint ext_size = 0;
451  ulint mrec_size;
452  ulint avail_size;
453  mem_heap_t* heap = NULL;
454  const dtuple_t* old_pk;
455  row_ext_t* ext;
456 
457  ut_ad(dict_index_is_clust(index));
458  ut_ad(rec_offs_validate(rec, index, offsets));
459  ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
460  ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
461 #ifdef UNIV_SYNC_DEBUG
462  ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
463  || rw_lock_own(&index->lock, RW_LOCK_EX));
464 #endif /* UNIV_SYNC_DEBUG */
465 
466  if (dict_index_is_corrupted(index)
467  || !dict_index_is_online_ddl(index)
468  || index->online_log->error != DB_SUCCESS) {
469  return;
470  }
471 
472  dict_table_t* new_table = index->online_log->table;
473  dict_index_t* new_index = dict_table_get_first_index(new_table);
474 
475  ut_ad(dict_index_is_clust(new_index));
476  ut_ad(!dict_index_is_online_ddl(new_index));
477 
478  /* Create the tuple PRIMARY KEY, DB_TRX_ID in the new_table. */
479  if (index->online_log->same_pk) {
480  byte* db_trx_id;
481  dtuple_t* tuple;
482  ut_ad(new_index->n_uniq == index->n_uniq);
483 
484  /* The PRIMARY KEY and DB_TRX_ID are in the first
485  fields of the record. */
486  heap = mem_heap_create(
487  DATA_TRX_ID_LEN
488  + DTUPLE_EST_ALLOC(new_index->n_uniq + 1));
489  old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 1);
490  dict_index_copy_types(tuple, new_index, tuple->n_fields);
491  dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);
492 
493  for (ulint i = 0; i < new_index->n_uniq; i++) {
494  ulint len;
495  const void* field = rec_get_nth_field(
496  rec, offsets, i, &len);
497  dfield_t* dfield = dtuple_get_nth_field(
498  tuple, i);
499  ut_ad(len != UNIV_SQL_NULL);
500  ut_ad(!rec_offs_nth_extern(offsets, i));
501  dfield_set_data(dfield, field, len);
502  }
503 
504  db_trx_id = static_cast<byte*>(
505  mem_heap_alloc(heap, DATA_TRX_ID_LEN));
506  trx_write_trx_id(db_trx_id, trx_id);
507 
508  dfield_set_data(dtuple_get_nth_field(tuple, new_index->n_uniq),
509  db_trx_id, DATA_TRX_ID_LEN);
510  } else {
511  /* The PRIMARY KEY has changed. Translate the tuple. */
512  dfield_t* dfield;
513 
514  old_pk = row_log_table_get_pk(rec, index, offsets, &heap);
515 
516  if (!old_pk) {
517  ut_ad(index->online_log->error != DB_SUCCESS);
518  return;
519  }
520 
521  /* Remove DB_ROLL_PTR. */
/* NOTE(review): the first half of this assertion (source line 522)
is missing from this extraction. */
523  == dict_index_get_n_unique(new_index));
524  ut_ad(dtuple_get_n_fields(old_pk)
525  == dict_index_get_n_unique(new_index) + 2);
526  const_cast<ulint&>(old_pk->n_fields)--;
527 
528  /* Overwrite DB_TRX_ID with the old trx_id. */
529  dfield = dtuple_get_nth_field(old_pk, new_index->n_uniq);
530  ut_ad(dfield_get_type(dfield)->mtype == DATA_SYS);
531  ut_ad(dfield_get_type(dfield)->prtype
532  == (DATA_NOT_NULL | DATA_TRX_ID));
533  ut_ad(dfield_get_len(dfield) == DATA_TRX_ID_LEN);
534  dfield_dup(dfield, heap);
535  trx_write_trx_id(static_cast<byte*>(dfield->data), trx_id);
536  }
537 
538  ut_ad(dtuple_get_n_fields(old_pk) > 1);
539  ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
540  old_pk, old_pk->n_fields - 1)->len);
541  old_pk_size = rec_get_converted_size_temp(
542  new_index, old_pk->fields, old_pk->n_fields,
543  &old_pk_extra_size);
544  ut_ad(old_pk_extra_size < 0x100);
545 
/* 1 byte op + 1 byte old_pk_extra_size + 2 bytes ext_size. */
546  mrec_size = 4 + old_pk_size;
547 
548  /* Log enough prefix of the BLOB unless both the
549  old and new table are in COMPACT or REDUNDANT format,
550  which store the prefix in the clustered index record. */
551  if (purge && rec_offs_any_extern(offsets)
552  && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
553  || dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {
554 
555  /* Build a cache of those off-page column prefixes
556  that are referenced by secondary indexes. It can be
557  that none of the off-page columns are needed. */
558  row_build(ROW_COPY_DATA, index, rec,
559  offsets, NULL, NULL, NULL, &ext, heap);
560  if (ext) {
561  /* Log the row_ext_t, ext->ext and ext->buf */
562  ext_size = ext->n_ext * ext->max_len
563  + sizeof(*ext)
564  + ext->n_ext * sizeof(ulint)
565  + (ext->n_ext - 1) * sizeof ext->len;
566  mrec_size += ext_size;
567  }
568  }
569 
570  if (byte* b = row_log_table_open(index->online_log,
571  mrec_size, &avail_size)) {
572  *b++ = ROW_T_DELETE;
573  *b++ = static_cast<byte>(old_pk_extra_size);
574 
575  /* Log the size of external prefix we saved */
576  mach_write_to_2(b, ext_size);
577  b += 2;
578 
/* NOTE(review): the encoding call (source line 579, presumably
rec_convert_dtuple_to_temp) is missing from this extraction. */
580  b + old_pk_extra_size, new_index,
581  old_pk->fields, old_pk->n_fields);
582 
583  b += old_pk_size;
584 
585  if (ext_size) {
586  ulint cur_ext_size = sizeof(*ext)
587  + (ext->n_ext - 1) * sizeof ext->len;
588 
589  memcpy(b, ext, cur_ext_size);
590  b += cur_ext_size;
591 
592  /* Check if we need to col_map to adjust the column
593  number. If columns were added/removed/reordered,
594  adjust the column number. */
595  if (const ulint* col_map =
596  index->online_log->col_map) {
597  for (ulint i = 0; i < ext->n_ext; i++) {
598  const_cast<ulint&>(ext->ext[i]) =
599  col_map[ext->ext[i]];
600  }
601  }
602 
603  memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
604  b += ext->n_ext * sizeof(*ext->ext);
605 
606  ext_size -= cur_ext_size
607  + ext->n_ext * sizeof(*ext->ext);
608  memcpy(b, ext->buf, ext_size);
609  b += ext_size;
610  }
611 
612  row_log_table_close(
613  index->online_log, b, mrec_size, avail_size);
614  }
615 
616  mem_heap_free(heap);
617 }
618 
619 /******************************************************/
/* Logs a ROW_T_INSERT or ROW_T_UPDATE of a REDUNDANT-format
clustered-index record during online table rebuild.  The record is
first converted to a dtuple (redundant records cannot be copied
byte-wise like compact ones in row_log_table_low). */
621 static
622 void
623 row_log_table_low_redundant(
624 /*========================*/
625  const rec_t* rec,
628  dict_index_t* index,
630  bool insert,
632  const dtuple_t* old_pk,
635  const dict_index_t* new_index)
638 {
639  ulint old_pk_size;
640  ulint old_pk_extra_size;
641  ulint size;
642  ulint extra_size;
643  ulint mrec_size;
644  ulint avail_size;
645  mem_heap_t* heap = NULL;
646  dtuple_t* tuple;
647 
648  ut_ad(!page_is_comp(page_align(rec)));
650  ut_ad(dict_tf_is_valid(index->table->flags));
651  ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */
652  ut_ad(dict_index_is_clust(new_index));
653 
654  heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
655  tuple = dtuple_create(heap, index->n_fields);
656  dict_index_copy_types(tuple, index, index->n_fields);
658 
/* 1-byte-offsets records cannot have externally stored columns;
2-byte-offsets records may, hence the extra dfield_set_ext below. */
659  if (rec_get_1byte_offs_flag(rec)) {
660  for (ulint i = 0; i < index->n_fields; i++) {
661  dfield_t* dfield;
662  ulint len;
663  const void* field;
664 
665  dfield = dtuple_get_nth_field(tuple, i);
666  field = rec_get_nth_field_old(rec, i, &len);
667 
668  dfield_set_data(dfield, field, len);
669  }
670  } else {
671  for (ulint i = 0; i < index->n_fields; i++) {
672  dfield_t* dfield;
673  ulint len;
674  const void* field;
675 
676  dfield = dtuple_get_nth_field(tuple, i);
677  field = rec_get_nth_field_old(rec, i, &len);
678 
679  dfield_set_data(dfield, field, len);
680 
681  if (rec_2_is_field_extern(rec, i)) {
682  dfield_set_ext(dfield);
683  }
684  }
685  }
686 
/* NOTE(review): the call assigning `size` (source line 687,
presumably rec_get_converted_size_temp) is missing from this dump. */
688  index, tuple->fields, tuple->n_fields, &extra_size);
689 
690  mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
691 
/* The old PK is logged only for updates that change the PK. */
692  if (insert || index->online_log->same_pk) {
693  ut_ad(!old_pk);
694  old_pk_extra_size = old_pk_size = 0;
695  } else {
696  ut_ad(old_pk);
697  ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
698  ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
699  old_pk, old_pk->n_fields - 2)->len);
700  ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
701  old_pk, old_pk->n_fields - 1)->len);
702 
703  old_pk_size = rec_get_converted_size_temp(
704  new_index, old_pk->fields, old_pk->n_fields,
705  &old_pk_extra_size);
706  ut_ad(old_pk_extra_size < 0x100);
707  mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
708  }
709 
710  if (byte* b = row_log_table_open(index->online_log,
711  mrec_size, &avail_size)) {
712  *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
713 
714  if (old_pk_size) {
715  *b++ = static_cast<byte>(old_pk_extra_size);
716 
/* NOTE(review): encoding call (source line 717) missing. */
718  b + old_pk_extra_size, new_index,
719  old_pk->fields, old_pk->n_fields);
720  b += old_pk_size;
721  }
722 
723  if (extra_size < 0x80) {
724  *b++ = static_cast<byte>(extra_size);
725  } else {
726  ut_ad(extra_size < 0x8000);
727  *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
728  *b++ = static_cast<byte>(extra_size);
729  }
730 
/* NOTE(review): encoding call (source line 731) missing. */
732  b + extra_size, index, tuple->fields, tuple->n_fields);
733  b += size;
734 
735  row_log_table_close(
736  index->online_log, b, mrec_size, avail_size);
737  }
738 
739  mem_heap_free(heap);
740 }
741 
742 /******************************************************/
/* Logs a ROW_T_INSERT or ROW_T_UPDATE of a clustered-index record
during online table rebuild.  COMPACT-format records are copied
byte-wise (minus the REC_N_NEW_EXTRA_BYTES header); REDUNDANT
records are delegated to row_log_table_low_redundant(). */
744 static __attribute__((nonnull(1,2,3)))
745 void
746 row_log_table_low(
747 /*==============*/
748  const rec_t* rec,
750  dict_index_t* index,
752  const ulint* offsets,
753  bool insert,
754  const dtuple_t* old_pk)
756 {
757  ulint omit_size;
758  ulint old_pk_size;
759  ulint old_pk_extra_size;
760  ulint extra_size;
761  ulint mrec_size;
762  ulint avail_size;
763  const dict_index_t* new_index = dict_table_get_first_index(
764  index->online_log->table);
765  ut_ad(dict_index_is_clust(index));
766  ut_ad(dict_index_is_clust(new_index));
767  ut_ad(!dict_index_is_online_ddl(new_index));
768  ut_ad(rec_offs_validate(rec, index, offsets));
769  ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
770  ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
771 #ifdef UNIV_SYNC_DEBUG
772  ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
773  || rw_lock_own(&index->lock, RW_LOCK_EX));
774 #endif /* UNIV_SYNC_DEBUG */
777  ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
778 
779  if (dict_index_is_corrupted(index)
780  || !dict_index_is_online_ddl(index)
781  || index->online_log->error != DB_SUCCESS) {
782  return;
783  }
784 
785  if (!rec_offs_comp(offsets)) {
786  row_log_table_low_redundant(
787  rec, index, insert, old_pk, new_index);
788  return;
789  }
790 
792  ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
793 
/* The compact-record header is re-derivable on apply; omit it. */
794  omit_size = REC_N_NEW_EXTRA_BYTES;
795 
796  extra_size = rec_offs_extra_size(offsets) - omit_size;
797 
798  mrec_size = ROW_LOG_HEADER_SIZE
799  + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;
800 
/* The old PK is logged only for updates that change the PK. */
801  if (insert || index->online_log->same_pk) {
802  ut_ad(!old_pk);
803  old_pk_extra_size = old_pk_size = 0;
804  } else {
805  ut_ad(old_pk);
806  ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
807  ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
808  old_pk, old_pk->n_fields - 2)->len);
809  ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
810  old_pk, old_pk->n_fields - 1)->len);
811 
812  old_pk_size = rec_get_converted_size_temp(
813  new_index, old_pk->fields, old_pk->n_fields,
814  &old_pk_extra_size);
815  ut_ad(old_pk_extra_size < 0x100);
816  mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
817  }
818 
819  if (byte* b = row_log_table_open(index->online_log,
820  mrec_size, &avail_size)) {
821  *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
822 
823  if (old_pk_size) {
824  *b++ = static_cast<byte>(old_pk_extra_size);
825 
/* NOTE(review): encoding call (source line 826) missing from this
extraction. */
827  b + old_pk_extra_size, new_index,
828  old_pk->fields, old_pk->n_fields);
829  b += old_pk_size;
830  }
831 
832  if (extra_size < 0x80) {
833  *b++ = static_cast<byte>(extra_size);
834  } else {
835  ut_ad(extra_size < 0x8000);
836  *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
837  *b++ = static_cast<byte>(extra_size);
838  }
839 
/* Copy record header (minus omit_size) then record payload. */
840  memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
841  b += extra_size;
842  memcpy(b, rec, rec_offs_data_size(offsets));
843  b += rec_offs_data_size(offsets);
844 
845  row_log_table_close(
846  index->online_log, b, mrec_size, avail_size);
847  }
848 }
849 
850 /******************************************************/
/* Logs an update of a clustered-index record during online table
rebuild.  The function-name line (source line ~855) is missing;
presumably row_log_table_update(), a thin wrapper over
row_log_table_low() with insert=false. */
853 UNIV_INTERN
854 void
856 /*=================*/
857  const rec_t* rec,
859  dict_index_t* index,
861  const ulint* offsets,
862  const dtuple_t* old_pk)
864 {
865  row_log_table_low(rec, index, offsets, false, old_pk);
866 }
867 
/* Looks up, in the OLD table, the column that maps to column number
col_no of the new (rebuilt) table, by inverting col_map.  Returns
NULL when no old column maps there (i.e. col_no is an added
column). */
873 static
874 const dict_col_t*
875 row_log_table_get_pk_old_col(
876 /*=========================*/
877  const dict_table_t* table,
878  const ulint* col_map,
879  ulint col_no)
880 {
881  for (ulint i = 0; i < table->n_cols; i++) {
882  if (col_no == col_map[i]) {
883  return(dict_table_get_nth_col(table, i));
884  }
885  }
886 
887  return(NULL);
888 }
889 
/* Copies one PRIMARY KEY column value from the old clustered-index
record `rec` into `dfield`, allocating from `heap`.  For externally
stored columns, fetches the needed prefix (DB_TOO_BIG_INDEX_COL if
it exceeds max_len).  Returns DB_SUCCESS, DB_INVALID_NULL for a
NULL value, or DB_TOO_BIG_INDEX_COL. */
902 static
903 dberr_t
904 row_log_table_get_pk_col(
905 /*=====================*/
906  const dict_col_t* col,
907  const dict_field_t* ifield,
908  dfield_t* dfield,
909  mem_heap_t* heap,
910  const rec_t* rec,
911  const ulint* offsets,
912  ulint i,
913  ulint zip_size,
914  ulint max_len)
915 {
916  const byte* field;
917  ulint len;
918 
919  ut_ad(ut_is_2pow(zip_size));
920 
921  field = rec_get_nth_field(rec, offsets, i, &len);
922 
923  if (len == UNIV_SQL_NULL) {
924  return(DB_INVALID_NULL);
925  }
926 
927  if (rec_offs_nth_extern(offsets, i)) {
928  ulint field_len = ifield->prefix_len;
929  byte* blob_field;
930 
/* No index prefix: fetch up to max_len + 1 bytes so that an
over-long value can be detected below. */
931  if (!field_len) {
932  field_len = ifield->fixed_len;
933  if (!field_len) {
934  field_len = max_len + 1;
935  }
936  }
937 
938  blob_field = static_cast<byte*>(
939  mem_heap_alloc(heap, field_len));
940 
/* NOTE(review): the call assigning `len` (source line 941,
presumably btr_copy_externally_stored_field_prefix) is missing from
this extraction. */
942  blob_field, field_len, zip_size, field, len);
943  if (len >= max_len + 1) {
944  return(DB_TOO_BIG_INDEX_COL);
945  }
946 
947  dfield_set_data(dfield, blob_field, len);
948  } else {
949  dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
950  }
951 
952  return(DB_SUCCESS);
953 }
954 
955 /******************************************************/
/* Constructs the PRIMARY KEY (+ DB_TRX_ID, DB_ROLL_PTR) tuple of
the new table corresponding to clustered-index record `rec` of the
old table.  Returns NULL if the PK is unchanged (same_pk) or on a
logged error.  The function-name line (source line ~962) is
missing; presumably row_log_table_get_pk().  May allocate *heap. */
960 UNIV_INTERN
961 const dtuple_t*
963 /*=================*/
964  const rec_t* rec,
966  dict_index_t* index,
968  const ulint* offsets,
969  mem_heap_t** heap)
970 {
971  dtuple_t* tuple = NULL;
972  row_log_t* log = index->online_log;
973 
974  ut_ad(dict_index_is_clust(index));
976  ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
977 #ifdef UNIV_SYNC_DEBUG
978  ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
979  || rw_lock_own(&index->lock, RW_LOCK_EX));
980 #endif /* UNIV_SYNC_DEBUG */
981 
982  ut_ad(log);
983  ut_ad(log->table);
984 
985  if (log->same_pk) {
986  /* The PRIMARY KEY columns are unchanged. */
987  return(NULL);
988  }
989 
990  mutex_enter(&log->mutex);
991 
992  /* log->error is protected by log->mutex. */
993  if (log->error == DB_SUCCESS) {
994  dict_table_t* new_table = log->table;
995  dict_index_t* new_index
996  = dict_table_get_first_index(new_table);
997  const ulint new_n_uniq
998  = dict_index_get_n_unique(new_index);
999 
/* Size the heap for the offsets array (if needed), the tuple, and
the minimum sizes of the PK column values. */
1000  if (!*heap) {
1001  ulint size = 0;
1002 
1003  if (!offsets) {
1004  size += (1 + REC_OFFS_HEADER_SIZE
1005  + index->n_fields)
1006  * sizeof *offsets;
1007  }
1008 
1009  for (ulint i = 0; i < new_n_uniq; i++) {
1010  size += dict_col_get_min_size(
1011  dict_index_get_nth_col(new_index, i));
1012  }
1013 
1014  *heap = mem_heap_create(
1015  DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
1016  }
1017 
1018  if (!offsets) {
1019  offsets = rec_get_offsets(rec, index, NULL,
1020  ULINT_UNDEFINED, heap);
1021  }
1022 
/* PK columns + DB_TRX_ID + DB_ROLL_PTR. */
1023  tuple = dtuple_create(*heap, new_n_uniq + 2);
1024  dict_index_copy_types(tuple, new_index, tuple->n_fields);
1025  dtuple_set_n_fields_cmp(tuple, new_n_uniq);
1026 
1027  const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
1028  const ulint zip_size = dict_table_zip_size(index->table);
1029 
1030  for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
1031  dict_field_t* ifield;
1032  dfield_t* dfield;
1033  ulint prtype;
1034  ulint mbminmaxlen;
1035 
1036  ifield = dict_index_get_nth_field(new_index, new_i);
1037  dfield = dtuple_get_nth_field(tuple, new_i);
1038 
1039  const ulint col_no
1040  = dict_field_get_col(ifield)->ind;
1041 
1042  if (const dict_col_t* col
1043  = row_log_table_get_pk_old_col(
1044  index->table, log->col_map, col_no)) {
1045  ulint i = dict_col_get_clust_pos(col, index);
1046 
1047  if (i == ULINT_UNDEFINED) {
1048  ut_ad(0);
1049  log->error = DB_CORRUPTION;
1050  goto err_exit;
1051  }
1052 
1053  log->error = row_log_table_get_pk_col(
1054  col, ifield, dfield, *heap,
1055  rec, offsets, i, zip_size, max_len);
1056 
1057  if (log->error != DB_SUCCESS) {
1058 err_exit:
1059  tuple = NULL;
1060  goto func_exit;
1061  }
1062 
1063  mbminmaxlen = col->mbminmaxlen;
1064  prtype = col->prtype;
1065  } else {
1066  /* No matching column was found in the old
1067  table, so this must be an added column.
1068  Copy the default value. */
1069  ut_ad(log->add_cols);
1070 
1071  dfield_copy(dfield, dtuple_get_nth_field(
1072  log->add_cols, col_no));
1073  mbminmaxlen = dfield->type.mbminmaxlen;
1074  prtype = dfield->type.prtype;
1075  }
1076 
1077  ut_ad(!dfield_is_ext(dfield));
1078  ut_ad(!dfield_is_null(dfield));
1079 
/* Trim the value to the index prefix length in characters. */
1080  if (ifield->prefix_len) {
1081  ulint len = dtype_get_at_most_n_mbchars(
1082  prtype, mbminmaxlen,
1083  ifield->prefix_len,
1084  dfield_get_len(dfield),
1085  static_cast<const char*>(
1086  dfield_get_data(dfield)));
1087 
1088  ut_ad(len <= dfield_get_len(dfield));
1089  dfield_set_len(dfield, len);
1090  }
1091  }
1092 
/* DB_TRX_ID and DB_ROLL_PTR are adjacent in the record; point at
them in place rather than copying. */
1093  const byte* trx_roll = rec
1094  + row_get_trx_id_offset(index, offsets);
1095 
1096  dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
1097  trx_roll, DATA_TRX_ID_LEN);
1098  dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
1099  trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
1100  }
1101 
1102 func_exit:
1103  mutex_exit(&log->mutex);
1104  return(tuple);
1105 }
1106 
1107 /******************************************************/
/* Logs an insert of a clustered-index record during online table
rebuild.  The function-name line (source line ~1112) is missing;
presumably row_log_table_insert(), a thin wrapper over
row_log_table_low() with insert=true. */
1110 UNIV_INTERN
1111 void
1113 /*=================*/
1114  const rec_t* rec,
1116  dict_index_t* index,
1118  const ulint* offsets)
1119 {
1120  row_log_table_low(rec, index, offsets, true, NULL);
1121 }
1122 
1123 /******************************************************/
/* Notes that a BLOB page was freed during online table rebuild, at
the current log position (tail.total), so that later log apply can
avoid dereferencing it.  The function-name line (source line ~1127)
is missing; presumably row_log_table_blob_free().  Caller holds the
index X-lock. */
1125 UNIV_INTERN
1126 void
1128 /*====================*/
1129  dict_index_t* index,
1130  ulint page_no)
1131 {
1132  ut_ad(dict_index_is_clust(index));
1134 #ifdef UNIV_SYNC_DEBUG
1135  ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
1136 #endif /* UNIV_SYNC_DEBUG */
1137  ut_ad(page_no != FIL_NULL);
1138 
1139  if (index->online_log->error != DB_SUCCESS) {
1140  return;
1141  }
1142 
/* Lazily create the page-tracking map on first free. */
1143  page_no_map* blobs = index->online_log->blobs;
1144 
1145  if (!blobs) {
1146  index->online_log->blobs = blobs = new page_no_map();
1147  }
1148 
1149 #ifdef UNIV_DEBUG
1150  const ulonglong log_pos = index->online_log->tail.total;
1151 #else
1152 # define log_pos /* empty */
1153 #endif /* UNIV_DEBUG */
1154 
1155  const page_no_map::value_type v(page_no,
1156  row_log_table_blob_t(log_pos));
1157 
1158  std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
1159 
1160  if (!p.second) {
1161  /* Update the existing mapping. */
1162  ut_ad(p.first->first == page_no);
1163  p.first->second.blob_free(log_pos);
1164  }
1165 #undef log_pos
1166 }
1167 
1168 /******************************************************/
/* Notes that a previously freed BLOB page was reallocated during
online table rebuild.  The function-name line (source line ~1172)
is missing; presumably row_log_table_blob_alloc().  Caller holds
the index X-lock. */
1170 UNIV_INTERN
1171 void
1173 /*=====================*/
1174  dict_index_t* index,
1175  ulint page_no)
1176 {
1177  ut_ad(dict_index_is_clust(index));
1179 #ifdef UNIV_SYNC_DEBUG
1180  ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
1181 #endif /* UNIV_SYNC_DEBUG */
1182  ut_ad(page_no != FIL_NULL);
1183 
1184  if (index->online_log->error != DB_SUCCESS) {
1185  return;
1186  }
1187 
1188  /* Only track allocations if the same page has been freed
1189  earlier. Double allocation without a free is not allowed. */
1190  if (page_no_map* blobs = index->online_log->blobs) {
1191  page_no_map::iterator p = blobs->find(page_no);
1192 
1193  if (p != blobs->end()) {
1194  ut_ad(p->first == page_no);
1195  p->second.blob_alloc(index->online_log->tail.total);
1196  }
1197  }
1198 }
1199 
1200 /******************************************************/
/* Converts a logged record (mrec) of the old table into a row
tuple of the new table when applying the rebuild log.  Returns the
row, or NULL if a referenced BLOB page had been freed (a later log
record will supersede this one) or on *error != DB_SUCCESS (e.g.
DB_INVALID_NULL when a NULL arrives for a newly NOT NULL column). */
1204 static __attribute__((nonnull, warn_unused_result))
1205 const dtuple_t*
1206 row_log_table_apply_convert_mrec(
1207 /*=============================*/
1208  const mrec_t* mrec,
1209  dict_index_t* index,
1210  const ulint* offsets,
1211  const row_log_t* log,
1212  mem_heap_t* heap,
1213  trx_id_t trx_id,
1214  dberr_t* error)
1216 {
1217  dtuple_t* row;
1218 
1219  /* This is based on row_build(). */
1220  if (log->add_cols) {
1221  row = dtuple_copy(log->add_cols, heap);
1222  /* dict_table_copy_types() would set the fields to NULL */
1223  for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
/* NOTE(review): the call these arguments belong to (source line
1224, presumably dict_col_copy_type) is missing from this dump. */
1225  dict_table_get_nth_col(log->table, i),
1226  dfield_get_type(dtuple_get_nth_field(row, i)));
1227  }
1228  } else {
1229  row = dtuple_create(heap, dict_table_get_n_cols(log->table));
1230  dict_table_copy_types(row, log->table);
1231  }
1232 
1233  for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
1234  const dict_field_t* ind_field
1235  = dict_index_get_nth_field(index, i);
1236 
1237  if (ind_field->prefix_len) {
1238  /* Column prefixes can only occur in key
1239  fields, which cannot be stored externally. For
1240  a column prefix, there should also be the full
1241  field in the clustered index tuple. The row
1242  tuple comprises full fields, not prefixes. */
1243  ut_ad(!rec_offs_nth_extern(offsets, i));
1244  continue;
1245  }
1246 
1247  const dict_col_t* col
1248  = dict_field_get_col(ind_field);
1249  ulint col_no
1250  = log->col_map[dict_col_get_no(col)];
1251 
1252  if (col_no == ULINT_UNDEFINED) {
1253  /* dropped column */
1254  continue;
1255  }
1256 
1257  dfield_t* dfield
1258  = dtuple_get_nth_field(row, col_no);
1259  ulint len;
1260  const byte* data= NULL;
1261 
1262  if (rec_offs_nth_extern(offsets, i)) {
1263  ut_ad(rec_offs_any_extern(offsets));
/* X-latch the index to get a consistent view of log->blobs while
checking whether the BLOB page is still allocated. */
1264  rw_lock_x_lock(dict_index_get_lock(index));
1265 
1266  if (const page_no_map* blobs = log->blobs) {
1267  data = rec_get_nth_field(
1268  mrec, offsets, i, &len);
1270 
/* The BLOB pointer's page number sits at a fixed offset from the
end of the 20-byte external field reference. */
1271  ulint page_no = mach_read_from_4(
1272  data + len - (BTR_EXTERN_FIELD_REF_SIZE
1273  - BTR_EXTERN_PAGE_NO));
1274  page_no_map::const_iterator p = blobs->find(
1275  page_no);
1276  if (p != blobs->end()
1277  && p->second.is_freed(log->head.total)) {
1278  /* This BLOB has been freed.
1279  We must not access the row. */
1280  row = NULL;
1281  }
1282  }
1283 
1284  if (row) {
/* NOTE(review): the call assigning `data` (source line 1285,
presumably btr_rec_copy_externally_stored_field) is missing from
this extraction. */
1286  mrec, offsets,
1287  dict_table_zip_size(index->table),
1288  i, &len, heap);
1289  ut_a(data);
1290  }
1291 
1292  rw_lock_x_unlock(dict_index_get_lock(index));
1293 
1294  if (!row) {
1295  goto func_exit;
1296  }
1297  } else {
1298  data = rec_get_nth_field(mrec, offsets, i, &len);
1299  }
1300 
1301  dfield_set_data(dfield, data, len);
1302 
1303  /* See if any columns were changed to NULL or NOT NULL. */
1304  const dict_col_t* new_col
1305  = dict_table_get_nth_col(log->table, col_no);
1306  ut_ad(new_col->mtype == col->mtype);
1307 
1308  /* Assert that prtype matches except for nullability. */
1309  ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
1310  ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
1311  & ~DATA_NOT_NULL));
1312 
1313  if (new_col->prtype == col->prtype) {
1314  continue;
1315  }
1316 
1317  if ((new_col->prtype & DATA_NOT_NULL)
1318  && dfield_is_null(dfield)) {
1319  /* We got a NULL value for a NOT NULL column. */
1320  *error = DB_INVALID_NULL;
1321  return(NULL);
1322  }
1323 
1324  /* Adjust the DATA_NOT_NULL flag in the parsed row. */
1325  dfield_get_type(dfield)->prtype = new_col->prtype;
1326 
1327  ut_ad(dict_col_type_assert_equal(new_col,
1328  dfield_get_type(dfield)));
1329  }
1330 
1331 func_exit:
1332  *error = DB_SUCCESS;
1333  return(row);
1334 }
1335 
1336 /******************************************************/
/* NOTE(review): this listing is a Doxygen-HTML extraction. The embedded
numbers (1336, 1339, ...) are the ORIGINAL source line numbers; gaps in
that numbering (e.g. 1337-1338, 1344-1345, 1378, 1401 below) mark lines
lost during extraction -- typically Doxygen hyperlink lines such as the
function's doc comment, parameter annotations, and function-call lines.
Only comments are added here; all surviving code tokens are unchanged. */
/* Purpose: applies one logged insert to the table being rebuilt by
online ALTER TABLE.  The converted row is inserted into the clustered
index of the new table first; on success the loop below inserts the
corresponding entry into every subsequent index, skipping FULLTEXT
(DICT_FTS) indexes.  DB_SUCCESS_LOCKED_REC from the clustered-index
insert means the row had already been copied and is reported as
DB_SUCCESS. */
1339 static __attribute__((nonnull, warn_unused_result))
1340 dberr_t
1341 row_log_table_apply_insert_low(
1342 /*===========================*/
1343  que_thr_t* thr,
1346  const dtuple_t* row,
1348  trx_id_t trx_id,
1349  mem_heap_t* heap,
1350  row_merge_dup_t* dup)
1352 {
1353  dberr_t error;
1354  dtuple_t* entry;
1355  const row_log_t*log = dup->index->online_log;
1356  dict_index_t* index = dict_table_get_first_index(log->table);
1357 
1358  ut_ad(dtuple_validate(row));
1359  ut_ad(trx_id);
1360 
1361 #ifdef ROW_LOG_APPLY_PRINT
1362  if (row_log_apply_print) {
1363  fprintf(stderr, "table apply insert "
1364  IB_ID_FMT " " IB_ID_FMT "\n",
1365  index->table->id, index->id);
1366  dtuple_print(stderr, row);
1367  }
1368 #endif /* ROW_LOG_APPLY_PRINT */
1369 
 /* NOTE(review): original lines 1372-1373 (two flag constants between
 BTR_CREATE_FLAG and BTR_KEEP_SYS_FLAG) were lost in extraction. */
1370  static const ulint flags
1371  = (BTR_CREATE_FLAG
1374  | BTR_KEEP_SYS_FLAG);
1375 
1376  entry = row_build_index_entry(row, NULL, index, heap);
1377 
 /* NOTE(review): original line 1378 -- the clustered-index insert call
 whose result is assigned to `error` -- was lost in extraction; only
 its argument list survives below. */
1379  flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr);
1380 
1381  switch (error) {
1382  case DB_SUCCESS:
1383  break;
1384  case DB_SUCCESS_LOCKED_REC:
1385  /* The row had already been copied to the table. */
1386  return(DB_SUCCESS);
1387  default:
1388  return(error);
1389  }
1390 
1391  do {
1392  if (!(index = dict_table_get_next_index(index))) {
1393  break;
1394  }
1395 
1396  if (index->type & DICT_FTS) {
 /* FULLTEXT indexes are not maintained through this path. */
1397  continue;
1398  }
1399 
1400  entry = row_build_index_entry(row, NULL, index, heap);
 /* NOTE(review): original line 1401 -- the secondary-index insert
 call assigned to `error` -- was lost in extraction. */
1402  flags, BTR_MODIFY_TREE,
1403  index, offsets_heap, heap, entry, trx_id, thr);
1404  } while (error == DB_SUCCESS);
1405 
1406  return(error);
1407 }
1408 
1409 /******************************************************/
/* NOTE(review): Doxygen-HTML extraction; embedded numbers are original
source lines, and gaps (1410-1411, 1420, 1423) mark lines lost in
extraction (doc comment and parameter annotations).  Code unchanged. */
/* Purpose: converts one logged ROW_T_INSERT record (`mrec`) into a row
in the new table format via row_log_table_apply_convert_mrec(), then
hands it to row_log_table_apply_insert_low().  A NULL `row` with
DB_SUCCESS means the row need not (or cannot) be applied -- e.g. its
BLOBs were already freed -- and is silently skipped.  On failure the
offending row is reported to MySQL via innobase_row_to_mysql(). */
1412 static __attribute__((nonnull, warn_unused_result))
1413 dberr_t
1414 row_log_table_apply_insert(
1415 /*=======================*/
1416  que_thr_t* thr,
1417  const mrec_t* mrec,
1418  const ulint* offsets,
1419  mem_heap_t* offsets_heap,
1421  mem_heap_t* heap,
1422  row_merge_dup_t* dup,
1424  trx_id_t trx_id)
1425 {
1426  const row_log_t*log = dup->index->online_log;
1427  dberr_t error;
1428  const dtuple_t* row = row_log_table_apply_convert_mrec(
1429  mrec, dup->index, offsets, log, heap, trx_id, &error);
1430 
1431  ut_ad(error == DB_SUCCESS || !row);
1432  /* Handling of duplicate key error requires storing
1433  of offending key in a record buffer. */
1434  ut_ad(error != DB_DUPLICATE_KEY);
1435 
1436  if (error != DB_SUCCESS)
1437  return(error);
1438 
1439  if (row) {
1440  error = row_log_table_apply_insert_low(
1441  thr, row, trx_id, offsets_heap, heap, dup);
1442  if (error != DB_SUCCESS) {
1443  /* Report the erroneous row using the new
1444  version of the table. */
1445  innobase_row_to_mysql(dup->table, log->table, row);
1446  }
1447  }
1448  return(error);
1449 }
1450 
1451 /******************************************************/
/* NOTE(review): Doxygen-HTML extraction; embedded numbers are original
source lines, and gaps (1452-1453, 1459, 1462, 1465, 1518) mark lines
lost in extraction.  Code unchanged; comments only. */
/* Purpose: deletes the clustered-index record that `pcur` is positioned
on (in the table being rebuilt), then removes the matching entry from
every subsequent non-FTS index.  If secondary indexes exist, a row
template is first built from the record so that their entries can be
constructed; `save_ext` supplies externally-stored-column prefixes when
the caller already reconstructed them from the log.  `mtr` must hold the
clustered-index leaf latched; it is committed inside this function. */
1454 static __attribute__((nonnull(1, 2, 4, 5), warn_unused_result))
1455 dberr_t
1456 row_log_table_apply_delete_low(
1457 /*===========================*/
1458  btr_pcur_t* pcur,
1460  const ulint* offsets,
1461  const row_ext_t* save_ext,
1463  mem_heap_t* heap,
1464  mtr_t* mtr)
1466 {
1467  dberr_t error;
1468  row_ext_t* ext;
1469  dtuple_t* row;
1470  dict_index_t* index = btr_pcur_get_btr_cur(pcur)->index;
1471 
1472  ut_ad(dict_index_is_clust(index));
1473 
1474 #ifdef ROW_LOG_APPLY_PRINT
1475  if (row_log_apply_print) {
1476  fprintf(stderr, "table apply delete "
1477  IB_ID_FMT " " IB_ID_FMT "\n",
1478  index->table->id, index->id);
1479  rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets);
1480  }
1481 #endif /* ROW_LOG_APPLY_PRINT */
1482  if (dict_table_get_next_index(index)) {
1483  /* Build a row template for purging secondary index entries. */
1484  row = row_build(
1485  ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
1486  offsets, NULL, NULL, NULL,
1487  save_ext ? NULL : &ext, heap);
1488  if (!save_ext) {
1489  save_ext = ext;
1490  }
1491  } else {
1492  row = NULL;
1493  }
1494 
 /* Remove the clustered index record first. */
1495  btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
1496  BTR_CREATE_FLAG, RB_NONE, mtr);
1497  mtr_commit(mtr);
1498 
1499  if (error != DB_SUCCESS) {
1500  return(error);
1501  }
1502 
1503  while ((index = dict_table_get_next_index(index)) != NULL) {
1504  if (index->type & DICT_FTS) {
1505  continue;
1506  }
1507 
1508  const dtuple_t* entry = row_build_index_entry(
1509  row, save_ext, index, heap);
1510  mtr_start(mtr);
1511  btr_pcur_open(index, entry, PAGE_CUR_LE,
1512  BTR_MODIFY_TREE, pcur, mtr);
1513 #ifdef UNIV_DEBUG
 /* Debug-only sanity check on the cursor outcome.  NOTE(review):
 original line 1518 (presumably one more case label) was lost in
 extraction; the buffering cases fall through to ut_ad(0). */
1514  switch (btr_pcur_get_btr_cur(pcur)->flag) {
1515  case BTR_CUR_DELETE_REF:
1516  case BTR_CUR_DEL_MARK_IBUF:
1517  case BTR_CUR_DELETE_IBUF:
1519  /* We did not request buffering. */
1520  break;
1521  case BTR_CUR_HASH:
1522  case BTR_CUR_HASH_FAIL:
1523  case BTR_CUR_BINARY:
1524  goto flag_ok;
1525  }
1526  ut_ad(0);
1527 flag_ok:
1528 #endif /* UNIV_DEBUG */
1529 
1530  if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
1531  || btr_pcur_get_low_match(pcur) < index->n_uniq) {
1532  /* All secondary index entries should be
1533  found, because new_table is being modified by
1534  this thread only, and all indexes should be
1535  updated in sync. */
1536  mtr_commit(mtr);
1537  return(DB_INDEX_CORRUPT);
1538  }
1539 
1540  btr_cur_pessimistic_delete(&error, FALSE,
1541  btr_pcur_get_btr_cur(pcur),
1542  BTR_CREATE_FLAG, RB_NONE, mtr);
1543  mtr_commit(mtr);
1544  }
1545 
1546  return(error);
1547 }
1548 
1549 /******************************************************/
/* NOTE(review): Doxygen-HTML extraction; embedded numbers are original
source lines, and gaps (1550-1551, 1558-1559, 1563, 1567, 1601) mark
lines lost in extraction (doc comment, parameter annotations, and
probably a switch case label at 1601).  Code unchanged; comments only. */
/* Purpose: applies one logged ROW_T_DELETE record.  Builds a search
tuple from the logged PRIMARY KEY (+ DB_TRX_ID) fields, positions a
persistent cursor on the clustered index of the rebuilt table, and --
only if the record's DB_TRX_ID matches the buffered one, i.e. the row
version is the same one that was logged -- delegates the actual removal
to row_log_table_apply_delete_low().  A record that is not found, or
whose DB_TRX_ID differs, is treated as already handled (DB_SUCCESS). */
1552 static __attribute__((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
1553 dberr_t
1554 row_log_table_apply_delete(
1555 /*=======================*/
1556  que_thr_t* thr,
1557  ulint trx_id_col,
1560  const mrec_t* mrec,
1561  const ulint* moffsets,
1562  mem_heap_t* offsets_heap,
1564  mem_heap_t* heap,
1565  dict_table_t* new_table,
1566  const row_ext_t* save_ext)
1568 {
1569  dict_index_t* index = dict_table_get_first_index(new_table);
1570  dtuple_t* old_pk;
1571  mtr_t mtr;
1572  btr_pcur_t pcur;
1573  ulint* offsets;
1574 
1575  ut_ad(rec_offs_n_fields(moffsets)
1576  == dict_index_get_n_unique(index) + 1);
1577  ut_ad(!rec_offs_any_extern(moffsets));
1578 
1579  /* Convert the row to a search tuple. */
1580  old_pk = dtuple_create(heap, index->n_uniq + 1);
1581  dict_index_copy_types(old_pk, index, old_pk->n_fields);
1582  dtuple_set_n_fields_cmp(old_pk, index->n_uniq);
1583 
1584  for (ulint i = 0; i <= index->n_uniq; i++) {
1585  ulint len;
1586  const void* field;
1587  field = rec_get_nth_field(mrec, moffsets, i, &len);
1588  ut_ad(len != UNIV_SQL_NULL);
1589  dfield_set_data(dtuple_get_nth_field(old_pk, i),
1590  field, len);
1591  }
1592 
1593  mtr_start(&mtr);
1594  btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1595  BTR_MODIFY_TREE, &pcur, &mtr);
1596 #ifdef UNIV_DEBUG
 /* Debug-only sanity check: the cursor must not have been diverted
 to insert/delete buffering. */
1597  switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1598  case BTR_CUR_DELETE_REF:
1599  case BTR_CUR_DEL_MARK_IBUF:
1600  case BTR_CUR_DELETE_IBUF:
1602  /* We did not request buffering. */
1603  break;
1604  case BTR_CUR_HASH:
1605  case BTR_CUR_HASH_FAIL:
1606  case BTR_CUR_BINARY:
1607  goto flag_ok;
1608  }
1609  ut_ad(0);
1610 flag_ok:
1611 #endif /* UNIV_DEBUG */
1612 
1613  if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1614  || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1615 all_done:
1616  mtr_commit(&mtr);
1617  /* The record was not found. All done. */
1618  return(DB_SUCCESS);
1619  }
1620 
1621  offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
1622  ULINT_UNDEFINED, &offsets_heap);
1623 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
1624  ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
1625 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
1626 
1627  /* Only remove the record if DB_TRX_ID matches what was
1628  buffered. */
1629 
1630  {
1631  ulint len;
1632  const void* mrec_trx_id
1633  = rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
1634  ut_ad(len == DATA_TRX_ID_LEN);
1635  const void* rec_trx_id
1636  = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
1637  trx_id_col, &len);
1638  ut_ad(len == DATA_TRX_ID_LEN);
1639  if (memcmp(mrec_trx_id, rec_trx_id, DATA_TRX_ID_LEN)) {
 /* Different row version: a later log record covers it. */
1640  goto all_done;
1641  }
1642  }
1643 
1644  return(row_log_table_apply_delete_low(&pcur, offsets, save_ext,
1645  heap, &mtr));
1646 }
1647 
1648 /******************************************************/
/* NOTE(review): Doxygen-HTML extraction; embedded numbers are original
source lines.  Gaps in that numbering mark lines lost in extraction --
notably the doc comment (1649-1650), parameter annotations, and several
function-call lines: 1682 (first half of a ut_ad), 1740 (the
row_build_index_entry call defining `entry`), 1845-1849 (the
btr_cur_pessimistic_update call assigned to `error`), 1856 (the
BLOB-store call), 1889 (the secondary-index search call), 1896 (the
btr_cur_pessimistic_delete call), and 1907-1909 (the secondary-index
insert call).  Code unchanged; comments only. */
/* Purpose: applies one logged ROW_T_UPDATE record.  Converts the logged
new row image, looks up the old row by `old_pk` in the rebuilt table,
and then either (a) inserts the row if it was not found, (b) performs a
delete+insert when externally stored columns or a PRIMARY KEY change
are involved, or (c) updates the clustered index record in place and
fixes up each affected non-FTS secondary index. */
1651 static __attribute__((nonnull, warn_unused_result))
1652 dberr_t
1653 row_log_table_apply_update(
1654 /*=======================*/
1655  que_thr_t* thr,
1656  ulint trx_id_col,
1659  ulint new_trx_id_col,
1662  const mrec_t* mrec,
1663  const ulint* offsets,
1664  mem_heap_t* offsets_heap,
1666  mem_heap_t* heap,
1667  row_merge_dup_t* dup,
1669  trx_id_t trx_id,
1670  const dtuple_t* old_pk)
1674 {
1675  const row_log_t*log = dup->index->online_log;
1676  const dtuple_t* row;
1677  dict_index_t* index = dict_table_get_first_index(log->table);
1678  mtr_t mtr;
1679  btr_pcur_t pcur;
1680  dberr_t error;
1681 
 /* NOTE(review): first half of this assertion (original line 1682)
 was lost in extraction. */
1683  == dict_index_get_n_unique(index));
1684  ut_ad(dtuple_get_n_fields(old_pk)
1685  == dict_index_get_n_unique(index)
1686  + (dup->index->online_log->same_pk ? 0 : 2));
1687 
1688  row = row_log_table_apply_convert_mrec(
1689  mrec, dup->index, offsets, log, heap, trx_id, &error);
1690 
1691  ut_ad(error == DB_SUCCESS || !row);
1692  /* Handling of duplicate key error requires storing
1693  of offending key in a record buffer. */
1694  ut_ad(error != DB_DUPLICATE_KEY);
1695 
1696  if (!row) {
1697  return(error);
1698  }
1699 
1700  mtr_start(&mtr);
1701  btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1702  BTR_MODIFY_TREE, &pcur, &mtr);
1703 #ifdef UNIV_DEBUG
1704  switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1705  case BTR_CUR_DELETE_REF:
1706  case BTR_CUR_DEL_MARK_IBUF:
1707  case BTR_CUR_DELETE_IBUF:
1709  ut_ad(0);/* We did not request buffering. */
1710  case BTR_CUR_HASH:
1711  case BTR_CUR_HASH_FAIL:
1712  case BTR_CUR_BINARY:
1713  break;
1714  }
1715 #endif /* UNIV_DEBUG */
1716 
1717  if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1718  || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1719  mtr_commit(&mtr);
1720 insert:
1721  ut_ad(mtr.state == MTR_COMMITTED);
1722  /* The row was not found. Insert it. */
1723  error = row_log_table_apply_insert_low(
1724  thr, row, trx_id, offsets_heap, heap, dup);
1725  if (error != DB_SUCCESS) {
1726 err_exit:
1727  /* Report the erroneous row using the new
1728  version of the table. */
1729  innobase_row_to_mysql(dup->table, log->table, row);
1730  }
1731 
1732  return(error);
1733  }
1734 
1735  /* Update the record. */
1736  ulint* cur_offsets = rec_get_offsets(
1737  btr_pcur_get_rec(&pcur),
1738  index, NULL, ULINT_UNDEFINED, &offsets_heap);
1739 
 /* NOTE(review): original line 1740 (the declaration of `entry` via
 row_build_index_entry) was lost in extraction; only its trailing
 argument list survives below. */
1741  row, NULL, index, heap);
1742  const upd_t* update = row_upd_build_difference_binary(
1743  index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
1744  false, NULL, heap);
1745 
1746  error = DB_SUCCESS;
1747 
1748  if (!update->n_fields) {
1749  /* Nothing to do. */
1750  goto func_exit;
1751  }
1752 
1753  if (rec_offs_any_extern(cur_offsets)) {
1754  /* If the record contains any externally stored
1755  columns, perform the update by delete and insert,
1756  because we will not write any undo log that would
1757  allow purge to free any orphaned externally stored
1758  columns. */
1759 delete_insert:
1760  error = row_log_table_apply_delete_low(
1761  &pcur, cur_offsets, NULL, heap, &mtr);
1762  ut_ad(mtr.state == MTR_COMMITTED);
1763 
1764  if (error != DB_SUCCESS) {
1765  goto err_exit;
1766  }
1767 
1768  goto insert;
1769  }
1770 
1771  if (upd_get_nth_field(update, 0)->field_no < new_trx_id_col) {
1772  if (dup->index->online_log->same_pk) {
1773  /* The ROW_T_UPDATE log record should only be
1774  written when the PRIMARY KEY fields of the
1775  record did not change in the old table. We
1776  can only get a change of PRIMARY KEY columns
1777  in the rebuilt table if the PRIMARY KEY was
1778  redefined (!same_pk). */
1779  ut_ad(0);
1780  error = DB_CORRUPTION;
1781  goto func_exit;
1782  }
1783 
1784  /* The PRIMARY KEY columns have changed.
1785  Delete the record with the old PRIMARY KEY value,
1786  provided that it carries the same
1787  DB_TRX_ID,DB_ROLL_PTR. Then, insert the new row. */
1788  ulint len;
1789  const byte* cur_trx_roll = rec_get_nth_field(
1790  mrec, offsets, trx_id_col, &len);
1791  ut_ad(len == DATA_TRX_ID_LEN);
1792  const dfield_t* new_trx_roll = dtuple_get_nth_field(
1793  old_pk, new_trx_id_col);
1794  /* We assume that DB_TRX_ID,DB_ROLL_PTR are stored
1795  in one contiguous block. */
1796  ut_ad(rec_get_nth_field(mrec, offsets, trx_id_col + 1, &len)
1797  == cur_trx_roll + DATA_TRX_ID_LEN);
1798  ut_ad(len == DATA_ROLL_PTR_LEN);
1799  ut_ad(new_trx_roll->len == DATA_TRX_ID_LEN);
1800  ut_ad(dtuple_get_nth_field(old_pk, new_trx_id_col + 1)
1801  -> len == DATA_ROLL_PTR_LEN);
1802  ut_ad(static_cast<const byte*>(
1803  dtuple_get_nth_field(old_pk, new_trx_id_col + 1)
1804  ->data)
1805  == static_cast<const byte*>(new_trx_roll->data)
1806  + DATA_TRX_ID_LEN);
1807 
1808  if (!memcmp(cur_trx_roll, new_trx_roll->data,
1809  DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
1810  /* The old row exists. Remove it. */
1811  goto delete_insert;
1812  }
1813 
1814  /* Unless we called row_log_table_apply_delete_low(),
1815  this will likely cause a duplicate key error. */
1816  mtr_commit(&mtr);
1817  goto insert;
1818  }
1819 
1820  dtuple_t* old_row;
1821  row_ext_t* old_ext;
1822 
1823  if (dict_table_get_next_index(index)) {
1824  /* Construct the row corresponding to the old value of
1825  the record. */
1826  old_row = row_build(
1827  ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
1828  cur_offsets, NULL, NULL, NULL, &old_ext, heap);
1829  ut_ad(old_row);
1830 #ifdef ROW_LOG_APPLY_PRINT
1831  if (row_log_apply_print) {
1832  fprintf(stderr, "table apply update "
1833  IB_ID_FMT " " IB_ID_FMT "\n",
1834  index->table->id, index->id);
1835  dtuple_print(stderr, old_row);
1836  dtuple_print(stderr, row);
1837  }
1838 #endif /* ROW_LOG_APPLY_PRINT */
1839  } else {
1840  old_row = NULL;
1841  old_ext = NULL;
1842  }
1843 
1844  big_rec_t* big_rec;
1845 
 /* NOTE(review): original lines 1845-1849 (the pessimistic-update
 call assigned to `error`) were lost in extraction; only its trailing
 arguments survive below. */
1850  btr_pcur_get_btr_cur(&pcur),
1851  &cur_offsets, &offsets_heap, heap, &big_rec,
1852  update, 0, thr, 0, &mtr);
1853 
1854  if (big_rec) {
1855  if (error == DB_SUCCESS) {
 /* NOTE(review): original line 1856 (the call storing the
 big_rec externally) was lost in extraction. */
1857  index, btr_pcur_get_block(&pcur),
1858  btr_pcur_get_rec(&pcur), cur_offsets,
1859  big_rec, &mtr, BTR_STORE_UPDATE);
1860  }
1861 
1862  dtuple_big_rec_free(big_rec);
1863  }
1864 
 /* Propagate the update to each affected secondary index:
 delete the old entry, then insert the new one. */
1865  while ((index = dict_table_get_next_index(index)) != NULL) {
1866  if (error != DB_SUCCESS) {
1867  break;
1868  }
1869 
1870  if (index->type & DICT_FTS) {
1871  continue;
1872  }
1873 
1874  if (!row_upd_changes_ord_field_binary(
1875  index, update, thr, old_row, NULL)) {
1876  continue;
1877  }
1878 
1879  mtr_commit(&mtr);
1880 
1881  entry = row_build_index_entry(old_row, old_ext, index, heap);
1882  if (!entry) {
1883  ut_ad(0);
1884  return(DB_CORRUPTION);
1885  }
1886 
1887  mtr_start(&mtr);
1888 
 /* NOTE(review): original line 1889 (the search/positioning call
 whose failure is tested here) was lost in extraction. */
1890  index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
1891  ut_ad(0);
1892  error = DB_CORRUPTION;
1893  break;
1894  }
1895 
 /* NOTE(review): original line 1896 (the delete call) was lost
 in extraction; only its arguments survive below. */
1897  &error, FALSE, btr_pcur_get_btr_cur(&pcur),
1898  BTR_CREATE_FLAG, RB_NONE, &mtr);
1899 
1900  if (error != DB_SUCCESS) {
1901  break;
1902  }
1903 
1904  mtr_commit(&mtr);
1905 
1906  entry = row_build_index_entry(row, NULL, index, heap);
 /* NOTE(review): original lines 1907-1909 (the secondary-index
 insert call assigned to `error`) were lost in extraction. */
1910  BTR_MODIFY_TREE, index, offsets_heap, heap,
1911  entry, trx_id, thr);
1912 
1913  mtr_start(&mtr);
1914  }
1915 
1916 func_exit:
1917  mtr_commit(&mtr);
1918  if (error != DB_SUCCESS) {
1919  goto err_exit;
1920  }
1921 
1922  return(error);
1923 }
1924 
1925 /******************************************************/
/* NOTE(review): Doxygen-HTML extraction; embedded numbers are original
source lines.  Gaps (1926-1928, parameter annotations, 2103, 2243, ...)
mark lines lost in extraction.  Code unchanged; comments only. */
/* Purpose: parses and applies ONE log record from the table-rebuild log
buffer [mrec, mrec_end).  Dispatches on the one-byte operation code
(ROW_T_INSERT / ROW_T_DELETE / ROW_T_UPDATE), decodes the variable-size
header and payload for that operation, advances log->head.total, and
calls the matching row_log_table_apply_* function.  Returns a pointer
past the consumed record, or NULL when the record is truncated (the
caller will retry with a reassembled buffer) or corrupted (*error is
then set to DB_CORRUPTION by the default case). */
1929 static __attribute__((nonnull, warn_unused_result))
1930 const mrec_t*
1931 row_log_table_apply_op(
1932 /*===================*/
1933  que_thr_t* thr,
1934  ulint trx_id_col,
1936  ulint new_trx_id_col,
1938  row_merge_dup_t* dup,
1940  dberr_t* error,
1942  mem_heap_t* offsets_heap,
1944  mem_heap_t* heap,
1945  const mrec_t* mrec,
1946  const mrec_t* mrec_end,
1947  ulint* offsets)
1949 {
1950  row_log_t* log = dup->index->online_log;
1951  dict_index_t* new_index = dict_table_get_first_index(log->table);
1952  ulint extra_size;
1953  const mrec_t* next_mrec;
1954  dtuple_t* old_pk;
1955  row_ext_t* ext;
1956  ulint ext_size;
1957 
1958  ut_ad(dict_index_is_clust(dup->index));
1959  ut_ad(dup->index->table != log->table);
1960  ut_ad(log->head.total <= log->tail.total);
1961 
1962  *error = DB_SUCCESS;
1963 
1964  /* 3 = 1 (op type) + 1 (ext_size) + at least 1 byte payload */
1965  if (mrec + 3 >= mrec_end) {
1966  return(NULL);
1967  }
1968 
1969  const mrec_t* const mrec_start = mrec;
1970 
1971  switch (*mrec++) {
1972  default:
1973  ut_ad(0);
1974  *error = DB_CORRUPTION;
1975  return(NULL);
1976  case ROW_T_INSERT:
 /* Format: extra_size (1-2 bytes) followed by a temp-format
 record of all fields of dup->index. */
1977  extra_size = *mrec++;
1978 
1979  if (extra_size >= 0x80) {
1980  /* Read another byte of extra_size. */
1981 
1982  extra_size = (extra_size & 0x7f) << 8;
1983  extra_size |= *mrec++;
1984  }
1985 
1986  mrec += extra_size;
1987 
1988  if (mrec > mrec_end) {
1989  return(NULL);
1990  }
1991 
1992  rec_offs_set_n_fields(offsets, dup->index->n_fields);
1993  rec_init_offsets_temp(mrec, dup->index, offsets);
1994 
1995  next_mrec = mrec + rec_offs_data_size(offsets);
1996 
1997  if (next_mrec > mrec_end) {
1998  return(NULL);
1999  } else {
2000  log->head.total += next_mrec - mrec_start;
2001 
2002  ulint len;
2003  const byte* db_trx_id
2004  = rec_get_nth_field(
2005  mrec, offsets, trx_id_col, &len);
2006  ut_ad(len == DATA_TRX_ID_LEN);
2007  *error = row_log_table_apply_insert(
2008  thr, mrec, offsets, offsets_heap,
2009  heap, dup, trx_read_trx_id(db_trx_id));
2010  }
2011  break;
2012 
2013  case ROW_T_DELETE:
 /* Format: extra_size (1 byte), ext_size (2 bytes), then the
 PRIMARY KEY prefix record, then ext_size bytes of serialized
 row_ext_t describing externally stored column prefixes. */
2014  /* 1 (extra_size) + 2 (ext_size) + at least 1 (payload) */
2015  if (mrec + 4 >= mrec_end) {
2016  return(NULL);
2017  }
2018 
2019  extra_size = *mrec++;
2020  ext_size = mach_read_from_2(mrec);
2021  mrec += 2;
2022  ut_ad(mrec < mrec_end);
2023 
2024  /* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
2025  For fixed-length PRIMARY key columns, it is 0. */
2026  mrec += extra_size;
2027 
2028  rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
2029  rec_init_offsets_temp(mrec, new_index, offsets);
2030  next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
2031  if (next_mrec > mrec_end) {
2032  return(NULL);
2033  }
2034 
2035  log->head.total += next_mrec - mrec_start;
2036 
2037  /* If there are external fields, retrieve those logged
2038  prefix info and reconstruct the row_ext_t */
2039  if (ext_size) {
2040  /* We use memcpy to avoid unaligned
2041  access on some non-x86 platforms.*/
2042  ext = static_cast<row_ext_t*>(
2043  mem_heap_dup(heap,
2044  mrec + rec_offs_data_size(offsets),
2045  ext_size));
2046 
 /* Fix up the internal pointers of the copied
 row_ext_t, which point into the copy itself. */
2047  byte* ext_start = reinterpret_cast<byte*>(ext);
2048 
2049  ulint ext_len = sizeof(*ext)
2050  + (ext->n_ext - 1) * sizeof ext->len;
2051 
2052  ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
2053  ext_len += ext->n_ext * sizeof(*ext->ext);
2054 
2055  ext->buf = static_cast<byte*>(ext_start + ext_len);
2056  } else {
2057  ext = NULL;
2058  }
2059 
2060  *error = row_log_table_apply_delete(
2061  thr, new_trx_id_col,
2062  mrec, offsets, offsets_heap, heap,
2063  log->table, ext);
2064  break;
2065 
2066  case ROW_T_UPDATE:
2067  /* Logically, the log entry consists of the
2068  (PRIMARY KEY,DB_TRX_ID) of the old value (converted
2069  to the new primary key definition) followed by
2070  the new value in the old table definition. If the
2071  definition of the columns belonging to PRIMARY KEY
2072  is not changed, the log will only contain
2073  DB_TRX_ID,new_row. */
2074 
2075  if (dup->index->online_log->same_pk) {
2076  ut_ad(new_index->n_uniq == dup->index->n_uniq);
2077 
2078  extra_size = *mrec++;
2079 
2080  if (extra_size >= 0x80) {
2081  /* Read another byte of extra_size. */
2082 
2083  extra_size = (extra_size & 0x7f) << 8;
2084  extra_size |= *mrec++;
2085  }
2086 
2087  mrec += extra_size;
2088 
2089  if (mrec > mrec_end) {
2090  return(NULL);
2091  }
2092 
2093  rec_offs_set_n_fields(offsets, dup->index->n_fields);
2094  rec_init_offsets_temp(mrec, dup->index, offsets);
2095 
2096  next_mrec = mrec + rec_offs_data_size(offsets);
2097 
2098  if (next_mrec > mrec_end) {
2099  return(NULL);
2100  }
2101 
2102  old_pk = dtuple_create(heap, new_index->n_uniq);
 /* NOTE(review): original line 2103 (presumably the
 dict_index_copy_types call) was lost in extraction. */
2104  old_pk, new_index, old_pk->n_fields);
2105 
2106  /* Copy the PRIMARY KEY fields from mrec to old_pk. */
2107  for (ulint i = 0; i < new_index->n_uniq; i++) {
2108  const void* field;
2109  ulint len;
2110  dfield_t* dfield;
2111 
2112  ut_ad(!rec_offs_nth_extern(offsets, i));
2113 
2114  field = rec_get_nth_field(
2115  mrec, offsets, i, &len);
2116  ut_ad(len != UNIV_SQL_NULL);
2117 
2118  dfield = dtuple_get_nth_field(old_pk, i);
2119  dfield_set_data(dfield, field, len);
2120  }
2121  } else {
2122  /* We assume extra_size < 0x100
2123  for the PRIMARY KEY prefix. */
2124  mrec += *mrec + 1;
2125 
2126  if (mrec > mrec_end) {
2127  return(NULL);
2128  }
2129 
2130  /* Get offsets for PRIMARY KEY,
2131  DB_TRX_ID, DB_ROLL_PTR. */
2132  rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
2133  rec_init_offsets_temp(mrec, new_index, offsets);
2134 
2135  next_mrec = mrec + rec_offs_data_size(offsets);
2136  if (next_mrec + 2 > mrec_end) {
2137  return(NULL);
2138  }
2139 
2140  /* Copy the PRIMARY KEY fields and
2141  DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
2142  old_pk = dtuple_create(heap, new_index->n_uniq + 2);
2143  dict_index_copy_types(old_pk, new_index,
2144  old_pk->n_fields);
2145 
2146  for (ulint i = 0;
2147  i < dict_index_get_n_unique(new_index) + 2;
2148  i++) {
2149  const void* field;
2150  ulint len;
2151  dfield_t* dfield;
2152 
2153  ut_ad(!rec_offs_nth_extern(offsets, i));
2154 
2155  field = rec_get_nth_field(
2156  mrec, offsets, i, &len);
2157  ut_ad(len != UNIV_SQL_NULL);
2158 
2159  dfield = dtuple_get_nth_field(old_pk, i);
2160  dfield_set_data(dfield, field, len);
2161  }
2162 
2163  mrec = next_mrec;
2164 
2165  /* Fetch the new value of the row as it was
2166  in the old table definition. */
2167  extra_size = *mrec++;
2168 
2169  if (extra_size >= 0x80) {
2170  /* Read another byte of extra_size. */
2171 
2172  extra_size = (extra_size & 0x7f) << 8;
2173  extra_size |= *mrec++;
2174  }
2175 
2176  mrec += extra_size;
2177 
2178  if (mrec > mrec_end) {
2179  return(NULL);
2180  }
2181 
2182  rec_offs_set_n_fields(offsets, dup->index->n_fields);
2183  rec_init_offsets_temp(mrec, dup->index, offsets);
2184 
2185  next_mrec = mrec + rec_offs_data_size(offsets);
2186 
2187  if (next_mrec > mrec_end) {
2188  return(NULL);
2189  }
2190  }
2191 
2192  ut_ad(next_mrec <= mrec_end);
2193  log->head.total += next_mrec - mrec_start;
2194  dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
2195 
2196  {
2197  ulint len;
2198  const byte* db_trx_id
2199  = rec_get_nth_field(
2200  mrec, offsets, trx_id_col, &len);
2201  ut_ad(len == DATA_TRX_ID_LEN);
2202  *error = row_log_table_apply_update(
2203  thr, trx_id_col, new_trx_id_col,
2204  mrec, offsets, offsets_heap,
2205  heap, dup, trx_read_trx_id(db_trx_id), old_pk);
2206  }
2207 
2208  break;
2209  }
2210 
2211  ut_ad(log->head.total <= log->tail.total);
2212  mem_heap_empty(offsets_heap);
2213  mem_heap_empty(heap);
2214  return(next_mrec);
2215 }
2216 
2217 /******************************************************/
2220 static __attribute__((nonnull, warn_unused_result))
2221 dberr_t
2222 row_log_table_apply_ops(
2223 /*====================*/
2224  que_thr_t* thr,
2225  row_merge_dup_t*dup)
2227 {
2228  dberr_t error;
2229  const mrec_t* mrec = NULL;
2230  const mrec_t* next_mrec;
2231  const mrec_t* mrec_end = NULL; /* silence bogus warning */
2232  const mrec_t* next_mrec_end;
2233  mem_heap_t* heap;
2235  ulint* offsets;
2236  bool has_index_lock;
2237  dict_index_t* index = const_cast<dict_index_t*>(
2238  dup->index);
2239  dict_table_t* new_table = index->online_log->table;
2240  dict_index_t* new_index = dict_table_get_first_index(
2241  new_table);
2242  const ulint i = 1 + REC_OFFS_HEADER_SIZE
2244  dict_index_get_n_unique(new_index) + 2);
2245  const ulint trx_id_col = dict_col_get_clust_pos(
2246  dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
2247  const ulint new_trx_id_col = dict_col_get_clust_pos(
2248  dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
2249  trx_t* trx = thr_get_trx(thr);
2250 
2251  ut_ad(dict_index_is_clust(index));
2253  ut_ad(trx->mysql_thd);
2254 #ifdef UNIV_SYNC_DEBUG
2255  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2256 #endif /* UNIV_SYNC_DEBUG */
2257  ut_ad(!dict_index_is_online_ddl(new_index));
2258  ut_ad(trx_id_col > 0);
2259  ut_ad(trx_id_col != ULINT_UNDEFINED);
2260  ut_ad(new_trx_id_col > 0);
2261  ut_ad(new_trx_id_col != ULINT_UNDEFINED);
2262 
2263  UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
2264 
2265  offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
2266  offsets[0] = i;
2267  offsets[1] = dict_index_get_n_fields(index);
2268 
2269  heap = mem_heap_create(UNIV_PAGE_SIZE);
2270  offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
2271  has_index_lock = true;
2272 
2273 next_block:
2274  ut_ad(has_index_lock);
2275 #ifdef UNIV_SYNC_DEBUG
2276  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2277 #endif /* UNIV_SYNC_DEBUG */
2278  ut_ad(index->online_log->head.bytes == 0);
2279 
2280  if (trx_is_interrupted(trx)) {
2281  goto interrupted;
2282  }
2283 
2284  if (dict_index_is_corrupted(index)) {
2285  error = DB_INDEX_CORRUPT;
2286  goto func_exit;
2287  }
2288 
2290 
2291  error = index->online_log->error;
2292 
2293  if (error != DB_SUCCESS) {
2294  goto func_exit;
2295  }
2296 
2297  if (UNIV_UNLIKELY(index->online_log->head.blocks
2298  > index->online_log->tail.blocks)) {
2299 unexpected_eof:
2300  fprintf(stderr, "InnoDB: unexpected end of temporary file"
2301  " for table %s\n", index->table_name);
2302 corruption:
2303  error = DB_CORRUPTION;
2304  goto func_exit;
2305  }
2306 
2307  if (index->online_log->head.blocks
2308  == index->online_log->tail.blocks) {
2309  if (index->online_log->head.blocks) {
2310 #ifdef HAVE_FTRUNCATE
2311  /* Truncate the file in order to save space. */
2312  ftruncate(index->online_log->fd, 0);
2313 #endif /* HAVE_FTRUNCATE */
2314  index->online_log->head.blocks
2315  = index->online_log->tail.blocks = 0;
2316  }
2317 
2318  next_mrec = index->online_log->tail.block;
2319  next_mrec_end = next_mrec + index->online_log->tail.bytes;
2320 
2321  if (next_mrec_end == next_mrec) {
2322  /* End of log reached. */
2323 all_done:
2324  ut_ad(has_index_lock);
2325  ut_ad(index->online_log->head.blocks == 0);
2326  ut_ad(index->online_log->tail.blocks == 0);
2327  index->online_log->head.bytes = 0;
2328  index->online_log->tail.bytes = 0;
2329  error = DB_SUCCESS;
2330  goto func_exit;
2331  }
2332  } else {
2333  os_offset_t ofs;
2334  ibool success;
2335 
2336  ofs = (os_offset_t) index->online_log->head.blocks
2338 
2339  ut_ad(has_index_lock);
2340  has_index_lock = false;
2341  rw_lock_x_unlock(dict_index_get_lock(index));
2342 
2343  log_free_check();
2344 
2346 
2347  success = os_file_read_no_error_handling(
2348  OS_FILE_FROM_FD(index->online_log->fd),
2349  index->online_log->head.block, ofs,
2351 
2352  if (!success) {
2353  fprintf(stderr, "InnoDB: unable to read temporary file"
2354  " for table %s\n", index->table_name);
2355  goto corruption;
2356  }
2357 
2358 #ifdef POSIX_FADV_DONTNEED
2359  /* Each block is read exactly once. Free up the file cache. */
2360  posix_fadvise(index->online_log->fd,
2361  ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
2362 #endif /* POSIX_FADV_DONTNEED */
2363 #if 0 //def FALLOC_FL_PUNCH_HOLE
2364  /* Try to deallocate the space for the file on disk.
2365  This should work on ext4 on Linux 2.6.39 and later,
2366  and be ignored when the operation is unsupported. */
2367  fallocate(index->online_log->fd,
2368  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
2369  ofs, srv_buf_size);
2370 #endif /* FALLOC_FL_PUNCH_HOLE */
2371 
2372  next_mrec = index->online_log->head.block;
2373  next_mrec_end = next_mrec + srv_sort_buf_size;
2374  }
2375 
2376  /* This read is not protected by index->online_log->mutex for
2377  performance reasons. We will eventually notice any error that
2378  was flagged by a DML thread. */
2379  error = index->online_log->error;
2380 
2381  if (error != DB_SUCCESS) {
2382  goto func_exit;
2383  }
2384 
2385  if (mrec) {
2386  /* A partial record was read from the previous block.
2387  Copy the temporary buffer full, as we do not know the
2388  length of the record. Parse subsequent records from
2389  the bigger buffer index->online_log->head.block
2390  or index->online_log->tail.block. */
2391 
2392  ut_ad(mrec == index->online_log->head.buf);
2393  ut_ad(mrec_end > mrec);
2394  ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
2395 
2396  memcpy((mrec_t*) mrec_end, next_mrec,
2397  (&index->online_log->head.buf)[1] - mrec_end);
2398  mrec = row_log_table_apply_op(
2399  thr, trx_id_col, new_trx_id_col,
2400  dup, &error, offsets_heap, heap,
2401  index->online_log->head.buf,
2402  (&index->online_log->head.buf)[1], offsets);
2403  if (error != DB_SUCCESS) {
2404  goto func_exit;
2405  } else if (UNIV_UNLIKELY(mrec == NULL)) {
2406  /* The record was not reassembled properly. */
2407  goto corruption;
2408  }
2409  /* The record was previously found out to be
2410  truncated. Now that the parse buffer was extended,
2411  it should proceed beyond the old end of the buffer. */
2412  ut_a(mrec > mrec_end);
2413 
2414  index->online_log->head.bytes = mrec - mrec_end;
2415  next_mrec += index->online_log->head.bytes;
2416  }
2417 
2418  ut_ad(next_mrec <= next_mrec_end);
2419  /* The following loop must not be parsing the temporary
2420  buffer, but head.block or tail.block. */
2421 
2422  /* mrec!=NULL means that the next record starts from the
2423  middle of the block */
2424  ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
2425 
2426 #ifdef UNIV_DEBUG
2427  if (next_mrec_end == index->online_log->head.block
2428  + srv_sort_buf_size) {
2429  /* If tail.bytes == 0, next_mrec_end can also be at
2430  the end of tail.block. */
2431  if (index->online_log->tail.bytes == 0) {
2432  ut_ad(next_mrec == next_mrec_end);
2433  ut_ad(index->online_log->tail.blocks == 0);
2434  ut_ad(index->online_log->head.blocks == 0);
2435  ut_ad(index->online_log->head.bytes == 0);
2436  } else {
2437  ut_ad(next_mrec == index->online_log->head.block
2438  + index->online_log->head.bytes);
2439  ut_ad(index->online_log->tail.blocks
2440  > index->online_log->head.blocks);
2441  }
2442  } else if (next_mrec_end == index->online_log->tail.block
2443  + index->online_log->tail.bytes) {
2444  ut_ad(next_mrec == index->online_log->tail.block
2445  + index->online_log->head.bytes);
2446  ut_ad(index->online_log->tail.blocks == 0);
2447  ut_ad(index->online_log->head.blocks == 0);
2448  ut_ad(index->online_log->head.bytes
2449  <= index->online_log->tail.bytes);
2450  } else {
2451  ut_error;
2452  }
2453 #endif /* UNIV_DEBUG */
2454 
2455  mrec_end = next_mrec_end;
2456 
2457  while (!trx_is_interrupted(trx)) {
2458  mrec = next_mrec;
2459  ut_ad(mrec < mrec_end);
2460 
2461  if (!has_index_lock) {
2462  /* We are applying operations from a different
2463  block than the one that is being written to.
2464  We do not hold index->lock in order to
2465  allow other threads to concurrently buffer
2466  modifications. */
2467  ut_ad(mrec >= index->online_log->head.block);
2468  ut_ad(mrec_end == index->online_log->head.block
2469  + srv_sort_buf_size);
2470  ut_ad(index->online_log->head.bytes
2471  < srv_sort_buf_size);
2472 
2473  /* Take the opportunity to do a redo log
2474  checkpoint if needed. */
2475  log_free_check();
2476  } else {
2477  /* We are applying operations from the last block.
2478  Do not allow other threads to buffer anything,
2479  so that we can finally catch up and synchronize. */
2480  ut_ad(index->online_log->head.blocks == 0);
2481  ut_ad(index->online_log->tail.blocks == 0);
2482  ut_ad(mrec_end == index->online_log->tail.block
2483  + index->online_log->tail.bytes);
2484  ut_ad(mrec >= index->online_log->tail.block);
2485  }
2486 
2487  /* This read is not protected by index->online_log->mutex
2488  for performance reasons. We will eventually notice any
2489  error that was flagged by a DML thread. */
2490  error = index->online_log->error;
2491 
2492  if (error != DB_SUCCESS) {
2493  goto func_exit;
2494  }
2495 
2496  next_mrec = row_log_table_apply_op(
2497  thr, trx_id_col, new_trx_id_col,
2498  dup, &error, offsets_heap, heap,
2499  mrec, mrec_end, offsets);
2500 
2501  if (error != DB_SUCCESS) {
2502  goto func_exit;
2503  } else if (next_mrec == next_mrec_end) {
2504  /* The record happened to end on a block boundary.
2505  Do we have more blocks left? */
2506  if (has_index_lock) {
2507  /* The index will be locked while
2508  applying the last block. */
2509  goto all_done;
2510  }
2511 
2512  mrec = NULL;
2513 process_next_block:
2514  rw_lock_x_lock(dict_index_get_lock(index));
2515  has_index_lock = true;
2516 
2517  index->online_log->head.bytes = 0;
2518  index->online_log->head.blocks++;
2519  goto next_block;
2520  } else if (next_mrec != NULL) {
2521  ut_ad(next_mrec < next_mrec_end);
2522  index->online_log->head.bytes += next_mrec - mrec;
2523  } else if (has_index_lock) {
2524  /* When mrec is within tail.block, it should
2525  be a complete record, because we are holding
2526  index->lock and thus excluding the writer. */
2527  ut_ad(index->online_log->tail.blocks == 0);
2528  ut_ad(mrec_end == index->online_log->tail.block
2529  + index->online_log->tail.bytes);
2530  ut_ad(0);
2531  goto unexpected_eof;
2532  } else {
2533  memcpy(index->online_log->head.buf, mrec,
2534  mrec_end - mrec);
2535  mrec_end += index->online_log->head.buf - mrec;
2536  mrec = index->online_log->head.buf;
2537  goto process_next_block;
2538  }
2539  }
2540 
2541 interrupted:
2542  error = DB_INTERRUPTED;
2543 func_exit:
2544  if (!has_index_lock) {
2545  rw_lock_x_lock(dict_index_get_lock(index));
2546  }
2547 
2548  mem_heap_free(offsets_heap);
2549  mem_heap_free(heap);
2550  ut_free(offsets);
2551  return(error);
2552 }
2553 
2554 /******************************************************/
/* Apply the table-rebuild modification log to the rebuilt table.
Takes the clustered index lock of the old table in exclusive mode,
delegates the actual replay to row_log_table_apply_ops(), and returns
its status.  DB_ERROR is returned (with a debug assertion) if no
online log is attached to the clustered index.
NOTE(review): the doc extractor dropped line 2559 -- the function-name
line, presumably "row_log_table_apply(" -- and the per-parameter
comment lines 2563/2565; confirm against the original row0log.cc. */
2557 UNIV_INTERN
2558 dberr_t
2560 /*================*/
2561  que_thr_t* thr,
2562  dict_table_t* old_table,
2564  struct TABLE* table)
2566 {
2567  dberr_t error;
2568  dict_index_t* clust_index;
2569 
 /* Reset the duplicate-key reporting slot for this transaction. */
2570  thr_get_trx(thr)->error_key_num = 0;
2571 
2572 #ifdef UNIV_SYNC_DEBUG
2573  ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
2574 #endif /* UNIV_SYNC_DEBUG */
2575  clust_index = dict_table_get_first_index(old_table);
2576 
2577  rw_lock_x_lock(dict_index_get_lock(clust_index));
2578 
2579  if (!clust_index->online_log) {
2580  ut_ad(dict_index_get_online_status(clust_index)
 /* NOTE(review): line 2581, the right-hand operand of this
 assertion (presumably "== ONLINE_INDEX_CREATION);"), was
 dropped by the doc extractor. */
2582  /* This function should not be called unless
2583  rebuilding a table online. Build in some fault
2584  tolerance. */
2585  ut_ad(0);
2586  error = DB_ERROR;
2587  } else {
2588  row_merge_dup_t dup = {
2589  clust_index, table,
2590  clust_index->online_log->col_map, 0
2591  };
2592 
2593  error = row_log_table_apply_ops(thr, &dup);
2594 
 /* On success the whole log must have been consumed:
 the head (read) and tail (write) byte totals agree. */
2595  ut_ad(error != DB_SUCCESS
2596  || clust_index->online_log->head.total
2597  == clust_index->online_log->tail.total);
2598  }
2599 
2600  rw_lock_x_unlock(dict_index_get_lock(clust_index));
2601  return(error);
2602 }
2603 
2604 /******************************************************/
/* Allocate the online-DDL modification log for an index.
Reserves one large memory area holding two sort-buffer-sized blocks
(head for reading, tail for buffering writes) plus the row_log_t
descriptor itself, creates the backing temporary merge file and the
log mutex, and attaches the log to index->online_log.
Returns true on success; false if the large allocation or the
temporary file creation fails (all partial resources are released).
NOTE(review): the doc extractor dropped line 2610 -- the function-name
line, presumably "row_log_allocate(" -- as well as statement lines
2628 and 2665 (the latter is likely the call that switches the index
online status to creation mode); confirm against row0log.cc. */
2608 UNIV_INTERN
2609 bool
2611 /*=============*/
2612  dict_index_t* index,
2613  dict_table_t* table,
2615  bool same_pk,
2617  const dtuple_t* add_cols,
2620  const ulint* col_map)
2622 {
2623  byte* buf;
2624  row_log_t* log;
2625  ulint size;
2626  DBUG_ENTER("row_log_allocate");
2627 
 /* table is non-NULL exactly when rebuilding via the clustered
 index; col_map is required whenever table or add_cols is given. */
2629  ut_ad(dict_index_is_clust(index) == !!table);
2630  ut_ad(!table || index->table != table);
2631  ut_ad(same_pk || table);
2632  ut_ad(!table || col_map);
2633  ut_ad(!add_cols || col_map);
2634 #ifdef UNIV_SYNC_DEBUG
2635  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2636 #endif /* UNIV_SYNC_DEBUG */
 /* Two I/O blocks followed by the descriptor, in one allocation. */
2637  size = 2 * srv_sort_buf_size + sizeof *log;
2638  buf = (byte*) os_mem_alloc_large(&size);
2639  if (!buf) {
2640  DBUG_RETURN(false);
2641  }
2642 
 /* The row_log_t lives at the end of the large allocation. */
2643  log = (row_log_t*) &buf[2 * srv_sort_buf_size];
2644  log->size = size;
2645  log->fd = row_merge_file_create_low();
2646  if (log->fd < 0) {
2647  os_mem_free_large(buf, size);
2648  DBUG_RETURN(false);
2649  }
2650  mutex_create(index_online_log_key, &log->mutex,
2651  SYNC_INDEX_ONLINE_LOG);
2652  log->blobs = NULL;
2653  log->table = table;
2654  log->same_pk = same_pk;
2655  log->add_cols = add_cols;
2656  log->col_map = col_map;
2657  log->error = DB_SUCCESS;
2658  log->max_trx = 0;
2659  log->head.block = buf;
2660  log->tail.block = buf + srv_sort_buf_size;
2661  log->tail.blocks = log->tail.bytes = 0;
2662  log->tail.total = 0;
2663  log->head.blocks = log->head.bytes = 0;
2664  log->head.total = 0;
2666  index->online_log = log;
2667 
2668  /* While we might be holding an exclusive data dictionary lock
2669  here, in row_log_abort_sec() we will not always be holding it. Use
2670  atomic operations in both cases. */
2671  MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
2672 
2673  DBUG_RETURN(true);
2674 }
2675 
2676 /******************************************************/
/* Free the online-DDL modification log for an index, releasing every
resource acquired in row_log_allocate(): the BLOB tracking map, the
temporary merge file, the log mutex, and the single large allocation
(head.block is its base) holding both blocks and the row_log_t.
The caller's pointer is reset to 0.
FIX(review): as extracted, the temporary file descriptor log->fd
(created via row_merge_file_create_low() in row_log_allocate()) was
never closed -- a file-descriptor leak; the destroy call below
(original line 2687, dropped by the doc extractor) restores it. */
2678 UNIV_INTERN
2679 void
2680 row_log_free(
2681 /*=========*/
2682  row_log_t*& log)
2683 {
2684  MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
2685 
2686  delete log->blobs;
2687  row_merge_file_destroy_low(log->fd);
2688  mutex_free(&log->mutex);
2689  os_mem_free_large(log->head.block, log->size);
2690  log = 0;
2691 }
2692 
2693 /******************************************************/
/* Return the biggest transaction id that has modified the index while
online index creation was in progress (index->online_log->max_trx).
The debug assertion requires either (index lock shared + log mutex)
or the index lock in exclusive mode, so the read is race-free.
NOTE(review): the doc extractor dropped line 2699 -- the
function-name line, presumably "row_log_get_max_trx(" -- and
assertion line 2703; confirm against row0log.cc. */
2697 UNIV_INTERN
2698 trx_id_t
2700 /*================*/
2701  dict_index_t* index)
2702 {
2704 #ifdef UNIV_SYNC_DEBUG
2705  ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
2706  && mutex_own(&index->online_log->mutex))
2707  || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2708 #endif /* UNIV_SYNC_DEBUG */
2709  return(index->online_log->max_trx);
2710 }
2711 
2712 /******************************************************/
/* Apply a single buffered row-log operation (ROW_OP_INSERT or
ROW_OP_DELETE) to a secondary index being created online.
Positions a B-tree cursor on the entry, then inserts or deletes as
appropriate, retrying optimistic operations pessimistically (after
re-latching the tree) when they fail.  Duplicate keys are reported
through dup; the result status is returned through *error.
When has_index_lock is true the caller holds index->lock exclusively
and the pessimistic (BTR_MODIFY_TREE) variant is used directly.
NOTE(review): the doc extractor dropped several hyperlinked lines in
this body: 2790 and 2871 (the re-search call, presumably
"btr_cur_search_to_nth_level("), 2800 (presumably
"ut_ad(!page_rec_is_infimum("), 2808 (presumably
"btr_cur_pessimistic_delete("), and 2855-2856 / 2884-2885 (the
BTR_NO_UNDO_LOG/BTR_NO_LOCKING flag lines of the insert calls).
Confirm each against the original row0log.cc before editing. */
2714 static __attribute__((nonnull))
2715 void
2716 row_log_apply_op_low(
2717 /*=================*/
2718  dict_index_t* index,
2719  row_merge_dup_t*dup,
2721  dberr_t* error,
2722  mem_heap_t* offsets_heap,
2724  bool has_index_lock,
2726  enum row_op op,
2727  trx_id_t trx_id,
2728  const dtuple_t* entry)
2729 {
2730  mtr_t mtr;
2731  btr_cur_t cursor;
2732  ulint* offsets = NULL;
2733 
2734  ut_ad(!dict_index_is_clust(index));
2735 #ifdef UNIV_SYNC_DEBUG
2736  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
2737  == has_index_lock);
2738 #endif /* UNIV_SYNC_DEBUG */
2739  ut_ad(!dict_index_is_corrupted(index));
 /* Only deletes may carry a zero transaction id. */
2740  ut_ad(trx_id != 0 || op == ROW_OP_DELETE);
2741 
2742  mtr_start(&mtr);
2743 
2744  /* We perform the pessimistic variant of the operations if we
2745  already hold index->lock exclusively. First, search the
2746  record. The operation may already have been performed,
2747  depending on when the row in the clustered index was
2748  scanned. */
2749  btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2750  has_index_lock
2751  ? BTR_MODIFY_TREE
2752  : BTR_MODIFY_LEAF,
2753  &cursor, 0, __FILE__, __LINE__,
2754  &mtr);
2755 
2756  ut_ad(dict_index_get_n_unique(index) > 0);
2757  /* This test is somewhat similar to row_ins_must_modify_rec(),
2758  but not identical for unique secondary indexes. */
2759  if (cursor.low_match >= dict_index_get_n_unique(index)
2760  && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
2761  /* We have a matching record. */
 /* exists == full-field match; a unique-prefix-only match
 means a duplicate candidate, not the same record. */
2762  bool exists = (cursor.low_match
2763  == dict_index_get_n_fields(index));
2764 #ifdef UNIV_DEBUG
2765  rec_t* rec = btr_cur_get_rec(&cursor);
 /* NOTE(review): debug-only lines 2766-2767 (offsets setup
 for rec) were dropped by the doc extractor. */
2768 #endif /* UNIV_DEBUG */
2769 
2770  ut_ad(exists || dict_index_is_unique(index));
2771 
2772  switch (op) {
2773  case ROW_OP_DELETE:
2774  if (!exists) {
2775  /* The record was already deleted. */
2776  goto func_exit;
2777  }
2778 
2779  if (btr_cur_optimistic_delete(
2780  &cursor, BTR_CREATE_FLAG, &mtr)) {
2781  *error = DB_SUCCESS;
2782  break;
2783  }
2784 
2785  if (!has_index_lock) {
2786  /* This needs a pessimistic operation.
2787  Lock the index tree exclusively. */
2788  mtr_commit(&mtr);
2789  mtr_start(&mtr);
 /* NOTE(review): dropped line 2790 -- the re-search
 call name, presumably btr_cur_search_to_nth_level(. */
2791  index, 0, entry, PAGE_CUR_LE,
2792  BTR_MODIFY_TREE, &cursor, 0,
2793  __FILE__, __LINE__, &mtr);
2794 
2795  /* No other thread than the current one
2796  is allowed to modify the index tree.
2797  Thus, the record should still exist. */
2798  ut_ad(cursor.low_match
2799  >= dict_index_get_n_fields(index));
 /* NOTE(review): dropped line 2800, presumably
 "ut_ad(!page_rec_is_infimum(". */
2801  btr_cur_get_rec(&cursor)));
2802  }
2803 
2804  /* As there are no externally stored fields in
2805  a secondary index record, the parameter
2806  rb_ctx = RB_NONE will be ignored. */
2807 
 /* NOTE(review): dropped line 2808, presumably
 "btr_cur_pessimistic_delete(". */
2809  error, FALSE, &cursor,
2810  BTR_CREATE_FLAG, RB_NONE, &mtr);
2811  break;
2812  case ROW_OP_INSERT:
2813  if (exists) {
2814  /* The record already exists. There
2815  is nothing to be inserted. */
2816  goto func_exit;
2817  }
2818 
2819  if (dtuple_contains_null(entry)) {
2820  /* The UNIQUE KEY columns match, but
2821  there is a NULL value in the key, and
2822  NULL!=NULL. */
2823  goto insert_the_rec;
2824  }
2825 
2826  /* Duplicate key error */
2827  ut_ad(dict_index_is_unique(index));
2828  row_merge_dup_report(dup, entry->fields);
2829  goto func_exit;
2830  }
2831  } else {
2832  switch (op) {
2833  rec_t* rec;
2834  big_rec_t* big_rec;
2835  case ROW_OP_DELETE:
2836  /* The record does not exist. */
2837  goto func_exit;
2838  case ROW_OP_INSERT:
2839  if (dict_index_is_unique(index)
2840  && (cursor.up_match
2841  >= dict_index_get_n_unique(index)
2842  || cursor.low_match
2843  >= dict_index_get_n_unique(index))
2844  && (!index->n_nullable
2845  || !dtuple_contains_null(entry))) {
2846  /* Duplicate key */
2847  row_merge_dup_report(dup, entry->fields);
2848  goto func_exit;
2849  }
2850 insert_the_rec:
2851  /* Insert the record. As we are inserting into
2852  a secondary index, there cannot be externally
2853  stored columns (!big_rec). */
2854  *error = btr_cur_optimistic_insert(
 /* NOTE(review): dropped lines 2855-2856 -- the
 leading flag operands of this call. */
2857  | BTR_CREATE_FLAG,
2858  &cursor, &offsets, &offsets_heap,
2859  const_cast<dtuple_t*>(entry),
2860  &rec, &big_rec, 0, NULL, &mtr);
2861  ut_ad(!big_rec);
2862  if (*error != DB_FAIL) {
2863  break;
2864  }
2865 
2866  if (!has_index_lock) {
2867  /* This needs a pessimistic operation.
2868  Lock the index tree exclusively. */
2869  mtr_commit(&mtr);
2870  mtr_start(&mtr);
 /* NOTE(review): dropped line 2871 -- the re-search
 call name, presumably btr_cur_search_to_nth_level(. */
2872  index, 0, entry, PAGE_CUR_LE,
2873  BTR_MODIFY_TREE, &cursor, 0,
2874  __FILE__, __LINE__, &mtr);
2875  }
2876 
2877  /* We already determined that the
2878  record did not exist. No other thread
2879  than the current one is allowed to
2880  modify the index tree. Thus, the
2881  record should still not exist. */
2882 
2883  *error = btr_cur_pessimistic_insert(
 /* NOTE(review): dropped lines 2884-2885 -- the
 leading flag operands of this call. */
2886  | BTR_CREATE_FLAG,
2887  &cursor, &offsets, &offsets_heap,
2888  const_cast<dtuple_t*>(entry),
2889  &rec, &big_rec,
2890  0, NULL, &mtr);
2891  ut_ad(!big_rec);
2892  break;
2893  }
2894  mem_heap_empty(offsets_heap);
2895  }
2896 
 /* Record the applying transaction on the page for MVCC checks. */
2897  if (*error == DB_SUCCESS && trx_id) {
2898  page_update_max_trx_id(btr_cur_get_block(&cursor),
2899  btr_cur_get_page_zip(&cursor),
2900  trx_id, &mtr);
2901  }
2902 
2903 func_exit:
2904  mtr_commit(&mtr);
2905 }
2906 
2907 /******************************************************/
/* Parse one secondary-index row-log record from [mrec, mrec_end) and
apply it via row_log_apply_op_low().
Record layout: 1 byte op (ROW_OP_INSERT carries a DATA_TRX_ID_LEN
transaction id; ROW_OP_DELETE does not), 1-2 bytes extra_size
(two-byte form when the first byte has the high bit set), then the
extra bytes and the record payload.
Returns the pointer just past the consumed record; NULL with
*error == DB_SUCCESS when the record is truncated (caller refills the
buffer and retries); NULL with *error set on corruption. */
2911 static __attribute__((nonnull, warn_unused_result))
2912 const mrec_t*
2913 row_log_apply_op(
2914 /*=============*/
2915  dict_index_t* index,
2916  row_merge_dup_t*dup,
2918  dberr_t* error,
2919  mem_heap_t* offsets_heap,
2921  mem_heap_t* heap,
2923  bool has_index_lock,
2925  const mrec_t* mrec,
2926  const mrec_t* mrec_end,
2927  ulint* offsets)
2930 {
2931  enum row_op op;
2932  ulint extra_size;
2933  ulint data_size;
2934  ulint n_ext;
2935  dtuple_t* entry;
2936  trx_id_t trx_id;
2937 
2938  /* Online index creation is only used for secondary indexes. */
2939  ut_ad(!dict_index_is_clust(index));
2940 #ifdef UNIV_SYNC_DEBUG
2941  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
2942  == has_index_lock);
2943 #endif /* UNIV_SYNC_DEBUG */
2944 
2945  if (dict_index_is_corrupted(index)) {
2946  *error = DB_INDEX_CORRUPT;
2947  return(NULL);
2948  }
2949 
2950  *error = DB_SUCCESS;
2951 
 /* Not even the fixed-size header fits: truncated record. */
2952  if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
2953  return(NULL);
2954  }
2955 
2956  switch (*mrec) {
2957  case ROW_OP_INSERT:
2958  if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
2959  return(NULL);
2960  }
2961 
2962  op = static_cast<enum row_op>(*mrec++);
2963  trx_id = trx_read_trx_id(mrec);
2964  mrec += DATA_TRX_ID_LEN;
2965  break;
2966  case ROW_OP_DELETE:
2967  op = static_cast<enum row_op>(*mrec++);
2968  trx_id = 0;
2969  break;
2970  default:
2971 corrupted:
2972  ut_ad(0);
2973  *error = DB_CORRUPTION;
2974  return(NULL);
2975  }
2976 
2977  extra_size = *mrec++;
2978 
2979  ut_ad(mrec < mrec_end);
2980 
2981  if (extra_size >= 0x80) {
2982  /* Read another byte of extra_size. */
2983 
2984  extra_size = (extra_size & 0x7f) << 8;
2985  extra_size |= *mrec++;
2986  }
2987 
2988  mrec += extra_size;
2989 
2990  if (mrec > mrec_end) {
2991  return(NULL);
2992  }
2993 
2994  rec_init_offsets_temp(mrec, index, offsets);
2995 
2996  if (rec_offs_any_extern(offsets)) {
2997  /* There should never be any externally stored fields
2998  in a secondary index, which is what online index
2999  creation is used for. Therefore, the log file must be
3000  corrupted. */
3001  goto corrupted;
3002  }
3003 
3004  data_size = rec_offs_data_size(offsets);
3005 
3006  mrec += data_size;
3007 
3008  if (mrec > mrec_end) {
3009  return(NULL);
3010  }
3011 
 /* NOTE(review): dropped line 3012 -- the assignment target and
 call name of this statement, presumably
 "entry = row_rec_to_index_entry_low(". */
3013  mrec - data_size, index, offsets, &n_ext, heap);
3014  /* Online index creation is only implemented for secondary
3015  indexes, which never contain off-page columns. */
3016  ut_ad(n_ext == 0);
3017 #ifdef ROW_LOG_APPLY_PRINT
3018  if (row_log_apply_print) {
3019  fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ",
3020  index->id, trx_id,
3021  unsigned (op), unsigned (has_index_lock));
3022  for (const byte* m = mrec - data_size; m < mrec; m++) {
3023  fprintf(stderr, "%02x", *m);
3024  }
3025  putc('\n', stderr);
3026  }
3027 #endif /* ROW_LOG_APPLY_PRINT */
3028  row_log_apply_op_low(index, dup, error, offsets_heap,
3029  has_index_lock, op, trx_id, entry);
3030  return(mrec);
3031 }
3032 
3033 /******************************************************/
/* Replay all buffered secondary-index row-log operations.
Reads full blocks of the log from the temporary file (releasing
index->lock while doing so, so DML can keep buffering), parses and
applies each record via row_log_apply_op(), stitches records that
straddle a block boundary through the small head.buf staging buffer,
and finally applies the in-memory tail block while holding
index->lock exclusively.  On exit index->lock is held in X mode.
NOTE(review): the doc extractor dropped several hyperlinked lines:
3051 (a declaration, evidently "mem_heap_t* offsets_heap;" -- it is
assigned at line 3070 below), 3128 (the multiplier of ofs, presumably
"* srv_sort_buf_size;"), 3139 (the read-length argument, presumably
"srv_sort_buf_size);"), and 3322 (the size-limit condition of the
DB_INDEX_CORRUPT branch).  Confirm against row0log.cc. */
3036 static __attribute__((nonnull))
3037 dberr_t
3038 row_log_apply_ops(
3039 /*==============*/
3040  trx_t* trx,
3042  dict_index_t* index,
3043  row_merge_dup_t*dup)
3045 {
3046  dberr_t error;
3047  const mrec_t* mrec = NULL;
3048  const mrec_t* next_mrec;
3049  const mrec_t* mrec_end= NULL; /* silence bogus warning */
3050  const mrec_t* next_mrec_end;
3052  mem_heap_t* heap;
3053  ulint* offsets;
3054  bool has_index_lock;
 /* Size of the rec_get_offsets() array for this index. */
3055  const ulint i = 1 + REC_OFFS_HEADER_SIZE
3056  + dict_index_get_n_fields(index);
3057 
3059  ut_ad(*index->name == TEMP_INDEX_PREFIX);
3060 #ifdef UNIV_SYNC_DEBUG
3061  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
3062 #endif /* UNIV_SYNC_DEBUG */
3063  ut_ad(index->online_log);
3064  UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
3065 
3066  offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
3067  offsets[0] = i;
3068  offsets[1] = dict_index_get_n_fields(index);
3069 
3070  offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
3071  heap = mem_heap_create(UNIV_PAGE_SIZE);
3072  has_index_lock = true;
3073 
3074 next_block:
3075  ut_ad(has_index_lock);
3076 #ifdef UNIV_SYNC_DEBUG
3077  ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
3078 #endif /* UNIV_SYNC_DEBUG */
3079  ut_ad(index->online_log->head.bytes == 0);
3080 
3081  if (trx_is_interrupted(trx)) {
3082  goto interrupted;
3083  }
3084 
3085  if (dict_index_is_corrupted(index)) {
3086  error = DB_INDEX_CORRUPT;
3087  goto func_exit;
3088  }
3089 
 /* The reader can never be ahead of the writer. */
3090  if (UNIV_UNLIKELY(index->online_log->head.blocks
3091  > index->online_log->tail.blocks)) {
3092 unexpected_eof:
3093  fprintf(stderr, "InnoDB: unexpected end of temporary file"
3094  " for index %s\n", index->name + 1);
3095 corruption:
3096  error = DB_CORRUPTION;
3097  goto func_exit;
3098  }
3099 
3100  if (index->online_log->head.blocks
3101  == index->online_log->tail.blocks) {
 /* Caught up with the writer: only the in-memory tail
 block remains to be applied. */
3102  if (index->online_log->head.blocks) {
3103 #ifdef HAVE_FTRUNCATE
3104  /* Truncate the file in order to save space. */
3105  ftruncate(index->online_log->fd, 0);
3106 #endif /* HAVE_FTRUNCATE */
3107  index->online_log->head.blocks
3108  = index->online_log->tail.blocks = 0;
3109  }
3110 
3111  next_mrec = index->online_log->tail.block;
3112  next_mrec_end = next_mrec + index->online_log->tail.bytes;
3113 
3114  if (next_mrec_end == next_mrec) {
3115  /* End of log reached. */
3116 all_done:
3117  ut_ad(has_index_lock);
3118  ut_ad(index->online_log->head.blocks == 0);
3119  ut_ad(index->online_log->tail.blocks == 0);
3120  error = DB_SUCCESS;
3121  goto func_exit;
3122  }
3123  } else {
3124  os_offset_t ofs;
3125  ibool success;
3126 
3127  ofs = (os_offset_t) index->online_log->head.blocks
 /* NOTE(review): dropped line 3128 -- the multiplier,
 presumably "* srv_sort_buf_size;". */
3129 
 /* Release the index lock while reading from disk so that
 concurrent DML can continue to buffer log records. */
3130  ut_ad(has_index_lock);
3131  has_index_lock = false;
3132  rw_lock_x_unlock(dict_index_get_lock(index));
3133 
3134  log_free_check();
3135 
3136  success = os_file_read_no_error_handling(
3137  OS_FILE_FROM_FD(index->online_log->fd),
3138  index->online_log->head.block, ofs,
 /* NOTE(review): dropped line 3139 -- the read length,
 presumably "srv_sort_buf_size);". */
3140 
3141  if (!success) {
3142  fprintf(stderr, "InnoDB: unable to read temporary file"
3143  " for index %s\n", index->name + 1);
3144  goto corruption;
3145  }
3146 
3147 #ifdef POSIX_FADV_DONTNEED
3148  /* Each block is read exactly once. Free up the file cache. */
3149  posix_fadvise(index->online_log->fd,
3150  ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
3151 #endif /* POSIX_FADV_DONTNEED */
3152 #if 0 //def FALLOC_FL_PUNCH_HOLE
3153  /* Try to deallocate the space for the file on disk.
3154  This should work on ext4 on Linux 2.6.39 and later,
3155  and be ignored when the operation is unsupported. */
3156  fallocate(index->online_log->fd,
3157  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
3158  ofs, srv_buf_size);
3159 #endif /* FALLOC_FL_PUNCH_HOLE */
3160 
3161  next_mrec = index->online_log->head.block;
3162  next_mrec_end = next_mrec + srv_sort_buf_size;
3163  }
3164 
3165  if (mrec) {
3166  /* A partial record was read from the previous block.
3167  Copy the temporary buffer full, as we do not know the
3168  length of the record. Parse subsequent records from
3169  the bigger buffer index->online_log->head.block
3170  or index->online_log->tail.block. */
3171 
3172  ut_ad(mrec == index->online_log->head.buf);
3173  ut_ad(mrec_end > mrec);
3174  ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
3175 
3176  memcpy((mrec_t*) mrec_end, next_mrec,
3177  (&index->online_log->head.buf)[1] - mrec_end);
3178  mrec = row_log_apply_op(
3179  index, dup, &error, offsets_heap, heap,
3180  has_index_lock, index->online_log->head.buf,
3181  (&index->online_log->head.buf)[1], offsets);
3182  if (error != DB_SUCCESS) {
3183  goto func_exit;
3184  } else if (UNIV_UNLIKELY(mrec == NULL)) {
3185  /* The record was not reassembled properly. */
3186  goto corruption;
3187  }
3188  /* The record was previously found out to be
3189  truncated. Now that the parse buffer was extended,
3190  it should proceed beyond the old end of the buffer. */
3191  ut_a(mrec > mrec_end);
3192 
3193  index->online_log->head.bytes = mrec - mrec_end;
3194  next_mrec += index->online_log->head.bytes;
3195  }
3196 
3197  ut_ad(next_mrec <= next_mrec_end);
3198  /* The following loop must not be parsing the temporary
3199  buffer, but head.block or tail.block. */
3200 
3201  /* mrec!=NULL means that the next record starts from the
3202  middle of the block */
3203  ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
3204 
3205 #ifdef UNIV_DEBUG
3206  if (next_mrec_end == index->online_log->head.block
3207  + srv_sort_buf_size) {
3208  /* If tail.bytes == 0, next_mrec_end can also be at
3209  the end of tail.block. */
3210  if (index->online_log->tail.bytes == 0) {
3211  ut_ad(next_mrec == next_mrec_end);
3212  ut_ad(index->online_log->tail.blocks == 0);
3213  ut_ad(index->online_log->head.blocks == 0);
3214  ut_ad(index->online_log->head.bytes == 0);
3215  } else {
3216  ut_ad(next_mrec == index->online_log->head.block
3217  + index->online_log->head.bytes);
3218  ut_ad(index->online_log->tail.blocks
3219  > index->online_log->head.blocks);
3220  }
3221  } else if (next_mrec_end == index->online_log->tail.block
3222  + index->online_log->tail.bytes) {
3223  ut_ad(next_mrec == index->online_log->tail.block
3224  + index->online_log->head.bytes);
3225  ut_ad(index->online_log->tail.blocks == 0);
3226  ut_ad(index->online_log->head.blocks == 0);
3227  ut_ad(index->online_log->head.bytes
3228  <= index->online_log->tail.bytes);
3229  } else {
3230  ut_error;
3231  }
3232 #endif /* UNIV_DEBUG */
3233 
3234  mrec_end = next_mrec_end;
3235 
3236  while (!trx_is_interrupted(trx)) {
3237  mrec = next_mrec;
3238  ut_ad(mrec < mrec_end);
3239 
3240  if (!has_index_lock) {
3241  /* We are applying operations from a different
3242  block than the one that is being written to.
3243  We do not hold index->lock in order to
3244  allow other threads to concurrently buffer
3245  modifications. */
3246  ut_ad(mrec >= index->online_log->head.block);
3247  ut_ad(mrec_end == index->online_log->head.block
3248  + srv_sort_buf_size);
3249  ut_ad(index->online_log->head.bytes
3250  < srv_sort_buf_size);
3251 
3252  /* Take the opportunity to do a redo log
3253  checkpoint if needed. */
3254  log_free_check();
3255  } else {
3256  /* We are applying operations from the last block.
3257  Do not allow other threads to buffer anything,
3258  so that we can finally catch up and synchronize. */
3259  ut_ad(index->online_log->head.blocks == 0);
3260  ut_ad(index->online_log->tail.blocks == 0);
3261  ut_ad(mrec_end == index->online_log->tail.block
3262  + index->online_log->tail.bytes);
3263  ut_ad(mrec >= index->online_log->tail.block);
3264  }
3265 
3266  next_mrec = row_log_apply_op(
3267  index, dup, &error, offsets_heap, heap,
3268  has_index_lock, mrec, mrec_end, offsets);
3269 
3270  if (error != DB_SUCCESS) {
3271  goto func_exit;
3272  } else if (next_mrec == next_mrec_end) {
3273  /* The record happened to end on a block boundary.
3274  Do we have more blocks left? */
3275  if (has_index_lock) {
3276  /* The index will be locked while
3277  applying the last block. */
3278  goto all_done;
3279  }
3280 
3281  mrec = NULL;
3282 process_next_block:
3283  rw_lock_x_lock(dict_index_get_lock(index));
3284  has_index_lock = true;
3285 
3286  index->online_log->head.bytes = 0;
3287  index->online_log->head.blocks++;
3288  goto next_block;
3289  } else if (next_mrec != NULL) {
3290  ut_ad(next_mrec < next_mrec_end);
3291  index->online_log->head.bytes += next_mrec - mrec;
3292  } else if (has_index_lock) {
3293  /* When mrec is within tail.block, it should
3294  be a complete record, because we are holding
3295  index->lock and thus excluding the writer. */
3296  ut_ad(index->online_log->tail.blocks == 0);
3297  ut_ad(mrec_end == index->online_log->tail.block
3298  + index->online_log->tail.bytes);
3299  ut_ad(0);
3300  goto unexpected_eof;
3301  } else {
 /* Truncated record at end of a disk block: stash the
 fragment in head.buf; the next iteration of next_block
 completes it from the following block. */
3302  memcpy(index->online_log->head.buf, mrec,
3303  mrec_end - mrec);
3304  mrec_end += index->online_log->head.buf - mrec;
3305  mrec = index->online_log->head.buf;
3306  goto process_next_block;
3307  }
3308  }
3309 
3310 interrupted:
3311  error = DB_INTERRUPTED;
3312 func_exit:
3313  if (!has_index_lock) {
3314  rw_lock_x_lock(dict_index_get_lock(index));
3315  }
3316 
3317  switch (error) {
3318  case DB_SUCCESS:
3319  break;
3320  case DB_INDEX_CORRUPT:
3321  if (((os_offset_t) index->online_log->tail.blocks + 1)
 /* NOTE(review): dropped line 3322 -- the rest of this
 condition (a comparison against the online log size
 limit). */
3323  /* The log file grew too big. */
3324  error = DB_ONLINE_LOG_TOO_BIG;
3325  }
3326  /* fall through */
3327  default:
3328  /* We set the flag directly instead of invoking
3329  dict_set_corrupted_index_cache_only(index) here,
3330  because the index is not "public" yet. */
3331  index->type |= DICT_CORRUPT;
3332  }
3333 
3334  mem_heap_free(heap);
3335  mem_heap_free(offsets_heap);
3336  ut_free(offsets);
3337  return(error);
3338 }
3339 
3340 /******************************************************/
/* Apply the row log to a secondary index after its bulk build.
Takes index->lock in X mode, replays the log via row_log_apply_ops()
(skipped if the table is already corrupted), marks the index corrupted
and flags the table for drop on any error or duplicate key, then
detaches and frees the log.  Returns the final status (DB_DUPLICATE_KEY
if the replay itself succeeded but duplicates were reported).
NOTE(review): the doc extractor dropped line 3345 -- the
function-name line, presumably "row_log_apply(" -- plus lines 3372,
3383 and 3385; the last two are likely the calls that set the online
index status (aborted vs. complete) in the two branches below.
Confirm against row0log.cc. */
3343 UNIV_INTERN
3344 dberr_t
3346 /*==========*/
3347  trx_t* trx,
3349  dict_index_t* index,
3350  struct TABLE* table)
3352 {
3353  dberr_t error;
3354  row_log_t* log;
3355  row_merge_dup_t dup = { index, table, NULL, 0 };
3356  DBUG_ENTER("row_log_apply");
3357 
3359  ut_ad(!dict_index_is_clust(index));
3360 
3361  log_free_check();
3362 
3363  rw_lock_x_lock(dict_index_get_lock(index));
3364 
3365  if (!dict_table_is_corrupted(index->table)) {
3366  error = row_log_apply_ops(trx, index, &dup);
3367  } else {
3368  error = DB_SUCCESS;
3369  }
3370 
3371  if (error != DB_SUCCESS || dup.n_dup) {
3373  /* We set the flag directly instead of invoking
3374  dict_set_corrupted_index_cache_only(index) here,
3375  because the index is not "public" yet. */
3376  index->type |= DICT_CORRUPT;
3377  index->table->drop_aborted = TRUE;
3378 
3379  if (error == DB_SUCCESS) {
3380  error = DB_DUPLICATE_KEY;
3381  }
3382 
 /* NOTE(review): dropped line 3383 -- presumably the call
 setting the online status to aborted. */
3384  } else {
 /* NOTE(review): dropped line 3385 -- presumably the call
 setting the online status to complete. */
3386  }
3387 
 /* Detach the log before releasing the lock so no further
 records can be buffered, then free it outside the latch. */
3388  log = index->online_log;
3389  index->online_log = NULL;
3390  /* We could remove the TEMP_INDEX_PREFIX and update the data
3391  dictionary to say that this index is complete, if we had
3392  access to the .frm file here. If the server crashes before
3393  all requested indexes have been created, this completed index
3394  will be dropped. */
3395  rw_lock_x_unlock(dict_index_get_lock(index));
3396 
3397  row_log_free(log);
3398 
3399  DBUG_RETURN(error);
3400 }