MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
row0ins.cc
Go to the documentation of this file.
1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2013, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************/
26 #include "row0ins.h"
27 
28 #ifdef UNIV_NONINL
29 #include "row0ins.ic"
30 #endif
31 
32 #include "ha_prototypes.h"
33 #include "dict0dict.h"
34 #include "dict0boot.h"
35 #include "trx0rec.h"
36 #include "trx0undo.h"
37 #include "btr0btr.h"
38 #include "btr0cur.h"
39 #include "mach0data.h"
40 #include "que0que.h"
41 #include "row0upd.h"
42 #include "row0sel.h"
43 #include "row0row.h"
44 #include "row0log.h"
45 #include "rem0cmp.h"
46 #include "lock0lock.h"
47 #include "log0log.h"
48 #include "eval0eval.h"
49 #include "data0data.h"
50 #include "usr0sess.h"
51 #include "buf0lru.h"
52 #include "fts0fts.h"
53 #include "fts0types.h"
54 #include "m_string.h"
55 
56 /*************************************************************************
57 IMPORTANT NOTE: Any operation that generates redo MUST check that there
58 is enough space in the redo log before that operation. This is
59 done by calling log_free_check(). The reason for checking the
60 availability of the redo log space before the start of the operation is
61 that we MUST not hold any synchronization objects when performing the
62 check.
63 If you make a change in this module make sure that no codepath is
64 introduced where a call to log_free_check() is bypassed. */
65 
66 /*********************************************************************/
/* Allocates an ins_node_t from `heap`, assigns its initial field values and
returns it. The node type is set to QUE_NODE_INSERT, the state to
INS_NODE_SET_IX_LOCK, and a separate heap (entry_sys_heap) is created for the
node with mem_heap_create(128).
NOTE(review): this listing omits original lines 70-71 and 74 (the embedded
numbering jumps), so the return type, the function name and the declaration
of `table` are not visible here. Presumably the function is ins_node_create()
and `table` is a parameter -- confirm against the real file. */
69 UNIV_INTERN
72 /*============*/
73  ulint ins_type,
75  mem_heap_t* heap)
76 {
77  ins_node_t* node;
78 
79  node = static_cast<ins_node_t*>(
80  mem_heap_alloc(heap, sizeof(ins_node_t)));
81 
/* NOTE(review): only the fields assigned below are set in this function;
whether mem_heap_alloc() zero-fills the rest is not visible here -- confirm. */
82  node->common.type = QUE_NODE_INSERT;
83 
84  node->ins_type = ins_type;
85 
/* Initial state; no index or entry is selected yet */
86  node->state = INS_NODE_SET_IX_LOCK;
87  node->table = table;
88  node->index = NULL;
89  node->entry = NULL;
90 
91  node->select = NULL;
92 
93  node->trx_id = 0;
94 
/* A heap created here and stored in the node; it is distinct from the
`heap` argument that the node itself is allocated from */
95  node->entry_sys_heap = mem_heap_create(128);
96 
97  node->magic_n = INS_NODE_MAGIC_N;
98 
99  return(node);
100 }
101 
102 /***********************************************************/
/* Builds node->entry_list: for every index of node->table, in the order
returned by dict_table_get_first_index()/dict_table_get_next_index(), an
index entry is built from node->row with row_build_index_entry() using
memory from node->entry_sys_heap, and appended to the list.
NOTE(review): this listing omits original line 110, so the declaration of
the local `index` is not visible here. */
104 static
105 void
106 ins_node_create_entry_list(
107 /*=======================*/
108  ins_node_t* node)
109 {
111  dtuple_t* entry;
112 
113  ut_ad(node->entry_sys_heap);
114 
/* Start from an empty list */
115  UT_LIST_INIT(node->entry_list);
116 
117  /* We will include all indexes (including the corrupted
118  secondary indexes) in the entry list. Filtering of
119  these corrupted indexes will be done in row_ins() */
120 
121  for (index = dict_table_get_first_index(node->table);
122  index != 0;
123  index = dict_table_get_next_index(index)) {
124 
125  entry = row_build_index_entry(
126  node->row, NULL, index, node->entry_sys_heap);
127 
128  UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
129  }
130 }
131 
132 /*****************************************************************/
/* Allocates buffers for the system columns of node->row. For each of
DATA_ROW_ID, DATA_TRX_ID and DATA_ROLL_PTR a buffer of the matching *_LEN
size is taken from node->entry_sys_heap with mem_heap_zalloc() and the
corresponding field of the row is pointed at it with dfield_set_data().
The row id and trx id buffers are also remembered in node->row_id_buf and
node->trx_id_buf; the roll ptr buffer pointer is not stored in the node by
the code visible here.
NOTE(review): this listing omits original lines 141 and 152, so the
declaration of the local `table` and one statement after the first ut_ad()
are not visible here. */
134 static
135 void
136 row_ins_alloc_sys_fields(
137 /*=====================*/
138  ins_node_t* node)
139 {
140  dtuple_t* row;
142  mem_heap_t* heap;
143  const dict_col_t* col;
144  dfield_t* dfield;
145  byte* ptr;
146 
147  row = node->row;
148  table = node->table;
149  heap = node->entry_sys_heap;
150 
151  ut_ad(row && table && heap);
153 
154  /* 1. Allocate buffer for row id */
155 
156  col = dict_table_get_sys_col(table, DATA_ROW_ID);
157 
158  dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
159 
160  ptr = static_cast<byte*>(mem_heap_zalloc(heap, DATA_ROW_ID_LEN));
161 
162  dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
163 
164  node->row_id_buf = ptr;
165 
166  /* 2. Allocate buffer for trx id */
167 
168  col = dict_table_get_sys_col(table, DATA_TRX_ID);
169 
170  dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
171  ptr = static_cast<byte*>(mem_heap_zalloc(heap, DATA_TRX_ID_LEN));
172 
173  dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
174 
175  node->trx_id_buf = ptr;
176 
177  /* 3. Allocate buffer for roll ptr */
178 
179  col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
180 
181  dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
182  ptr = static_cast<byte*>(mem_heap_zalloc(heap, DATA_ROLL_PTR_LEN));
183 
184  dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
185 }
186 
187 /*********************************************************************/
/* Attaches a new row to an insert node and resets the node for it: the
state goes back to INS_NODE_SET_IX_LOCK, index and entry are cleared,
entry_sys_heap is emptied, and the index entry list and the system field
buffers are built again from that heap. Finally node->trx_id is reset to 0.
NOTE(review): this listing omits original line 193, so the function name is
not visible here; presumably it is ins_node_set_new_row() -- confirm against
the real file. */
191 UNIV_INTERN
192 void
194 /*=================*/
195  ins_node_t* node,
196  dtuple_t* row)
197 {
198  node->state = INS_NODE_SET_IX_LOCK;
199  node->index = NULL;
200  node->entry = NULL;
201 
202  node->row = row;
203 
/* presumably releases what was allocated from entry_sys_heap for the
previous row; the two calls below allocate from this heap again */
204  mem_heap_empty(node->entry_sys_heap);
205 
206  /* Create templates for index entries */
207 
208  ins_node_create_entry_list(node);
209 
210  /* Allocate from entry_sys_heap buffers for sys fields */
211 
212  row_ins_alloc_sys_fields(node);
213 
214  /* As we allocated a new trx id buf, the trx id should be written
215  there again: */
216 
217  node->trx_id = 0;
218 }
219 
220 /*******************************************************************/
/* Handles an insert into a secondary index (asserted: cursor->index is not
clustered) when the cursor is already positioned on a record that matches
`entry`: instead of inserting, the existing record is modified using an
update vector `update`.
If the record is not delete-marked, the function asserts that the update
vector is empty and that the index name starts with TEMP_INDEX_PREFIX, and
returns DB_SUCCESS without changing anything.
Otherwise the branch taken depends on `mode`: BTR_MODIFY_LEAF takes the
optimistic path (see the existing comment below), any other value must be
BTR_MODIFY_TREE (asserted).
Returns a dberr_t; on the BTR_MODIFY_LEAF path DB_OVERFLOW, DB_UNDERFLOW and
DB_ZIP_OVERFLOW are converted to DB_FAIL.
NOTE(review): this listing omits original lines 231-232, 235-236, 241, 259,
285, 299 and 304 (the embedded numbering jumps). The heads of the calls that
compute `update` and `err`, the condition guarding the
`return(DB_LOCK_TABLE_FULL)`, and the declaration of `offsets_heap` are
therefore not visible here. */
225 static __attribute__((nonnull, warn_unused_result))
226 dberr_t
227 row_ins_sec_index_entry_by_modify(
228 /*==============================*/
229  ulint flags,
230  ulint mode,
233  btr_cur_t* cursor,
234  ulint** offsets,
237  mem_heap_t* heap,
238  const dtuple_t* entry,
239  que_thr_t* thr,
240  mtr_t* mtr)
242 {
243  big_rec_t* dummy_big_rec;
244  upd_t* update;
245  rec_t* rec;
246  dberr_t err;
247 
248  rec = btr_cur_get_rec(cursor);
249 
250  ut_ad(!dict_index_is_clust(cursor->index));
251  ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
252  ut_ad(!entry->info_bits);
253 
254  /* We know that in the alphabetical ordering, entry and rec are
255  identified. But in their binary form there may be differences if
256  there are char fields in them. Therefore we have to calculate the
257  difference. */
258 
/* NOTE(review): the line that starts this call (presumably assigning
`update`) is missing from the listing */
260  rec, cursor->index, *offsets, entry, heap);
261 
262  if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
263  /* We should never insert in place of a record that
264  has not been delete-marked. The only exception is when
265  online CREATE INDEX copied the changes that we already
266  made to the clustered index, and completed the
267  secondary index creation before we got here. In this
268  case, the change would already be there. The CREATE
269  INDEX should be waiting for a MySQL meta-data lock
270  upgrade at least until this INSERT or UPDATE
271  returns. After that point, the TEMP_INDEX_PREFIX
272  would be dropped from the index name in
273  commit_inplace_alter_table(). */
274  ut_a(update->n_fields == 0);
275  ut_a(*cursor->index->name == TEMP_INDEX_PREFIX);
276  ut_ad(!dict_index_is_online_ddl(cursor->index));
277  return(DB_SUCCESS);
278  }
279 
280  if (mode == BTR_MODIFY_LEAF) {
281  /* Try an optimistic updating of the record, keeping changes
282  within the page */
283 
284  /* TODO: pass only *offsets */
286  flags | BTR_KEEP_SYS_FLAG, cursor,
287  offsets, &offsets_heap, update, 0, thr,
288  thr_get_trx(thr)->id, mtr);
/* Map the three overflow/underflow results to DB_FAIL; the assignment
deliberately falls through into `default`, which only breaks */
289  switch (err) {
290  case DB_OVERFLOW:
291  case DB_UNDERFLOW:
292  case DB_ZIP_OVERFLOW:
293  err = DB_FAIL;
294  default:
295  break;
296  }
297  } else {
298  ut_a(mode == BTR_MODIFY_TREE);
/* NOTE(review): the condition guarding this early return (original line
299) is missing from the listing */
300 
301  return(DB_LOCK_TABLE_FULL);
302  }
303 
305  flags | BTR_KEEP_SYS_FLAG, cursor,
306  offsets, &offsets_heap,
307  heap, &dummy_big_rec, update, 0,
308  thr, thr_get_trx(thr)->id, mtr);
/* No externally stored (big) record is expected from this call */
309  ut_ad(!dummy_big_rec);
310  }
311 
312  return(err);
313 }
314 
315 /*******************************************************************/
/* Handles an insert into a clustered index (asserted: cursor->index is
clustered) when the cursor is positioned on an existing record: the record
is modified using an update vector `update` instead of inserting a new one.
*big_rec is set to NULL on entry. If `mode` is not BTR_MODIFY_TREE the
optimistic path is taken (see the existing comment below) and DB_OVERFLOW,
DB_UNDERFLOW and DB_ZIP_OVERFLOW are converted to DB_FAIL; otherwise the
other path is taken, whose call also receives `big_rec`.
Returns a dberr_t.
NOTE(review): this listing omits original lines 326-327, 331-332, 335-336,
340, 352, 359, 363, 368, 380 and 385 (the embedded numbering jumps). The
heads of the calls that compute `update` and `err`, an assertion after
btr_cur_get_rec(), and the condition guarding `return(DB_LOCK_TABLE_FULL)`
are therefore not visible here. */
320 static __attribute__((nonnull, warn_unused_result))
321 dberr_t
322 row_ins_clust_index_entry_by_modify(
323 /*================================*/
324  ulint flags,
325  ulint mode,
328  btr_cur_t* cursor,
329  ulint** offsets,
330  mem_heap_t** offsets_heap,
333  mem_heap_t* heap,
334  big_rec_t** big_rec,
337  const dtuple_t* entry,
338  que_thr_t* thr,
339  mtr_t* mtr)
341 {
342  const rec_t* rec;
343  const upd_t* update;
344  dberr_t err;
345 
346  ut_ad(dict_index_is_clust(cursor->index));
347 
348  *big_rec = NULL;
349 
350  rec = btr_cur_get_rec(cursor);
351 
/* NOTE(review): the line that starts this statement (original line 352)
is missing from the listing */
353  dict_table_is_comp(cursor->index->table)));
354 
355  /* Build an update vector containing all the fields to be modified;
356  NOTE that this vector may NOT contain system columns trx_id or
357  roll_ptr */
358 
360  cursor->index, entry, rec, NULL, true,
361  thr_get_trx(thr), heap);
362  if (mode != BTR_MODIFY_TREE) {
364 
365  /* Try optimistic updating of the record, keeping changes
366  within the page */
367 
369  flags, cursor, offsets, offsets_heap, update, 0, thr,
370  thr_get_trx(thr)->id, mtr);
/* Map the three overflow/underflow results to DB_FAIL; the assignment
deliberately falls through into `default`, which only breaks */
371  switch (err) {
372  case DB_OVERFLOW:
373  case DB_UNDERFLOW:
374  case DB_ZIP_OVERFLOW:
375  err = DB_FAIL;
376  default:
377  break;
378  }
379  } else {
/* NOTE(review): the condition guarding this early return (original line
380) is missing from the listing */
381 
382  return(DB_LOCK_TABLE_FULL);
383 
384  }
386  flags | BTR_KEEP_POS_FLAG,
387  cursor, offsets, offsets_heap, heap,
388  big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
389  }
390 
391  return(err);
392 }
393 
394 /*********************************************************************/
398 static
399 ibool
400 row_ins_cascade_ancestor_updates_table(
401 /*===================================*/
402  que_node_t* node,
403  dict_table_t* table)
404 {
405  que_node_t* parent;
406 
407  for (parent = que_node_get_parent(node);
408  que_node_get_type(parent) == QUE_NODE_UPDATE;
409  parent = que_node_get_parent(parent)) {
410 
411  upd_node_t* upd_node;
412 
413  upd_node = static_cast<upd_node_t*>(parent);
414 
415  if (upd_node->table == table && upd_node->is_delete == FALSE) {
416 
417  return(TRUE);
418  }
419  }
420 
421  return(FALSE);
422 }
423 
424 /*********************************************************************/
428 static __attribute__((nonnull, warn_unused_result))
429 ulint
430 row_ins_cascade_n_ancestors(
431 /*========================*/
432  que_node_t* node)
433 {
434  que_node_t* parent;
435  ulint n_ancestors = 0;
436 
437  for (parent = que_node_get_parent(node);
438  que_node_get_type(parent) == QUE_NODE_UPDATE;
439  parent = que_node_get_parent(parent)) {
440 
441  n_ancestors++;
442  }
443 
444  return(n_ancestors);
445 }
446 
447 /******************************************************************/
/* Fills cascade->update (the update vector of node->cascade_node) for the
child table of `foreign`, based on the update vector of the parent node
(node->update). For each of the foreign->n_fields key columns, every parent
update field that touches the corresponding parent column produces one child
update field carrying the same new value, padded up to the child column's
minimum size when that is larger than the value.
Returns the number of fields placed in the child update vector (also stored
in update->n_fields), or ULINT_UNDEFINED when the update must be refused:
- a NULL value would go into a DATA_NOT_NULL child column,
- the value does not fit the child column,
- padding would be required for a column reported as
  DATA_MYSQL_BINARY_CHARSET_COLL (or with mbminlen == 1 -- see the note at
  that check),
- a Doc ID value is 0 or smaller than table->fts->cache->next_doc_id,
- an FTS indexed column is affected but no Doc ID is supplied and the table
  does not have DICT_TF2_FTS_HAS_DOC_ID set.
*fts_col_affected is set to TRUE when a column with an FTS index, or the
Doc ID column, is among the updated child columns; otherwise FALSE.
`heap` is used for the padded copies of values.
NOTE(review): this listing omits original lines 538, 563, 595 and 612 (plus
comment-continuation lines in the parameter list), so the heads of four
calls are not visible here; they are marked where they occur. */
454 static __attribute__((nonnull, warn_unused_result))
455 ulint
456 row_ins_cascade_calc_update_vec(
457 /*============================*/
458  upd_node_t* node,
460  dict_foreign_t* foreign,
462  mem_heap_t* heap,
464  trx_t* trx,
465  ibool* fts_col_affected)
466 {
467  upd_node_t* cascade = node->cascade_node;
468  dict_table_t* table = foreign->foreign_table;
469  dict_index_t* index = foreign->foreign_index;
470  upd_t* update;
471  dict_table_t* parent_table;
472  dict_index_t* parent_index;
473  upd_t* parent_update;
474  ulint n_fields_updated;
475  ulint parent_field_no;
476  ulint i;
477  ulint j;
478  ibool doc_id_updated = FALSE;
479  ulint doc_id_pos = 0;
480  doc_id_t new_doc_id = FTS_NULL_DOC_ID;
481 
482  ut_a(node);
483  ut_a(foreign);
484  ut_a(cascade);
485  ut_a(table);
486  ut_a(index);
487 
488  /* Calculate the appropriate update vector which will set the fields
489  in the child index record to the same value (possibly padded with
490  spaces if the column is a fixed length CHAR or FIXBINARY column) as
491  the referenced index record will get in the update. */
492 
493  parent_table = node->table;
494  ut_a(parent_table == foreign->referenced_table);
495  parent_index = foreign->referenced_index;
496  parent_update = node->update;
497 
498  update = cascade->update;
499 
500  update->info_bits = 0;
/* Provisional count; overwritten with n_fields_updated before the normal
return at the end of the function */
501  update->n_fields = foreign->n_fields;
502 
503  n_fields_updated = 0;
504 
505  *fts_col_affected = FALSE;
506 
507  if (table->fts) {
508  doc_id_pos = dict_table_get_nth_col_pos(
509  table, table->fts->doc_col);
510  }
511 
/* Outer loop: key columns of the constraint; inner loop: fields of the
parent update vector that touch the i'th referenced column */
512  for (i = 0; i < foreign->n_fields; i++) {
513 
514  parent_field_no = dict_table_get_nth_col_pos(
515  parent_table,
516  dict_index_get_nth_col_no(parent_index, i));
517 
518  for (j = 0; j < parent_update->n_fields; j++) {
519  const upd_field_t* parent_ufield
520  = &parent_update->fields[j];
521 
522  if (parent_ufield->field_no == parent_field_no) {
523 
524  ulint min_size;
525  const dict_col_t* col;
526  ulint ufield_len;
527  upd_field_t* ufield;
528 
529  col = dict_index_get_nth_col(index, i);
530 
531  /* A field in the parent index record is
532  updated. Let us make the update vector
533  field for the child table. */
534 
535  ufield = update->fields + n_fields_updated;
536 
/* NOTE(review): the head of the call on the right-hand side (original
line 538) is missing from the listing */
537  ufield->field_no
539  table, dict_col_get_no(col));
540 
541  ufield->orig_len = 0;
542  ufield->exp = NULL;
543 
544  ufield->new_val = parent_ufield->new_val;
545  ufield_len = dfield_get_len(&ufield->new_val);
546 
547  /* Clear the "external storage" flag */
548  dfield_set_len(&ufield->new_val, ufield_len);
549 
550  /* Do not allow a NOT NULL column to be
551  updated as NULL */
552 
553  if (dfield_is_null(&ufield->new_val)
554  && (col->prtype & DATA_NOT_NULL)) {
555 
556  return(ULINT_UNDEFINED);
557  }
558 
559  /* If the new value would not fit in the
560  column, do not allow the update */
561 
/* NOTE(review): the head of the call compared against ufield_len
(original line 563) is missing from the listing */
562  if (!dfield_is_null(&ufield->new_val)
564  col->prtype, col->mbminmaxlen,
565  col->len,
566  ufield_len,
567  static_cast<char*>(
568  dfield_get_data(
569  &ufield->new_val)))
570  < ufield_len) {
571 
572  return(ULINT_UNDEFINED);
573  }
574 
575  /* If the parent column type has a different
576  length than the child column type, we may
577  need to pad with spaces the new value of the
578  child column */
579 
580  min_size = dict_col_get_min_size(col);
581 
582  /* Because UNIV_SQL_NULL (the marker
583  of SQL NULL values) exceeds all possible
584  values of min_size, the test below will
585  not hold for SQL NULL columns. */
586 
587  if (min_size > ufield_len) {
588 
589  byte* pad;
590  ulint pad_len;
591  byte* padded_data;
592  ulint mbminlen;
593 
/* NOTE(review): the head of the allocation call (original line 595) is
missing from the listing */
594  padded_data = static_cast<byte*>(
596  heap, min_size));
597 
598  pad = padded_data + ufield_len;
599  pad_len = min_size - ufield_len;
600 
/* Copy the value to the front of the new buffer; the tail
[pad, pad + pad_len) is filled by row_mysql_pad_col() below */
601  memcpy(padded_data,
602  dfield_get_data(&ufield
603  ->new_val),
604  ufield_len);
605 
606  mbminlen = dict_col_get_mbminlen(col);
607 
608  ut_ad(!(ufield_len % mbminlen));
609  ut_ad(!(min_size % mbminlen));
610 
/* NOTE(review): the second operand's call head (original line 612) is
missing from the listing, so the exact condition is not visible here */
611  if (mbminlen == 1
613  col->prtype)
614  == DATA_MYSQL_BINARY_CHARSET_COLL) {
615  /* Do not pad BINARY columns */
616  return(ULINT_UNDEFINED);
617  }
618 
619  row_mysql_pad_col(mbminlen,
620  pad, pad_len);
621  dfield_set_data(&ufield->new_val,
622  padded_data, min_size);
623  }
624 
625  /* Check whether the current column has
626  FTS index on it */
627  if (table->fts
628  && dict_table_is_fts_column(
629  table->fts->indexes,
630  dict_col_get_no(col))
631  != ULINT_UNDEFINED) {
632  *fts_col_affected = TRUE;
633  }
634 
635  /* If Doc ID is updated, check whether the
636  Doc ID is valid */
637  if (table->fts
638  && ufield->field_no == doc_id_pos) {
639  doc_id_t n_doc_id;
640 
641  n_doc_id =
642  table->fts->cache->next_doc_id;
643 
644  new_doc_id = fts_read_doc_id(
645  static_cast<const byte*>(
646  dfield_get_data(
647  &ufield->new_val)));
648 
649  if (new_doc_id <= 0) {
650  fprintf(stderr,
651  "InnoDB: FTS Doc ID "
652  "must be larger than "
653  "0 \n");
654  return(ULINT_UNDEFINED);
655  }
656 
657  if (new_doc_id < n_doc_id) {
658  fprintf(stderr,
659  "InnoDB: FTS Doc ID "
660  "must be larger than "
661  IB_ID_FMT" for table",
662  n_doc_id -1);
663 
664  ut_print_name(stderr, trx,
665  TRUE,
666  table->name);
667 
668  putc('\n', stderr);
669  return(ULINT_UNDEFINED);
670  }
671 
672  *fts_col_affected = TRUE;
673  doc_id_updated = TRUE;
674  }
675 
676  n_fields_updated++;
677  }
678  }
679  }
680 
681  /* Generate a new Doc ID if FTS index columns get updated */
682  if (table->fts && *fts_col_affected) {
683  if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
684  doc_id_t doc_id;
685  upd_field_t* ufield;
686 
687  ut_ad(!doc_id_updated);
/* NOTE(review): this uses one more slot of update->fields than the loop
above filled. If all foreign->n_fields key columns were updated, the slot
index equals foreign->n_fields -- confirm that the vector was created with
room for this extra field. */
688  ufield = update->fields + n_fields_updated;
689  fts_get_next_doc_id(table, &trx->fts_next_doc_id);
690  doc_id = fts_update_doc_id(table, ufield,
691  &trx->fts_next_doc_id);
692  n_fields_updated++;
693  fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
694  } else {
695  if (doc_id_updated) {
696  ut_ad(new_doc_id);
697  fts_trx_add_op(trx, table, new_doc_id,
698  FTS_INSERT, NULL);
699  } else {
700  fprintf(stderr, "InnoDB: FTS Doc ID must be "
701  "updated along with FTS indexed "
702  "column for table ");
703  ut_print_name(stderr, trx, TRUE, table->name);
704  putc('\n', stderr);
705  return(ULINT_UNDEFINED);
706  }
707  }
708  }
709 
710  update->n_fields = n_fields_updated;
711 
712  return(n_fields_updated);
713 }
714 
715 /*********************************************************************/
/* Prepares a detailed error message about `foreign` for `trx`. While
holding srv_misc_tmpfile_mutex, the temporary file srv_misc_tmpfile is
rewound and the foreign table name (and further constraint information) is
printed into it; on the failure branch the detailed error of the
transaction is set to the fixed text "temp file operation failed".
NOTE(review): this listing omits original lines 725, 730, 733 and 735, so
the `if` condition that opens the success branch, the head of the second
print call and the statement that consumes the temp file contents are not
visible here. */
718 static
719 void
720 row_ins_set_detailed(
721 /*=================*/
722  trx_t* trx,
723  dict_foreign_t* foreign)
724 {
726 
/* The shared temp file is only touched while holding its mutex */
727  mutex_enter(&srv_misc_tmpfile_mutex);
728  rewind(srv_misc_tmpfile);
729 
731  ut_print_name(srv_misc_tmpfile, trx, TRUE,
732  foreign->foreign_table_name);
734  srv_misc_tmpfile, trx, foreign, FALSE);
736  } else {
737  trx_set_detailed_error(trx, "temp file operation failed");
738  }
739 
740  mutex_exit(&srv_misc_tmpfile_mutex);
741 }
742 
743 /*********************************************************************/
/* Starts a foreign key error report in dict_foreign_err_file: rewinds the
file, writes a timestamp and a " Transaction:" header, and prints `trx`
with trx_print_low() together with lock statistics gathered beforehand
(rows locked, length of trx->lock.trx_locks, size of the lock heap).
Does nothing when srv_read_only_mode is set.
Otherwise the function returns with dict_foreign_err_mutex still held (see
the final assertion); row_ins_foreign_report_err() and
row_ins_foreign_report_add_err() release it after finishing the report.
NOTE(review): this listing omits original line 761, which precedes the lock
statistics and is paired with the visible lock_mutex_exit(). */
747 static
748 void
749 row_ins_foreign_trx_print(
750 /*======================*/
751  trx_t* trx)
752 {
753  ulint n_rec_locks;
754  ulint n_trx_locks;
755  ulint heap_size;
756 
757  if (srv_read_only_mode) {
758  return;
759  }
760 
762  n_rec_locks = lock_number_of_rows_locked(&trx->lock);
763  n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
764  heap_size = mem_heap_get_size(trx->lock.lock_heap);
765  lock_mutex_exit();
766 
767  mutex_enter(&trx_sys->mutex);
768 
769  mutex_enter(&dict_foreign_err_mutex);
770  rewind(dict_foreign_err_file);
771  ut_print_timestamp(dict_foreign_err_file);
772  fputs(" Transaction:\n", dict_foreign_err_file);
773 
774  trx_print_low(dict_foreign_err_file, trx, 600,
775  n_rec_locks, n_trx_locks, heap_size);
776 
777  mutex_exit(&trx_sys->mutex);
778 
/* dict_foreign_err_mutex is intentionally NOT released here */
779  ut_ad(mutex_own(&dict_foreign_err_mutex));
780 }
781 
782 /*********************************************************************/
/* Writes a foreign key violation report to dict_foreign_err_file for an
operation on the parent (referenced) table. The report contains the
transaction (via row_ins_foreign_trx_print()), the failing constraint's
table name, `errstr`, the referenced index name with the optional `entry`
tuple, and the child index name with the optional child record `rec`.
Also sets the detailed error of the transaction via row_ins_set_detailed().
Does nothing when srv_read_only_mode is set.
errstr: text describing the attempted operation, written verbatim
thr:    query thread; its transaction is used for name printing
rec:    record in the child index, or NULL if not available
entry:  index entry in the parent table, or NULL
NOTE(review): this listing omits original line 813 (plus
comment-continuation lines in the parameter list), which begins a call
whose last argument is the visible `TRUE);`. */
785 static
786 void
787 row_ins_foreign_report_err(
788 /*=======================*/
789  const char* errstr,
791  que_thr_t* thr,
793  dict_foreign_t* foreign,
794  const rec_t* rec,
796  const dtuple_t* entry)
798 {
799  if (srv_read_only_mode) {
800  return;
801  }
802 
803  FILE* ef = dict_foreign_err_file;
804  trx_t* trx = thr_get_trx(thr);
805 
806  row_ins_set_detailed(trx, foreign);
807 
/* Rewinds the error file, writes the header, and returns holding
dict_foreign_err_mutex, which is released at the end of this function */
808  row_ins_foreign_trx_print(trx);
809 
810  fputs("Foreign key constraint fails for table ", ef);
811  ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
812  fputs(":\n", ef);
814  TRUE);
815  putc('\n', ef);
816  fputs(errstr, ef);
817  fputs(" in parent table, in index ", ef);
818  ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
819  if (entry) {
820  fputs(" tuple:\n", ef);
821  dtuple_print(ef, entry);
822  }
823  fputs("\nBut in child table ", ef);
824  ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
825  fputs(", in index ", ef);
826  ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
827  if (rec) {
828  fputs(", there is a record:\n", ef);
829  rec_print(ef, rec, foreign->foreign_index);
830  } else {
831  fputs(", the record is not available\n", ef);
832  }
833  putc('\n', ef);
834 
835  mutex_exit(&dict_foreign_err_mutex);
836 }
837 
838 /*********************************************************************/
/* Writes a foreign key violation report to dict_foreign_err_file for an
attempt to add a row to the child table when no matching parent row exists.
The report contains the transaction (via row_ins_foreign_trx_print()), the
failing constraint's table name, the child index name with the optional
`entry` tuple, and the parent table/index names followed by the closest
matching parent record, if any.
Also sets the detailed error of the transaction via row_ins_set_detailed().
Does nothing when srv_read_only_mode is set.
rec:   closest record found in the referenced index, or NULL
entry: index entry that was being added to the child table, or NULL
NOTE(review): this listing omits original line 867 (plus
comment-continuation lines in the parameter list), which begins a call
whose last argument is the visible `TRUE);`. */
842 static
843 void
844 row_ins_foreign_report_add_err(
845 /*===========================*/
846  trx_t* trx,
847  dict_foreign_t* foreign,
848  const rec_t* rec,
851  const dtuple_t* entry)
853 {
854  if (srv_read_only_mode) {
855  return;
856  }
857 
858  FILE* ef = dict_foreign_err_file;
859 
860  row_ins_set_detailed(trx, foreign);
861 
/* Rewinds the error file, writes the header, and returns holding
dict_foreign_err_mutex, which is released at the end of this function */
862  row_ins_foreign_trx_print(trx);
863 
864  fputs("Foreign key constraint fails for table ", ef);
865  ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
866  fputs(":\n", ef);
868  TRUE);
869  fputs("\nTrying to add in child table, in index ", ef);
870  ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
871  if (entry) {
872  fputs(" tuple:\n", ef);
873  /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
874  It would be better to only display the user columns. */
875  dtuple_print(ef, entry);
876  }
877  fputs("\nBut in parent table ", ef);
878  ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
879  fputs(", in index ", ef);
880  ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
881  fputs(",\nthe closest match we can find is record:\n", ef);
882  if (rec && page_rec_is_supremum(rec)) {
883  /* If the cursor ended on a supremum record, it is better
884  to report the previous record in the error message, so that
885  the user gets a more descriptive error message. */
886  rec = page_rec_get_prev_const(rec);
887  }
888 
889  if (rec) {
890  rec_print(ef, rec, foreign->referenced_index);
891  }
892  putc('\n', ef);
893 
894  mutex_exit(&dict_foreign_err_mutex);
895 }
896 
897 /*********************************************************************/
/* Makes a private copy of `name`, cuts it at the first '/' so that only
the part before the slash remains, and frees the copy afterwards.
`name` must contain a '/': this is enforced with ut_a().
NOTE(review): this listing omits original line 918 (plus a
comment-continuation line in the parameter list), which sits between the
truncation and mem_free(); presumably that is the call that performs the
query cache invalidation using `thr` and `buf` -- confirm against the real
file. */
899 static
900 void
901 row_ins_invalidate_query_cache(
902 /*===========================*/
903  que_thr_t* thr,
905  const char* name)
907 {
908  char* buf;
909  char* ptr;
/* Length including the terminating NUL */
910  ulint len = strlen(name) + 1;
911 
912  buf = mem_strdupl(name, len);
913 
/* Terminate the copy at the first '/' */
914  ptr = strchr(buf, '/');
915  ut_a(ptr);
916  *ptr = '\0';
917 
919  mem_free(buf);
920 }
921 
922 /*********************************************************************/
927 static __attribute__((nonnull, warn_unused_result))
928 dberr_t
929 row_ins_foreign_check_on_constraint(
930 /*================================*/
931  que_thr_t* thr,
933  dict_foreign_t* foreign,
935  btr_pcur_t* pcur,
937  dtuple_t* entry,
939  mtr_t* mtr)
941 {
942  upd_node_t* node;
943  upd_node_t* cascade;
944  dict_table_t* table = foreign->foreign_table;
946  dict_index_t* clust_index;
947  dtuple_t* ref;
948  mem_heap_t* upd_vec_heap = NULL;
949  const rec_t* rec;
950  const rec_t* clust_rec;
951  const buf_block_t* clust_block;
952  upd_t* update;
953  ulint n_to_update;
954  dberr_t err;
955  ulint i;
956  trx_t* trx;
957  mem_heap_t* tmp_heap = NULL;
958  doc_id_t doc_id = FTS_NULL_DOC_ID;
959  ibool fts_col_affacted = FALSE;
960 
961  ut_a(thr);
962  ut_a(foreign);
963  ut_a(pcur);
964  ut_a(mtr);
965 
966  trx = thr_get_trx(thr);
967 
968  /* Since we are going to delete or update a row, we have to invalidate
969  the MySQL query cache for table. A deadlock of threads is not possible
970  here because the caller of this function does not hold any latches with
971  the sync0sync.h rank above the lock_sys_t::mutex. The query cache mutex
972  has a rank just above the lock_sys_t::mutex. */
973 
974  row_ins_invalidate_query_cache(thr, table->name);
975 
976  node = static_cast<upd_node_t*>(thr->run_node);
977 
978  if (node->is_delete && 0 == (foreign->type
981 
982  row_ins_foreign_report_err("Trying to delete",
983  thr, foreign,
984  btr_pcur_get_rec(pcur), entry);
985 
986  return(DB_ROW_IS_REFERENCED);
987  }
988 
989  if (!node->is_delete && 0 == (foreign->type
992 
993  /* This is an UPDATE */
994 
995  row_ins_foreign_report_err("Trying to update",
996  thr, foreign,
997  btr_pcur_get_rec(pcur), entry);
998 
999  return(DB_ROW_IS_REFERENCED);
1000  }
1001 
1002  if (node->cascade_node == NULL) {
1003  /* Extend our query graph by creating a child to current
1004  update node. The child is used in the cascade or set null
1005  operation. */
1006 
1007  node->cascade_heap = mem_heap_create(128);
1008  node->cascade_node = row_create_update_node_for_mysql(
1009  table, node->cascade_heap);
1010  que_node_set_parent(node->cascade_node, node);
1011  }
1012 
1013  /* Initialize cascade_node to do the operation we want. Note that we
1014  use the SAME cascade node to do all foreign key operations of the
1015  SQL DELETE: the table of the cascade node may change if there are
1016  several child tables to the table where the delete is done! */
1017 
1018  cascade = node->cascade_node;
1019 
1020  cascade->table = table;
1021 
1022  cascade->foreign = foreign;
1023 
1024  if (node->is_delete
1025  && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1026  cascade->is_delete = TRUE;
1027  } else {
1028  cascade->is_delete = FALSE;
1029 
1030  if (foreign->n_fields > cascade->update_n_fields) {
1031  /* We have to make the update vector longer */
1032 
1033  cascade->update = upd_create(foreign->n_fields,
1034  node->cascade_heap);
1035  cascade->update_n_fields = foreign->n_fields;
1036  }
1037  }
1038 
1039  /* We do not allow cyclic cascaded updating (DELETE is allowed,
1040  but not UPDATE) of the same table, as this can lead to an infinite
1041  cycle. Check that we are not updating the same table which is
1042  already being modified in this cascade chain. We have to check
1043  this also because the modification of the indexes of a 'parent'
1044  table may still be incomplete, and we must avoid seeing the indexes
1045  of the parent table in an inconsistent state! */
1046 
1047  if (!cascade->is_delete
1048  && row_ins_cascade_ancestor_updates_table(cascade, table)) {
1049 
1050  /* We do not know if this would break foreign key
1051  constraints, but play safe and return an error */
1052 
1053  err = DB_ROW_IS_REFERENCED;
1054 
1055  row_ins_foreign_report_err(
1056  "Trying an update, possibly causing a cyclic"
1057  " cascaded update\n"
1058  "in the child table,", thr, foreign,
1059  btr_pcur_get_rec(pcur), entry);
1060 
1061  goto nonstandard_exit_func;
1062  }
1063 
1064  if (row_ins_cascade_n_ancestors(cascade) >= 15) {
1065  err = DB_ROW_IS_REFERENCED;
1066 
1067  row_ins_foreign_report_err(
1068  "Trying a too deep cascaded delete or update\n",
1069  thr, foreign, btr_pcur_get_rec(pcur), entry);
1070 
1071  goto nonstandard_exit_func;
1072  }
1073 
1074  index = btr_pcur_get_btr_cur(pcur)->index;
1075 
1076  ut_a(index == foreign->foreign_index);
1077 
1078  rec = btr_pcur_get_rec(pcur);
1079 
1080  tmp_heap = mem_heap_create(256);
1081 
1082  if (dict_index_is_clust(index)) {
1083  /* pcur is already positioned in the clustered index of
1084  the child table */
1085 
1086  clust_index = index;
1087  clust_rec = rec;
1088  clust_block = btr_pcur_get_block(pcur);
1089  } else {
1090  /* We have to look for the record in the clustered index
1091  in the child table */
1092 
1093  clust_index = dict_table_get_first_index(table);
1094 
1095  ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1096  tmp_heap);
1097  btr_pcur_open_with_no_init(clust_index, ref,
1098  PAGE_CUR_LE, BTR_SEARCH_LEAF,
1099  cascade->pcur, 0, mtr);
1100 
1101  clust_rec = btr_pcur_get_rec(cascade->pcur);
1102  clust_block = btr_pcur_get_block(cascade->pcur);
1103 
1104  if (!page_rec_is_user_rec(clust_rec)
1105  || btr_pcur_get_low_match(cascade->pcur)
1106  < dict_index_get_n_unique(clust_index)) {
1107 
1108  fputs("InnoDB: error in cascade of a foreign key op\n"
1109  "InnoDB: ", stderr);
1110  dict_index_name_print(stderr, trx, index);
1111 
1112  fputs("\n"
1113  "InnoDB: record ", stderr);
1114  rec_print(stderr, rec, index);
1115  fputs("\n"
1116  "InnoDB: clustered record ", stderr);
1117  rec_print(stderr, clust_rec, clust_index);
1118  fputs("\n"
1119  "InnoDB: Submit a detailed bug report to"
1120  " http://bugs.mysql.com\n", stderr);
1121  ut_ad(0);
1122  err = DB_SUCCESS;
1123 
1124  goto nonstandard_exit_func;
1125  }
1126  }
1127 
1128  /* Set an X-lock on the row to delete or update in the child table */
1129 
1130  err = lock_table(0, table, LOCK_IX, thr);
1131 
1132  if (err == DB_SUCCESS) {
1133  /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1134  we already have a normal shared lock on the appropriate
1135  gap if the search criterion was not unique */
1136 
1138  0, clust_block, clust_rec, clust_index,
1139  LOCK_X, LOCK_REC_NOT_GAP, thr);
1140  }
1141 
1142  if (err != DB_SUCCESS) {
1143 
1144  goto nonstandard_exit_func;
1145  }
1146 
1147  if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1148  /* This can happen if there is a circular reference of
1149  rows such that cascading delete comes to delete a row
1150  already in the process of being delete marked */
1151  err = DB_SUCCESS;
1152 
1153  goto nonstandard_exit_func;
1154  }
1155 
1156  if (table->fts) {
1157  doc_id = fts_get_doc_id_from_rec(table, clust_rec, tmp_heap);
1158  }
1159 
1160  if (node->is_delete
1161  ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1162  : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1163 
1164  /* Build the appropriate update vector which sets
1165  foreign->n_fields first fields in rec to SQL NULL */
1166 
1167  update = cascade->update;
1168 
1169  update->info_bits = 0;
1170  update->n_fields = foreign->n_fields;
1171  UNIV_MEM_INVALID(update->fields,
1172  update->n_fields * sizeof *update->fields);
1173 
1174  for (i = 0; i < foreign->n_fields; i++) {
1175  upd_field_t* ufield = &update->fields[i];
1176 
1178  table,
1179  dict_index_get_nth_col_no(index, i));
1180  ufield->orig_len = 0;
1181  ufield->exp = NULL;
1182  dfield_set_null(&ufield->new_val);
1183 
1184  if (table->fts && dict_table_is_fts_column(
1185  table->fts->indexes,
1186  dict_index_get_nth_col_no(index, i))
1187  != ULINT_UNDEFINED) {
1188  fts_col_affacted = TRUE;
1189  }
1190  }
1191 
1192  if (fts_col_affacted) {
1193  fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1194  }
1195  } else if (table->fts && cascade->is_delete) {
1196  /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1197  for (i = 0; i < foreign->n_fields; i++) {
1198  if (table->fts && dict_table_is_fts_column(
1199  table->fts->indexes,
1200  dict_index_get_nth_col_no(index, i))
1201  != ULINT_UNDEFINED) {
1202  fts_col_affacted = TRUE;
1203  }
1204  }
1205 
1206  if (fts_col_affacted) {
1207  fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1208  }
1209  }
1210 
1211  if (!node->is_delete
1212  && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1213 
1214  /* Build the appropriate update vector which sets changing
1215  foreign->n_fields first fields in rec to new values */
1216 
1217  upd_vec_heap = mem_heap_create(256);
1218 
1219  n_to_update = row_ins_cascade_calc_update_vec(
1220  node, foreign, upd_vec_heap, trx, &fts_col_affacted);
1221 
1222  if (n_to_update == ULINT_UNDEFINED) {
1223  err = DB_ROW_IS_REFERENCED;
1224 
1225  row_ins_foreign_report_err(
1226  "Trying a cascaded update where the"
1227  " updated value in the child\n"
1228  "table would not fit in the length"
1229  " of the column, or the value would\n"
1230  "be NULL and the column is"
1231  " declared as not NULL in the child table,",
1232  thr, foreign, btr_pcur_get_rec(pcur), entry);
1233 
1234  goto nonstandard_exit_func;
1235  }
1236 
1237  if (cascade->update->n_fields == 0) {
1238 
1239  /* The update does not change any columns referred
1240  to in this foreign key constraint: no need to do
1241  anything */
1242 
1243  err = DB_SUCCESS;
1244 
1245  goto nonstandard_exit_func;
1246  }
1247 
1248  /* Mark the old Doc ID as deleted */
1249  if (fts_col_affacted) {
1250  ut_ad(table->fts);
1251  fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1252  }
1253  }
1254 
1255  /* Store pcur position and initialize or store the cascade node
1256  pcur stored position */
1257 
1258  btr_pcur_store_position(pcur, mtr);
1259 
1260  if (index == clust_index) {
1261  btr_pcur_copy_stored_position(cascade->pcur, pcur);
1262  } else {
1263  btr_pcur_store_position(cascade->pcur, mtr);
1264  }
1265 
1266  mtr_commit(mtr);
1267 
1268  ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1269 
1270  cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1271 
1272  err = row_update_cascade_for_mysql(thr, cascade,
1273  foreign->foreign_table);
1274 
1275  if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
1276  fprintf(stderr,
1277  "InnoDB: error: table %s has the counter 0"
1278  " though there is\n"
1279  "InnoDB: a FOREIGN KEY check running on it.\n",
1280  foreign->foreign_table->name);
1281  }
1282 
1283  /* Release the data dictionary latch for a while, so that we do not
1284  starve other threads from doing CREATE TABLE etc. if we have a huge
1285  cascaded operation running. The counter n_foreign_key_checks_running
1286  will prevent other users from dropping or ALTERing the table when we
1287  release the latch. */
1288 
1290 
1291  DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1292 
1293  row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1294 
1295  mtr_start(mtr);
1296 
1297  /* Restore pcur position */
1298 
1299  btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1300 
1301  if (tmp_heap) {
1302  mem_heap_free(tmp_heap);
1303  }
1304 
1305  if (upd_vec_heap) {
1306  mem_heap_free(upd_vec_heap);
1307  }
1308 
1309  return(err);
1310 
1311 nonstandard_exit_func:
1312  if (tmp_heap) {
1313  mem_heap_free(tmp_heap);
1314  }
1315 
1316  if (upd_vec_heap) {
1317  mem_heap_free(upd_vec_heap);
1318  }
1319 
1320  btr_pcur_store_position(pcur, mtr);
1321 
1322  mtr_commit(mtr);
1323  mtr_start(mtr);
1324 
1325  btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1326 
1327  return(err);
1328 }
1329 
1330 /*********************************************************************/
/* Requests a shared (LOCK_S) lock on an index record and returns the
status of that request. The function branches on whether the index is
clustered; both branches pass the same argument list
(0, block, rec, index, offsets, LOCK_S, type, thr).
NOTE(review): the two lines that name the lock routine being called
(listing lines 1351 and 1354) are absent from this listing; presumably
they are the clustered and secondary-index read-lock calls that assign
err -- confirm against the real source.
Callers in this file treat DB_SUCCESS and DB_SUCCESS_LOCKED_REC as
success and anything else as a failure. */
1334 static
1335 dberr_t
1336 row_ins_set_shared_rec_lock(
1337 /*========================*/
1338  ulint type, /* in: lock type; callers in this file pass LOCK_ORDINARY, LOCK_GAP or LOCK_REC_NOT_GAP */
1340  const buf_block_t* block, /* in: block passed through to the lock call together with rec */
1341  const rec_t* rec, /* in: record to lock */
1342  dict_index_t* index, /* in: index that rec and offsets are validated against */
1343  const ulint* offsets, /* in: offsets of rec, debug-checked with rec_offs_validate() */
1344  que_thr_t* thr) /* in: query thread passed through to the lock call */
1345 {
1346  dberr_t err;
1347 
 /* Debug-only sanity check that offsets matches rec and index. */
1348  ut_ad(rec_offs_validate(rec, index, offsets));
1349 
1350  if (dict_index_is_clust(index)) {
 /* NOTE(review): the call line (1351) is missing from the listing. */
1352  0, block, rec, index, offsets, LOCK_S, type, thr);
1353  } else {
 /* NOTE(review): the call line (1354) is missing from the listing. */
1355  0, block, rec, index, offsets, LOCK_S, type, thr);
1356  }
1357 
1358  return(err);
1359 }
1360 
1361 /*********************************************************************/
/* Requests an exclusive (LOCK_X) lock on an index record and returns the
status of that request. Mirrors row_ins_set_shared_rec_lock() except
that LOCK_X is passed instead of LOCK_S.
NOTE(review): the two lines that name the lock routine being called
(listing lines 1382 and 1385) are absent from this listing; presumably
they are the clustered and secondary-index read-lock calls that assign
err -- confirm against the real source. */
1365 static
1366 dberr_t
1367 row_ins_set_exclusive_rec_lock(
1368 /*===========================*/
1369  ulint type, /* in: lock type; callers in this file pass LOCK_ORDINARY or LOCK_REC_NOT_GAP */
1371  const buf_block_t* block, /* in: block passed through to the lock call together with rec */
1372  const rec_t* rec, /* in: record to lock */
1373  dict_index_t* index, /* in: index that rec and offsets are validated against */
1374  const ulint* offsets, /* in: offsets of rec, debug-checked with rec_offs_validate() */
1375  que_thr_t* thr) /* in: query thread passed through to the lock call */
1376 {
1377  dberr_t err;
1378 
 /* Debug-only sanity check that offsets matches rec and index. */
1379  ut_ad(rec_offs_validate(rec, index, offsets));
1380 
1381  if (dict_index_is_clust(index)) {
 /* NOTE(review): the call line (1382) is missing from the listing. */
1383  0, block, rec, index, offsets, LOCK_X, type, thr);
1384  } else {
 /* NOTE(review): the call line (1385) is missing from the listing. */
1386  0, block, rec, index, offsets, LOCK_X, type, thr);
1387  }
1388 
1389  return(err);
1390 }
1391 
1392 /***************************************************************/
1397 UNIV_INTERN
1398 dberr_t
1400 /*=============================*/
1401  ibool check_ref,
1404  dict_foreign_t* foreign,
1407  dict_table_t* table,
1409  dtuple_t* entry,
1410  que_thr_t* thr)
1411 {
1412  dberr_t err;
1413  upd_node_t* upd_node;
1414  dict_table_t* check_table;
1415  dict_index_t* check_index;
1416  ulint n_fields_cmp;
1417  btr_pcur_t pcur;
1418  int cmp;
1419  ulint i;
1420  mtr_t mtr;
1421  trx_t* trx = thr_get_trx(thr);
1422  mem_heap_t* heap = NULL;
1423  ulint offsets_[REC_OFFS_NORMAL_SIZE];
1424  ulint* offsets = offsets_;
1425  rec_offs_init(offsets_);
1426 
1427 run_again:
1428 #ifdef UNIV_SYNC_DEBUG
1429  ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1430 #endif /* UNIV_SYNC_DEBUG */
1431 
1432  err = DB_SUCCESS;
1433 
1434  if (trx->check_foreigns == FALSE) {
1435  /* The user has suppressed foreign key checks currently for
1436  this session */
1437  goto exit_func;
1438  }
1439 
1440  /* If any of the foreign key fields in entry is SQL NULL, we
1441  suppress the foreign key check: this is compatible with Oracle,
1442  for example */
1443 
1444  for (i = 0; i < foreign->n_fields; i++) {
1445  if (UNIV_SQL_NULL == dfield_get_len(
1446  dtuple_get_nth_field(entry, i))) {
1447 
1448  goto exit_func;
1449  }
1450  }
1451 
1452  if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1453  upd_node = static_cast<upd_node_t*>(thr->run_node);
1454 
1455  if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1456  /* If a cascaded update is done as defined by a
1457  foreign key constraint, do not check that
1458  constraint for the child row. In ON UPDATE CASCADE
1459  the update of the parent row is only half done when
1460  we come here: if we would check the constraint here
1461  for the child row it would fail.
1462 
1463  A QUESTION remains: if in the child table there are
1464  several constraints which refer to the same parent
1465  table, we should merge all updates to the child as
1466  one update? And the updates can be contradictory!
1467  Currently we just perform the update associated
1468  with each foreign key constraint, one after
1469  another, and the user has problems predicting in
1470  which order they are performed. */
1471 
1472  goto exit_func;
1473  }
1474  }
1475 
1476  if (check_ref) {
1477  check_table = foreign->referenced_table;
1478  check_index = foreign->referenced_index;
1479  } else {
1480  check_table = foreign->foreign_table;
1481  check_index = foreign->foreign_index;
1482  }
1483 
1484  if (check_table == NULL
1485  || check_table->ibd_file_missing
1486  || check_index == NULL) {
1487 
1488  if (!srv_read_only_mode && check_ref) {
1489  FILE* ef = dict_foreign_err_file;
1490 
1491  row_ins_set_detailed(trx, foreign);
1492 
1493  row_ins_foreign_trx_print(trx);
1494 
1495  fputs("Foreign key constraint fails for table ", ef);
1496  ut_print_name(ef, trx, TRUE,
1497  foreign->foreign_table_name);
1498  fputs(":\n", ef);
1500  ef, trx, foreign, TRUE);
1501  fputs("\nTrying to add to index ", ef);
1502  ut_print_name(ef, trx, FALSE,
1503  foreign->foreign_index->name);
1504  fputs(" tuple:\n", ef);
1505  dtuple_print(ef, entry);
1506  fputs("\nBut the parent table ", ef);
1507  ut_print_name(ef, trx, TRUE,
1508  foreign->referenced_table_name);
1509  fputs("\nor its .ibd file does"
1510  " not currently exist!\n", ef);
1511  mutex_exit(&dict_foreign_err_mutex);
1512 
1513  err = DB_NO_REFERENCED_ROW;
1514  }
1515 
1516  goto exit_func;
1517  }
1518 
1519  if (check_table != table) {
1520  /* We already have a LOCK_IX on table, but not necessarily
1521  on check_table */
1522 
1523  err = lock_table(0, check_table, LOCK_IS, thr);
1524 
1525  if (err != DB_SUCCESS) {
1526 
1527  goto do_possible_lock_wait;
1528  }
1529  }
1530 
1531  mtr_start(&mtr);
1532 
1533  /* Store old value on n_fields_cmp */
1534 
1535  n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1536 
1537  dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1538 
1539  btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1540  BTR_SEARCH_LEAF, &pcur, &mtr);
1541 
1542  /* Scan index records and check if there is a matching record */
1543 
1544  do {
1545  const rec_t* rec = btr_pcur_get_rec(&pcur);
1546  const buf_block_t* block = btr_pcur_get_block(&pcur);
1547 
1548  if (page_rec_is_infimum(rec)) {
1549 
1550  continue;
1551  }
1552 
1553  offsets = rec_get_offsets(rec, check_index,
1554  offsets, ULINT_UNDEFINED, &heap);
1555 
1556  if (page_rec_is_supremum(rec)) {
1557 
1558  err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1559  rec, check_index,
1560  offsets, thr);
1561  switch (err) {
1562  case DB_SUCCESS_LOCKED_REC:
1563  case DB_SUCCESS:
1564  continue;
1565  default:
1566  goto end_scan;
1567  }
1568  }
1569 
1570  cmp = cmp_dtuple_rec(entry, rec, offsets);
1571 
1572  if (cmp == 0) {
1573  if (rec_get_deleted_flag(rec,
1574  rec_offs_comp(offsets))) {
1575  err = row_ins_set_shared_rec_lock(
1576  LOCK_ORDINARY, block,
1577  rec, check_index, offsets, thr);
1578  switch (err) {
1579  case DB_SUCCESS_LOCKED_REC:
1580  case DB_SUCCESS:
1581  break;
1582  default:
1583  goto end_scan;
1584  }
1585  } else {
1586  /* Found a matching record. Lock only
1587  a record because we can allow inserts
1588  into gaps */
1589 
1590  err = row_ins_set_shared_rec_lock(
1591  LOCK_REC_NOT_GAP, block,
1592  rec, check_index, offsets, thr);
1593 
1594  switch (err) {
1595  case DB_SUCCESS_LOCKED_REC:
1596  case DB_SUCCESS:
1597  break;
1598  default:
1599  goto end_scan;
1600  }
1601 
1602  if (check_ref) {
1603  err = DB_SUCCESS;
1604 
1605  goto end_scan;
1606  } else if (foreign->type != 0) {
1607  /* There is an ON UPDATE or ON DELETE
1608  condition: check them in a separate
1609  function */
1610 
1611  err = row_ins_foreign_check_on_constraint(
1612  thr, foreign, &pcur, entry,
1613  &mtr);
1614  if (err != DB_SUCCESS) {
1615  /* Since reporting a plain
1616  "duplicate key" error
1617  message to the user in
1618  cases where a long CASCADE
1619  operation would lead to a
1620  duplicate key in some
1621  other table is very
1622  confusing, map duplicate
1623  key errors resulting from
1624  FK constraints to a
1625  separate error code. */
1626 
1627  if (err == DB_DUPLICATE_KEY) {
1629  }
1630 
1631  goto end_scan;
1632  }
1633 
1634  /* row_ins_foreign_check_on_constraint
1635  may have repositioned pcur on a
1636  different block */
1637  block = btr_pcur_get_block(&pcur);
1638  } else {
1639  row_ins_foreign_report_err(
1640  "Trying to delete or update",
1641  thr, foreign, rec, entry);
1642 
1643  err = DB_ROW_IS_REFERENCED;
1644  goto end_scan;
1645  }
1646  }
1647  } else {
1648  ut_a(cmp < 0);
1649 
1650  err = row_ins_set_shared_rec_lock(
1651  LOCK_GAP, block,
1652  rec, check_index, offsets, thr);
1653 
1654  switch (err) {
1655  case DB_SUCCESS_LOCKED_REC:
1656  case DB_SUCCESS:
1657  if (check_ref) {
1658  err = DB_NO_REFERENCED_ROW;
1659  row_ins_foreign_report_add_err(
1660  trx, foreign, rec, entry);
1661  } else {
1662  err = DB_SUCCESS;
1663  }
1664  default:
1665  break;
1666  }
1667 
1668  goto end_scan;
1669  }
1670  } while (btr_pcur_move_to_next(&pcur, &mtr));
1671 
1672  if (check_ref) {
1673  row_ins_foreign_report_add_err(
1674  trx, foreign, btr_pcur_get_rec(&pcur), entry);
1675  err = DB_NO_REFERENCED_ROW;
1676  } else {
1677  err = DB_SUCCESS;
1678  }
1679 
1680 end_scan:
1681  btr_pcur_close(&pcur);
1682 
1683  mtr_commit(&mtr);
1684 
1685  /* Restore old value */
1686  dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1687 
1688 do_possible_lock_wait:
1689  if (err == DB_LOCK_WAIT) {
1690  bool verified = false;
1691 
1692  trx->error_state = err;
1693 
1695 
1697 
1698  if (check_table->to_be_dropped) {
1699  /* The table is being dropped. We shall timeout
1700  this operation */
1701  err = DB_LOCK_WAIT_TIMEOUT;
1702  goto exit_func;
1703  }
1704 
1705  /* We had temporarily released dict_operation_lock in
1706  above lock sleep wait, now we have the lock again, and
1707  we will need to re-check whether the foreign key has been
1708  dropped. We only need to verify if the table is referenced
1709  table case (check_ref == 0), since MDL lock will prevent
1710  concurrent DDL and DML on the same table */
1711  if (!check_ref) {
1712  for (const dict_foreign_t* check_foreign
1714  check_foreign;
1715  check_foreign = UT_LIST_GET_NEXT(
1716  referenced_list, check_foreign)) {
1717  if (check_foreign == foreign) {
1718  verified = true;
1719  break;
1720  }
1721  }
1722  } else {
1723  verified = true;
1724  }
1725 
1726  if (!verified) {
1727  err = DB_DICT_CHANGED;
1728  } else if (trx->error_state == DB_SUCCESS) {
1729  goto run_again;
1730  } else {
1731  err = trx->error_state;
1732  }
1733  }
1734 
1735 exit_func:
1736  if (UNIV_LIKELY_NULL(heap)) {
1737  mem_heap_free(heap);
1738  }
1739  return(err);
1740 }
1741 
1742 /***************************************************************/
/* Walks table->foreign_list and, for every constraint whose
foreign_index is the given index, runs a foreign key check for entry.
Returns the first status that is not DB_SUCCESS, or DB_SUCCESS when the
whole list has been processed.
NOTE(review): several lines of this function are absent from the listing
(1774, 1780, 1793, 1801, 1810, 1814): the declaration of
referenced_table, the first argument of dict_table_open_on_name(), the
counter field being incremented/decremented, the name of the check
routine that assigns err, and the statement executed when got_s_lock is
set. Comments below describe only what the visible lines show. */
1749 static __attribute__((nonnull, warn_unused_result))
1750 dberr_t
1751 row_ins_check_foreign_constraints(
1752 /*==============================*/
1753  dict_table_t* table, /* in: table whose foreign_list is scanned */
1754  dict_index_t* index, /* in: only constraints with foreign_index == index are checked */
1755  dtuple_t* entry, /* in: index entry passed to the check */
1756  que_thr_t* thr) /* in: query thread; its trx is read via thr_get_trx() */
1757 {
1758  dict_foreign_t* foreign;
1759  dberr_t err;
1760  trx_t* trx;
1761  ibool got_s_lock = FALSE;
1762 
1763  trx = thr_get_trx(thr);
1764 
1765  foreign = UT_LIST_GET_FIRST(table->foreign_list);
1766 
1767  DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1768  "foreign_constraint_check_for_ins");
1769 
1770  while (foreign) {
1771  if (foreign->foreign_index == index) {
1772  dict_table_t* ref_table = NULL;
1773  dict_table_t* foreign_table = foreign->foreign_table;
 /* NOTE(review): line 1774 (presumably the declaration of
 referenced_table) is missing from the listing. */
1775  = foreign->referenced_table;
1776 
1777  if (referenced_table == NULL) {
1778 
 /* The constraint has no referenced table pointer:
 open a table by name and remember the handle in
 ref_table so it can be closed below. */
1779  ref_table = dict_table_open_on_name(
1781  FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1782  }
1783 
 /* Freeze the data dictionary only if this transaction
 does not already hold dict_operation_lock in some mode. */
1784  if (0 == trx->dict_operation_lock_mode) {
1785  got_s_lock = TRUE;
1786 
1787  row_mysql_freeze_data_dictionary(trx);
1788  }
1789 
1790  if (referenced_table) {
1791  os_inc_counter(dict_sys->mutex,
1792  foreign_table
1794  }
1795 
1796  /* NOTE that if the thread ends up waiting for a lock
1797  we will release dict_operation_lock temporarily!
1798  But the counter on the table protects the referenced
1799  table from being dropped while the check is running. */
1800 
 /* NOTE(review): line 1801 (the call that assigns err) is
 missing from the listing; TRUE is its first argument. */
1802  TRUE, foreign, table, entry, thr);
1803 
 /* Debug hook: forces DB_DICT_CHANGED when the
 "row_ins_dict_change_err" keyword is active. */
1804  DBUG_EXECUTE_IF("row_ins_dict_change_err",
1805  err = DB_DICT_CHANGED;);
1806 
1807  if (referenced_table) {
1808  os_dec_counter(dict_sys->mutex,
1809  foreign_table
1811  }
1812 
1813  if (got_s_lock) {
 /* NOTE(review): line 1814 is missing; presumably it
 undoes the freeze taken above -- confirm. */
1815  }
1816 
1817  if (ref_table != NULL) {
1818  dict_table_close(ref_table, FALSE, FALSE);
1819  }
1820 
 /* Stop at the first constraint that does not pass. */
1821  if (err != DB_SUCCESS) {
1822 
1823  return(err);
1824  }
1825  }
1826 
1827  foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
1828  }
1829 
1830  return(DB_SUCCESS);
1831 }
1832 
1833 /***************************************************************/
/* Decides whether inserting entry would be a duplicate-key violation
with respect to the existing record rec.
Returns FALSE when fewer than n_unique leading fields match, or when the
index is not clustered and any of the first n_unique fields of entry is
SQL NULL. Otherwise returns TRUE exactly when rec is not delete-marked. */
1837 static
1838 ibool
1839 row_ins_dupl_error_with_rec(
1840 /*========================*/
1841  const rec_t* rec, /* in: existing record to compare against */
1844  const dtuple_t* entry, /* in: entry being inserted */
1845  dict_index_t* index, /* in: index of rec; supplies n_unique */
1846  const ulint* offsets) /* in: offsets of rec, debug-checked with rec_offs_validate() */
1847 {
1848  ulint matched_fields;
1849  ulint matched_bytes;
1850  ulint n_unique;
1851  ulint i;
1852 
1853  ut_ad(rec_offs_validate(rec, index, offsets));
1854 
1855  n_unique = dict_index_get_n_unique(index);
1856 
1857  matched_fields = 0;
1858  matched_bytes = 0;
1859 
 /* Only the match counters are used; the comparison result itself
 is discarded. */
1860  cmp_dtuple_rec_with_match(entry, rec, offsets,
1861  &matched_fields, &matched_bytes);
1862 
1863  if (matched_fields < n_unique) {
1864 
1865  return(FALSE);
1866  }
1867 
1868  /* In a unique secondary index we allow equal key values if they
1869  contain SQL NULLs */
1870 
1871  if (!dict_index_is_clust(index)) {
1872 
1873  for (i = 0; i < n_unique; i++) {
1874  if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1875 
1876  return(FALSE);
1877  }
1878  }
1879  }
1880 
 /* A delete-marked record does not count as a duplicate. */
1881  return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1882 }
1883 
1884 /***************************************************************/
/* Scans an index, starting at the first record >= entry on its first
n_unique fields, looking for a record that would make entry a duplicate.
Each visited record is locked (unless BTR_NO_LOCKING_FLAG is set in
flags) before it is compared.
Returns DB_SUCCESS when no duplicate is found (DB_SUCCESS_LOCKED_REC
from the lock call is mapped to DB_SUCCESS), DB_DUPLICATE_KEY when
row_ins_dupl_error_with_rec() reports a duplicate (the index is then
stored in the transaction's error_info), or the status returned by a
lock request that did not succeed.
The n_fields_cmp of entry is temporarily set to n_unique and restored
before returning from the scan.
NOTE(review): listing lines 1936 and 2012 are absent (the s_latch-true
latch mode passed to btr_pcur_open(), and the flag argument of
DICT_TF2_FLAG_IS_SET()). */
1889 static __attribute__((nonnull, warn_unused_result))
1890 dberr_t
1891 row_ins_scan_sec_index_for_duplicate(
1892 /*=================================*/
1893  ulint flags, /* in: only BTR_NO_LOCKING_FLAG is tested here */
1894  dict_index_t* index, /* in: index to scan */
1895  dtuple_t* entry, /* in: entry to look for; n_fields_cmp is modified and restored */
1896  que_thr_t* thr, /* in: query thread; its trx supplies duplicates and isolation_level */
1897  bool s_latch, /* in: selects the latch mode given to btr_pcur_open() */
1898  mtr_t* mtr, /* in/out: mini-transaction used for the cursor */
1899  mem_heap_t* offsets_heap) /* in/out: heap passed to rec_get_offsets() */
1901 {
1902  ulint n_unique;
1903  int cmp;
1904  ulint n_fields_cmp;
1905  btr_pcur_t pcur;
1906  dberr_t err = DB_SUCCESS;
1907  ulint allow_duplicates;
1908  ulint* offsets = NULL;
1909 
1910 #ifdef UNIV_SYNC_DEBUG
1911  ut_ad(s_latch == rw_lock_own(&index->lock, RW_LOCK_SHARED));
1912 #endif /* UNIV_SYNC_DEBUG */
1913 
1914  n_unique = dict_index_get_n_unique(index);
1915 
1916  /* If the secondary index is unique, but one of the fields in the
1917  n_unique first fields is NULL, a unique key violation cannot occur,
1918  since we define NULL != NULL in this case */
1919 
1920  for (ulint i = 0; i < n_unique; i++) {
1921  if (UNIV_SQL_NULL == dfield_get_len(
1922  dtuple_get_nth_field(entry, i))) {
1923 
1924  return(DB_SUCCESS);
1925  }
1926  }
1927 
1928  /* Store old value on n_fields_cmp */
1929 
1930  n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1931 
1932  dtuple_set_n_fields_cmp(entry, n_unique);
1933 
1934  btr_pcur_open(index, entry, PAGE_CUR_GE,
1935  s_latch
1937  : BTR_SEARCH_LEAF,
1938  &pcur, mtr);
1939 
1940  allow_duplicates = thr_get_trx(thr)->duplicates;
1941 
1942  /* Scan index records and check if there is a duplicate */
1943 
1944  do {
1945  const rec_t* rec = btr_pcur_get_rec(&pcur);
1946  const buf_block_t* block = btr_pcur_get_block(&pcur);
1947  ulint lock_type;
1948 
 /* The infimum is skipped without locking or comparing. */
1949  if (page_rec_is_infimum(rec)) {
1950 
1951  continue;
1952  }
1953 
1954  offsets = rec_get_offsets(rec, index, offsets,
1955  ULINT_UNDEFINED, &offsets_heap);
1956 
1957  /* If the transaction isolation level is no stronger than
1958  READ COMMITTED, then avoid gap locks. */
1959  if (!page_rec_is_supremum(rec)
1960  && thr_get_trx(thr)->isolation_level
1961  <= TRX_ISO_READ_COMMITTED) {
1962  lock_type = LOCK_REC_NOT_GAP;
1963  } else {
1964  lock_type = LOCK_ORDINARY;
1965  }
1966 
1967  if (flags & BTR_NO_LOCKING_FLAG) {
1968  /* Set no locks when applying log
1969  in online table rebuild. */
1970  } else if (allow_duplicates) {
1971 
1972  /* If the SQL-query will update or replace
1973  duplicate key we will take X-lock for
1974  duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1975  INSERT ON DUPLICATE KEY UPDATE). */
1976 
1977  err = row_ins_set_exclusive_rec_lock(
1978  lock_type, block, rec, index, offsets, thr);
1979  } else {
1980 
1981  err = row_ins_set_shared_rec_lock(
1982  lock_type, block, rec, index, offsets, thr);
1983  }
1984 
 /* DB_SUCCESS_LOCKED_REC is normalized to DB_SUCCESS by the
 intentional fall-through; any other status ends the scan. */
1985  switch (err) {
1986  case DB_SUCCESS_LOCKED_REC:
1987  err = DB_SUCCESS;
1988  case DB_SUCCESS:
1989  break;
1990  default:
1991  goto end_scan;
1992  }
1993 
 /* The supremum is locked above but never compared. */
1994  if (page_rec_is_supremum(rec)) {
1995 
1996  continue;
1997  }
1998 
1999  cmp = cmp_dtuple_rec(entry, rec, offsets);
2000 
2001  if (cmp == 0) {
2002  if (row_ins_dupl_error_with_rec(rec, entry,
2003  index, offsets)) {
2004  err = DB_DUPLICATE_KEY;
2005 
2006  thr_get_trx(thr)->error_info = index;
2007 
2008  /* If the duplicate is on hidden FTS_DOC_ID,
2009  state so in the error log */
2010  if (DICT_TF2_FLAG_IS_SET(
2011  index->table,
2013  && strcmp(index->name,
2014  FTS_DOC_ID_INDEX_NAME) == 0) {
2015  ib_logf(IB_LOG_LEVEL_ERROR,
2016  "Duplicate FTS_DOC_ID value"
2017  " on table %s",
2018  index->table->name);
2019  }
2020 
2021  goto end_scan;
2022  }
2023  } else {
 /* The scan started at the first record >= entry, so a
 non-equal record must compare greater than entry. */
2024  ut_a(cmp < 0);
2025  goto end_scan;
2026  }
2027  } while (btr_pcur_move_to_next(&pcur, mtr));
2028 
2029 end_scan:
2030  /* Restore old value */
2031  dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2032 
2033  return(err);
2034 }
2035 
/* Classifies one existing record against entry while no record locks
are taken (used by row_ins_duplicate_error_in_clust_online()).
Returns DB_SUCCESS when fewer than n_uniq leading fields match (not a
duplicate), DB_SUCCESS_LOCKED_REC when all n_uniq + 2 compared fields
match (rec is an exact match of entry), and DB_DUPLICATE_KEY otherwise.
NOTE(review): the line naming the comparison routine (listing line 2060)
is absent; it is called with (entry, rec, offsets, n_uniq + 2, &fields,
&bytes) and fills in fields and bytes. */
2041 static __attribute__((nonnull, warn_unused_result))
2042 dberr_t
2043 row_ins_duplicate_online(
2044 /*=====================*/
2045  ulint n_uniq, /* in: number of leading key fields that must be unique */
2046  const dtuple_t* entry, /* in: entry being inserted; n_fields_cmp is debug-asserted to equal n_uniq */
2047  const rec_t* rec, /* in: existing record; debug-asserted not to be delete-marked */
2048  ulint* offsets) /* in: offsets of rec */
2049 {
2050  ulint fields = 0;
2051  ulint bytes = 0;
2052 
2053  /* During rebuild, there should not be any delete-marked rows
2054  in the new table. */
2055  ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2056  ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2057 
2058  /* Compare the PRIMARY KEY fields and the
2059  DB_TRX_ID, DB_ROLL_PTR. */
2061  entry, rec, offsets, n_uniq + 2, &fields, &bytes);
2062 
2063  if (fields < n_uniq) {
2064  /* Not a duplicate. */
2065  return(DB_SUCCESS);
2066  }
2067 
2068  if (fields == n_uniq + 2) {
2069  /* rec is an exact match of entry. */
2070  ut_ad(bytes == 0);
2071  return(DB_SUCCESS_LOCKED_REC);
2072  }
2073 
 /* The key fields match but at least one of the two following
 fields differs. */
2074  return(DB_DUPLICATE_KEY);
2075 }
2076 
/* Checks the record under the cursor and the record after it for a
duplicate of entry, using row_ins_duplicate_online() and taking no
record locks.
The cursor record is examined only when cursor->low_match >= n_uniq and
it is not the infimum; the following record only when
cursor->up_match >= n_uniq and it is not the supremum.
Returns the first status other than DB_SUCCESS produced for the cursor
record, otherwise the status produced for the following record, or
DB_SUCCESS when neither record is examined. */
2082 static __attribute__((nonnull, warn_unused_result))
2083 dberr_t
2084 row_ins_duplicate_error_in_clust_online(
2085 /*====================================*/
2086  ulint n_uniq, /* in: number of leading key fields that must be unique */
2087  const dtuple_t* entry, /* in: entry being inserted */
2088  const btr_cur_t*cursor, /* in: cursor positioned for the insert; supplies low_match/up_match */
2089  ulint** offsets, /* in/out: offsets buffer, replaced by rec_get_offsets() */
2090  mem_heap_t** heap) /* in/out: heap passed to rec_get_offsets() */
2091 {
2092  dberr_t err = DB_SUCCESS;
2093  const rec_t* rec = btr_cur_get_rec(cursor);
2094 
2095  if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2096  *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2097  ULINT_UNDEFINED, heap);
2098  err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2099  if (err != DB_SUCCESS) {
2100  return(err);
2101  }
2102  }
2103 
 /* Move on to the successor of the cursor record. */
2104  rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2105 
2106  if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2107  *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2108  ULINT_UNDEFINED, heap);
2109  err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2110  }
2111 
2112  return(err);
2113 }
2114 
2115 /***************************************************************/
/* Checks whether inserting entry at the cursor position would violate
uniqueness in a clustered index (the index type is debug-asserted).
The cursor record is examined when cursor->low_match >= n_unique, and
its successor when cursor->up_match >= n_unique. Each examined user
record is first locked -- exclusively when trx->duplicates is set,
otherwise shared -- and then tested with row_ins_dupl_error_with_rec().
Returns DB_SUCCESS when no duplicate is found, DB_DUPLICATE_KEY (with
trx->error_info set to the index) when one is, or the status of a lock
request that returned neither DB_SUCCESS nor DB_SUCCESS_LOCKED_REC.
The flags and mtr parameters are not used by the visible code.
NOTE(review): the first argument line of each of the four lock calls
(listing lines 2184, 2190, 2229, 2235) is absent from this listing. */
2125 static __attribute__((nonnull, warn_unused_result))
2126 dberr_t
2127 row_ins_duplicate_error_in_clust(
2128 /*=============================*/
2129  ulint flags, /* in: not referenced by the visible code */
2130  btr_cur_t* cursor, /* in: cursor positioned for the insert */
2131  const dtuple_t* entry, /* in: entry being inserted */
2132  que_thr_t* thr, /* in: query thread; its trx supplies duplicates and receives error_info */
2133  mtr_t* mtr) /* in: marked unused via UT_NOT_USED() */
2134 {
2135  dberr_t err;
2136  rec_t* rec;
2137  ulint n_unique;
2138  trx_t* trx = thr_get_trx(thr);
2139  mem_heap_t*heap = NULL;
2140  ulint offsets_[REC_OFFS_NORMAL_SIZE];
2141  ulint* offsets = offsets_;
2142  rec_offs_init(offsets_);
2143 
2144  UT_NOT_USED(mtr);
2145 
2146  ut_ad(dict_index_is_clust(cursor->index));
2147 
2148  /* NOTE: For unique non-clustered indexes there may be any number
2149  of delete marked records with the same value for the non-clustered
2150  index key (remember multiversioning), and which differ only in
2151  the row reference part of the index record, containing the
2152  clustered index key fields. For such a secondary index record,
2153  to avoid race condition, we must FIRST do the insertion and after
2154  that check that the uniqueness condition is not breached! */
2155 
2156  /* NOTE: A problem is that in the B-tree node pointers on an
2157  upper level may match more to the entry than the actual existing
2158  user records on the leaf level. So, even if low_match would suggest
2159  that a duplicate key violation may occur, this may not be the case. */
2160 
2161  n_unique = dict_index_get_n_unique(cursor->index);
2162 
2163  if (cursor->low_match >= n_unique) {
2164 
2165  rec = btr_cur_get_rec(cursor);
2166 
2167  if (!page_rec_is_infimum(rec)) {
2168  offsets = rec_get_offsets(rec, cursor->index, offsets,
2169  ULINT_UNDEFINED, &heap);
2170 
2171  /* We set a lock on the possible duplicate: this
2172  is needed in logical logging of MySQL to make
2173  sure that in roll-forward we get the same duplicate
2174  errors as in original execution */
2175 
2176  if (trx->duplicates) {
2177 
2178  /* If the SQL-query will update or replace
2179  duplicate key we will take X-lock for
2180  duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2181  INSERT ON DUPLICATE KEY UPDATE). */
2182 
2183  err = row_ins_set_exclusive_rec_lock(
2185  btr_cur_get_block(cursor),
2186  rec, cursor->index, offsets, thr);
2187  } else {
2188 
2189  err = row_ins_set_shared_rec_lock(
2191  btr_cur_get_block(cursor), rec,
2192  cursor->index, offsets, thr);
2193  }
2194 
2195  switch (err) {
2196  case DB_SUCCESS_LOCKED_REC:
2197  case DB_SUCCESS:
2198  break;
2199  default:
2200  goto func_exit;
2201  }
2202 
2203  if (row_ins_dupl_error_with_rec(
2204  rec, entry, cursor->index, offsets)) {
 /* Shared exit for both duplicate cases; the successor
 check below jumps here as well. */
2205 duplicate:
2206  trx->error_info = cursor->index;
2207  err = DB_DUPLICATE_KEY;
2208  goto func_exit;
2209  }
2210  }
2211  }
2212 
2213  if (cursor->up_match >= n_unique) {
2214 
2215  rec = page_rec_get_next(btr_cur_get_rec(cursor));
2216 
2217  if (!page_rec_is_supremum(rec)) {
2218  offsets = rec_get_offsets(rec, cursor->index, offsets,
2219  ULINT_UNDEFINED, &heap);
2220 
2221  if (trx->duplicates) {
2222 
2223  /* If the SQL-query will update or replace
2224  duplicate key we will take X-lock for
2225  duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2226  INSERT ON DUPLICATE KEY UPDATE). */
2227 
2228  err = row_ins_set_exclusive_rec_lock(
2230  btr_cur_get_block(cursor),
2231  rec, cursor->index, offsets, thr);
2232  } else {
2233 
2234  err = row_ins_set_shared_rec_lock(
2236  btr_cur_get_block(cursor),
2237  rec, cursor->index, offsets, thr);
2238  }
2239 
2240  switch (err) {
2241  case DB_SUCCESS_LOCKED_REC:
2242  case DB_SUCCESS:
2243  break;
2244  default:
2245  goto func_exit;
2246  }
2247 
2248  if (row_ins_dupl_error_with_rec(
2249  rec, entry, cursor->index, offsets)) {
2250  goto duplicate;
2251  }
2252  }
2253 
 /* Reached whenever up_match >= n_unique did not lead to a
 duplicate or a lock failure above. */
2254  /* This should never happen */
2255  ut_error;
2256  }
2257 
2258  err = DB_SUCCESS;
2259 func_exit:
 /* heap is non-NULL only if rec_get_offsets() had to allocate. */
2260  if (UNIV_LIKELY_NULL(heap)) {
2261  mem_heap_free(heap);
2262  }
2263  return(err);
2264 }
2265 
2266 /***************************************************************/
/* Tells whether an insert at the cursor position has to be carried out
as a modification of the existing record under the cursor. The result is
a conjunction of a test on cursor->low_match and a test that the cursor
record is not the page infimum.
NOTE(review): the line holding the right-hand side of the low_match
comparison (listing line 2291) is absent from this listing; going by the
comment below it is presumably a field-count threshold for the index --
confirm against the real source. */
2277 UNIV_INLINE
2278 ibool
2279 row_ins_must_modify_rec(
2280 /*====================*/
2281  const btr_cur_t* cursor) /* in: cursor positioned by a search for the entry */
2282 {
2283  /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2284  Because node pointers on upper levels of the B-tree may match more
2285  to entry than to actual user records on the leaf level, we
2286  have to check if the candidate record is actually a user record.
2287  A clustered index node pointer contains index->n_unique first fields,
2288  and a secondary index node pointer contains all index fields. */
2289 
2290  return(cursor->low_match
2292  && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2293 }
2294 
2295 /***************************************************************/
2306 UNIV_INTERN
2307 dberr_t
2309 /*==========================*/
2310  ulint flags,
2311  ulint mode,
2314  dict_index_t* index,
2315  ulint n_uniq,
2316  dtuple_t* entry,
2317  ulint n_ext,
2318  que_thr_t* thr)
2319 {
2320  btr_cur_t cursor;
2321  ulint* offsets = NULL;
2322  dberr_t err;
2323  big_rec_t* big_rec = NULL;
2324  mtr_t mtr;
2325  mem_heap_t* offsets_heap = NULL;
2326 
2327  ut_ad(dict_index_is_clust(index));
2328  ut_ad(!dict_index_is_unique(index)
2329  || n_uniq == dict_index_get_n_unique(index));
2330  ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2331 
2332  mtr_start(&mtr);
2333 
2334  if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2336  mtr_s_lock(dict_index_get_lock(index), &mtr);
2337  }
2338 
2339  cursor.thr = thr;
2340 
2341  /* Note that we use PAGE_CUR_LE as the search mode, because then
2342  the function will return in both low_match and up_match of the
2343  cursor sensible values */
2344 
2345  btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, mode,
2346  &cursor, 0, __FILE__, __LINE__, &mtr);
2347 
2348 #ifdef UNIV_DEBUG
2349  {
2350  page_t* page = btr_cur_get_page(&cursor);
2351  rec_t* first_rec = page_rec_get_next(
2352  page_get_infimum_rec(page));
2353 
2354  ut_ad(page_rec_is_supremum(first_rec)
2355  || rec_get_n_fields(first_rec, index)
2356  == dtuple_get_n_fields(entry));
2357  }
2358 #endif
2359 
2360  if (n_uniq && (cursor.up_match >= n_uniq
2361  || cursor.low_match >= n_uniq)) {
2362 
2363  if (flags
2364  == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2366  /* Set no locks when applying log
2367  in online table rebuild. Only check for duplicates. */
2368  err = row_ins_duplicate_error_in_clust_online(
2369  n_uniq, entry, &cursor,
2370  &offsets, &offsets_heap);
2371 
2372  switch (err) {
2373  case DB_SUCCESS:
2374  break;
2375  default:
2376  ut_ad(0);
2377  /* fall through */
2378  case DB_SUCCESS_LOCKED_REC:
2379  case DB_DUPLICATE_KEY:
2380  thr_get_trx(thr)->error_info = cursor.index;
2381  }
2382  } else {
2383  /* Note that the following may return also
2384  DB_LOCK_WAIT */
2385 
2386  err = row_ins_duplicate_error_in_clust(
2387  flags, &cursor, entry, thr, &mtr);
2388  }
2389 
2390  if (err != DB_SUCCESS) {
2391 err_exit:
2392  mtr_commit(&mtr);
2393  goto func_exit;
2394  }
2395  }
2396 
2397  if (row_ins_must_modify_rec(&cursor)) {
2398  /* There is already an index entry with a long enough common
2399  prefix, we must convert the insert into a modify of an
2400  existing record */
2401  mem_heap_t* entry_heap = mem_heap_create(1024);
2402 
2403  err = row_ins_clust_index_entry_by_modify(
2404  flags, mode, &cursor, &offsets, &offsets_heap,
2405  entry_heap, &big_rec, entry, thr, &mtr);
2406 
2407  rec_t* rec = btr_cur_get_rec(&cursor);
2408 
2409  if (big_rec) {
2410  ut_a(err == DB_SUCCESS);
2411  /* Write out the externally stored
2412  columns while still x-latching
2413  index->lock and block->lock. Allocate
2414  pages for big_rec in the mtr that
2415  modified the B-tree, but be sure to skip
2416  any pages that were freed in mtr. We will
2417  write out the big_rec pages before
2418  committing the B-tree mini-transaction. If
2419  the system crashes so that crash recovery
2420  will not replay the mtr_commit(&mtr), the
2421  big_rec pages will be left orphaned until
2422  the pages are allocated for something else.
2423 
2424  TODO: If the allocation extends the
2425  tablespace, it will not be redo
2426  logged, in either mini-transaction.
2427  Tablespace extension should be
2428  redo-logged in the big_rec
2429  mini-transaction, so that recovery
2430  will not fail when the big_rec was
2431  written to the extended portion of the
2432  file, in case the file was somehow
2433  truncated in the crash. */
2434 
2435  DEBUG_SYNC_C_IF_THD(
2436  thr_get_trx(thr)->mysql_thd,
2437  "before_row_ins_upd_extern");
2439  index, btr_cur_get_block(&cursor),
2440  rec, offsets, big_rec, &mtr,
2442  DEBUG_SYNC_C_IF_THD(
2443  thr_get_trx(thr)->mysql_thd,
2444  "after_row_ins_upd_extern");
2445  /* If writing big_rec fails (for
2446  example, because of DB_OUT_OF_FILE_SPACE),
2447  the record will be corrupted. Even if
2448  we did not update any externally
2449  stored columns, our update could cause
2450  the record to grow so that a
2451  non-updated column was selected for
2452  external storage. This non-update
2453  would not have been written to the
2454  undo log, and thus the record cannot
2455  be rolled back.
2456 
2457  However, because we have not executed
2458  mtr_commit(mtr) yet, the update will
2459  not be replayed in crash recovery, and
2460  the following assertion failure will
2461  effectively "roll back" the operation. */
2462  ut_a(err == DB_SUCCESS);
2463  dtuple_big_rec_free(big_rec);
2464  }
2465 
2466  if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2467  row_log_table_insert(rec, index, offsets);
2468  }
2469 
2470  mtr_commit(&mtr);
2471  mem_heap_free(entry_heap);
2472  } else {
2473  rec_t* insert_rec;
2474 
2475  if (mode != BTR_MODIFY_TREE) {
2476  ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2477  == BTR_MODIFY_LEAF);
2479  flags, &cursor, &offsets, &offsets_heap,
2480  entry, &insert_rec, &big_rec,
2481  n_ext, thr, &mtr);
2482  } else {
2484 
2485  err = DB_LOCK_TABLE_FULL;
2486  goto err_exit;
2487  }
2488 
2490  flags, &cursor,
2491  &offsets, &offsets_heap,
2492  entry, &insert_rec, &big_rec,
2493  n_ext, thr, &mtr);
2494 
2495  if (err == DB_FAIL) {
2497  flags, &cursor,
2498  &offsets, &offsets_heap,
2499  entry, &insert_rec, &big_rec,
2500  n_ext, thr, &mtr);
2501  }
2502  }
2503 
2504  if (UNIV_LIKELY_NULL(big_rec)) {
2505  mtr_commit(&mtr);
2506 
2507  /* Online table rebuild could read (and
2508  ignore) the incomplete record at this point.
2509  If online rebuild is in progress, the
2510  row_ins_index_entry_big_rec() will write log. */
2511 
2512  DBUG_EXECUTE_IF(
2513  "row_ins_extern_checkpoint",
2515  LSN_MAX, TRUE););
2516  err = row_ins_index_entry_big_rec(
2517  entry, big_rec, offsets, &offsets_heap, index,
2518  thr_get_trx(thr)->mysql_thd,
2519  __FILE__, __LINE__);
2520  dtuple_convert_back_big_rec(index, entry, big_rec);
2521  } else {
2522  if (err == DB_SUCCESS
2523  && dict_index_is_online_ddl(index)) {
2525  insert_rec, index, offsets);
2526  }
2527 
2528  mtr_commit(&mtr);
2529  }
2530  }
2531 
2532 func_exit:
2533  if (offsets_heap) {
2534  mem_heap_free(offsets_heap);
2535  }
2536 
2537  return(err);
2538 }
2539 
2540 /***************************************************************/
/* Starts the mini-transaction `mtr` and, when `check` is set, latches the
index lock and inspects index->online_status.
  mtr         - mini-transaction; mtr_start() is always called on it
  index       - must not be a clustered index (debug assertion)
  check       - when false, nothing beyond mtr_start() is done
  search_mode - if BTR_ALREADY_S_LATCHED is set the index lock is taken
                in S mode, otherwise in X mode
Returns true when the status is ONLINE_INDEX_ABORTED; false when `check`
is false or the status is ONLINE_INDEX_COMPLETE. */
2543 static __attribute__((nonnull, warn_unused_result))
2544 bool
2545 row_ins_sec_mtr_start_and_check_if_aborted(
2546 /*=======================================*/
2547  mtr_t* mtr,
2548  dict_index_t* index,
2549  bool check,
2550  ulint search_mode)
2552 {
2553  ut_ad(!dict_index_is_clust(index));
2554 
2555  mtr_start(mtr);
2556 
2557  if (!check) {
2558  return(false);
2559  }
2560 
 /* Latch mode follows the caller's search mode. */
2561  if (search_mode & BTR_ALREADY_S_LATCHED) {
2562  mtr_s_lock(dict_index_get_lock(index), mtr);
2563  } else {
2564  mtr_x_lock(dict_index_get_lock(index), mtr);
2565  }
2566 
2567  switch (index->online_status) {
2568  case ONLINE_INDEX_ABORTED:
 /* NOTE(review): listing line 2569 is absent here; it may have held
 a further case label sharing this branch -- confirm against the
 upstream file before editing this switch. */
2570  ut_ad(*index->name == TEMP_INDEX_PREFIX);
2571  return(true);
2572  case ONLINE_INDEX_COMPLETE:
2573  return(false);
2574  case ONLINE_INDEX_CREATION:
2575  break;
2576  }
2577 
 /* ONLINE_INDEX_CREATION (or any unlisted status) leaves the switch
 and reaches ut_error; the return below follows it. */
2578  ut_error;
2579  return(true);
2580 }
2581 
2582 /***************************************************************/
/* NOTE(review): the line carrying this function's name (listing line 2592)
is absent. The parameter list matches the calls made with
(0, BTR_MODIFY_LEAF/BTR_MODIFY_TREE, index, offsets_heap, heap, entry, 0, thr)
further down this file, so this is presumably row_ins_sec_index_entry_low()
-- confirm against the upstream file.

Positions a cursor in a non-clustered index for `entry` and either modifies
an existing record or inserts a new one.
  flags        - passed through to the duplicate scan and to the
                 modify/insert calls
  mode         - BTR_MODIFY_LEAF or BTR_MODIFY_TREE (debug assertion)
  index        - must not be clustered (debug assertion)
  offsets_heap - heap used for rec_get_offsets() results
  heap         - passed to row_ins_sec_index_entry_by_modify()
  entry        - the tuple being searched for / inserted
  trx_id       - when nonzero and the insert succeeded, passed to a call
                 together with the cursor's block and page_zip
  thr          - query thread; its transaction must have an id
Returns the error code in `err`, which starts as DB_SUCCESS. */
2590 UNIV_INTERN
2591 dberr_t
2593 /*========================*/
2594  ulint flags,
2595  ulint mode,
2598  dict_index_t* index,
2599  mem_heap_t* offsets_heap,
2601  mem_heap_t* heap,
2602  dtuple_t* entry,
2603  trx_id_t trx_id,
2605  que_thr_t* thr)
2606 {
2607  btr_cur_t cursor;
2608  ulint search_mode = mode | BTR_INSERT;
2609  dberr_t err = DB_SUCCESS;
2610  ulint n_unique;
2611  mtr_t mtr;
2612  ulint* offsets = NULL;
2613 
2614  ut_ad(!dict_index_is_clust(index));
2615  ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2616 
2617  cursor.thr = thr;
2618  ut_ad(thr_get_trx(thr)->id);
2619  mtr_start(&mtr);
2620 
2621  /* Ensure that we acquire index->lock when inserting into an
2622  index with index->online_status == ONLINE_INDEX_COMPLETE, but
2623  could still be subject to rollback_inplace_alter_table().
2624  This prevents a concurrent change of index->online_status.
2625  The memory object cannot be freed as long as we have an open
2626  reference to the table, or index->table->n_ref_count > 0. */
2627  const bool check = *index->name == TEMP_INDEX_PREFIX;
2628  if (check) {
2629  DEBUG_SYNC_C("row_ins_sec_index_enter");
2630  if (mode == BTR_MODIFY_LEAF) {
2631  search_mode |= BTR_ALREADY_S_LATCHED;
2632  mtr_s_lock(dict_index_get_lock(index), &mtr);
2633  } else {
2634  mtr_x_lock(dict_index_get_lock(index), &mtr);
2635  }
2636 
 /* NOTE(review): listing line 2637 is absent; it presumably opens
 an `if (<call>(` whose arguments continue on the next line.
 When that condition holds the function exits with err still
 DB_SUCCESS. */
2638  index, entry, thr_get_trx(thr)->id)) {
2639  goto func_exit;
2640  }
2641  }
2642 
2643  /* Note that we use PAGE_CUR_LE as the search mode, because then
2644  the function will return in both low_match and up_match of the
2645  cursor sensible values */
2646 
2647  if (!thr_get_trx(thr)->check_unique_secondary) {
2648  search_mode |= BTR_IGNORE_SEC_UNIQUE;
2649  }
2650 
2651  btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2652  search_mode,
2653  &cursor, 0, __FILE__, __LINE__, &mtr);
2654 
2655  if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2656  /* The insert was buffered during the search: we are done */
2657  goto func_exit;
2658  }
2659 
2660 #ifdef UNIV_DEBUG
2661  {
2662  page_t* page = btr_cur_get_page(&cursor);
2663  rec_t* first_rec = page_rec_get_next(
2664  page_get_infimum_rec(page));
2665 
2666  ut_ad(page_rec_is_supremum(first_rec)
2667  || rec_get_n_fields(first_rec, index)
2668  == dtuple_get_n_fields(entry));
2669  }
2670 #endif
2671 
2672  n_unique = dict_index_get_n_unique(index);
2673 
 /* A match on all unique fields in a unique index triggers a
 separate duplicate scan in a fresh mini-transaction. */
2674  if (dict_index_is_unique(index)
2675  && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
2676  mtr_commit(&mtr);
2677 
2678  DEBUG_SYNC_C("row_ins_sec_index_unique");
2679 
2680  if (row_ins_sec_mtr_start_and_check_if_aborted(
2681  &mtr, index, check, search_mode)) {
2682  goto func_exit;
2683  }
2684 
2685  err = row_ins_scan_sec_index_for_duplicate(
2686  flags, index, entry, thr, check, &mtr, offsets_heap);
2687 
2688  mtr_commit(&mtr);
2689 
 /* The mtr is already committed here, so the non-success cases
 return directly instead of jumping to func_exit. */
2690  switch (err) {
2691  case DB_SUCCESS:
2692  break;
2693  case DB_DUPLICATE_KEY:
2694  if (*index->name == TEMP_INDEX_PREFIX) {
2695  ut_ad(!thr_get_trx(thr)
2696  ->dict_operation_lock_mode);
2697  mutex_enter(&dict_sys->mutex);
 /* NOTE(review): listing line 2698 is absent; it presumably
 names the call made under dict_sys->mutex whose arguments
 follow. */
2699  index, index->table);
2700  mutex_exit(&dict_sys->mutex);
2701  /* Do not return any error to the
2702  caller. The duplicate will be reported
2703  by ALTER TABLE or CREATE UNIQUE INDEX.
2704  Unfortunately we cannot report the
2705  duplicate key value to the DDL thread,
2706  because the altered_table object is
2707  private to its call stack. */
2708  err = DB_SUCCESS;
2709  }
2710  /* fall through */
2711  default:
2712  return(err);
2713  }
2714 
2715  if (row_ins_sec_mtr_start_and_check_if_aborted(
2716  &mtr, index, check, search_mode)) {
2717  goto func_exit;
2718  }
2719 
2720  /* We did not find a duplicate and we have now
2721  locked with s-locks the necessary records to
2722  prevent any insertion of a duplicate by another
2723  transaction. Let us now reposition the cursor and
2724  continue the insertion. */
2725 
 /* NOTE(review): listing line 2726 is absent; given the argument
 list it is presumably a second btr_cur_search_to_nth_level( call,
 this time with BTR_INSERT and BTR_IGNORE_SEC_UNIQUE masked out. */
2727  index, 0, entry, PAGE_CUR_LE,
2728  search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE),
2729  &cursor, 0, __FILE__, __LINE__, &mtr);
2730  }
2731 
2732  if (row_ins_must_modify_rec(&cursor)) {
2733  /* There is already an index entry with a long enough common
2734  prefix, we must convert the insert into a modify of an
2735  existing record */
2736  offsets = rec_get_offsets(
2737  btr_cur_get_rec(&cursor), index, offsets,
2738  ULINT_UNDEFINED, &offsets_heap);
2739 
2740  err = row_ins_sec_index_entry_by_modify(
2741  flags, mode, &cursor, &offsets,
2742  offsets_heap, heap, entry, thr, &mtr);
2743  } else {
2744  rec_t* insert_rec;
2745  big_rec_t* big_rec;
2746 
2747  if (mode == BTR_MODIFY_LEAF) {
 /* NOTE(review): listing line 2748 is absent; presumably
 `err = <leaf-level insert call>(`. */
2749  flags, &cursor, &offsets, &offsets_heap,
2750  entry, &insert_rec,
2751  &big_rec, 0, thr, &mtr);
2752  } else {
2753  ut_ad(mode == BTR_MODIFY_TREE);
 /* NOTE(review): listing line 2754 is absent; it presumably
 opens the `if (...) {` that guards the DB_LOCK_TABLE_FULL
 exit below. */
2755 
2756  err = DB_LOCK_TABLE_FULL;
2757  goto func_exit;
2758  }
2759 
 /* NOTE(review): listing lines 2760 and 2766 are absent; they
 presumably name a first insert attempt and, on DB_FAIL, a
 second insert call taking the same arguments. */
2761  flags, &cursor,
2762  &offsets, &offsets_heap,
2763  entry, &insert_rec,
2764  &big_rec, 0, thr, &mtr);
2765  if (err == DB_FAIL) {
2767  flags, &cursor,
2768  &offsets, &offsets_heap,
2769  entry, &insert_rec,
2770  &big_rec, 0, thr, &mtr);
2771  }
2772  }
2773 
2774  if (err == DB_SUCCESS && trx_id) {
 /* NOTE(review): listing line 2775 is absent; it presumably
 names the call that receives the block, page_zip, trx_id
 and mtr. */
2776  btr_cur_get_block(&cursor),
2777  btr_cur_get_page_zip(&cursor),
2778  trx_id, &mtr);
2779  }
2780 
2781  ut_ad(!big_rec);
2782  }
2783 
 /* Every goto lands here with an open mtr. */
2784 func_exit:
2785  mtr_commit(&mtr);
2786  return(err);
2787 }
2788 
2789 /***************************************************************/
/* NOTE(review): the line carrying this function's name (listing line 2795)
is absent. This is presumably the implementation behind the
row_ins_index_entry_big_rec() invocation seen earlier in this file --
confirm against the upstream file.

Re-positions a cursor on `entry` in the clustered index within its own
mini-transaction and passes the record, its offsets and `big_rec` to a
BTR_STORE_INSERT call.
  entry   - tuple used to position the cursor
  big_rec - passed to the BTR_STORE_INSERT call
  offsets - offsets array handed to rec_get_offsets() (reused or replaced)
  heap    - heap pointer handed to rec_get_offsets()
  index   - must be clustered (debug assertion)
  file    - caller's file name, forwarded to the B-tree search
  thd     - only present when DBUG_OFF is not defined; used by the
            DEBUG_SYNC points
  line    - caller's line number, forwarded to the B-tree search
Returns the value left in `error`. */
2793 UNIV_INTERN
2794 dberr_t
2796 /*=============================*/
2797  const dtuple_t* entry,
2798  const big_rec_t* big_rec,
2799  ulint* offsets,
2800  mem_heap_t** heap,
2801  dict_index_t* index,
2802  const char* file,
2803 #ifndef DBUG_OFF
2804  const void* thd,
2805 #endif /* DBUG_OFF */
2806  ulint line)
2807 {
2808  mtr_t mtr;
2809  btr_cur_t cursor;
2810  rec_t* rec;
2811  dberr_t error;
2812 
2813  ut_ad(dict_index_is_clust(index));
2814 
2815  DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2816 
2817  mtr_start(&mtr);
2818  btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2819  BTR_MODIFY_TREE, &cursor, 0,
2820  file, line, &mtr);
2821  rec = btr_cur_get_rec(&cursor);
2822  offsets = rec_get_offsets(rec, index, offsets,
2823  ULINT_UNDEFINED, heap);
2824 
2825  DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
 /* NOTE(review): listing line 2826 is absent; it presumably reads
 `error = <store call>(`, since `error` is tested below and is not
 assigned anywhere else in the visible lines. */
2827  index, btr_cur_get_block(&cursor),
2828  rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2829  DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2830 
 /* On success with online DDL active on the index, the insert is
 also passed to row_log_table_insert(). */
2831  if (error == DB_SUCCESS
2832  && dict_index_is_online_ddl(index)) {
2833  row_log_table_insert(rec, index, offsets);
2834  }
2835 
2836  mtr_commit(&mtr);
2837 
2838  return(error);
2839 }
2840 
2841 /***************************************************************/
/* NOTE(review): the line carrying this function's name (listing line 2849)
is absent. The four-parameter shape matches the
row_ins_clust_index_entry(index, entry, thr, 0) call made by
row_ins_index_entry() in this file -- confirm against the upstream file.

Checks foreign constraints when the table's foreign_list is non-empty,
then attempts the insert with BTR_MODIFY_LEAF and, if that reports
DB_FAIL, again with BTR_MODIFY_TREE.
  index - index being inserted into; index->table supplies foreign_list
  entry - tuple to insert
  thr   - query thread
  n_ext - forwarded unchanged to both attempts
Returns the foreign-constraint error, the first attempt's result when it
is not DB_FAIL, or otherwise the second attempt's result. */
2847 UNIV_INTERN
2848 dberr_t
2850 /*======================*/
2851  dict_index_t* index,
2852  dtuple_t* entry,
2853  que_thr_t* thr,
2854  ulint n_ext)
2855 {
2856  dberr_t err;
2857  ulint n_uniq;
2858 
2859  if (UT_LIST_GET_FIRST(index->table->foreign_list)) {
2860  err = row_ins_check_foreign_constraints(
2861  index->table, index, entry, thr);
2862  if (err != DB_SUCCESS) {
2863 
2864  return(err);
2865  }
2866  }
2867 
 /* n_uniq is 0 for a non-unique index. */
2868  n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
2869 
2870  /* Try first optimistic descent to the B-tree */
2871 
2872  log_free_check();
2873 
 /* NOTE(review): listing line 2874 is absent; presumably
 `err = <low-level clustered insert>(` with the arguments below. */
2875  0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr);
2876 
2877 #ifdef UNIV_DEBUG
2878  /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
2879  Once it is fixed, remove the 'ifdef', 'if' and this comment. */
2880  if (!thr_get_trx(thr)->ddl) {
2881  DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
2882  "after_row_ins_clust_index_entry_leaf");
2883  }
2884 #endif /* UNIV_DEBUG */
2885 
2886  if (err != DB_FAIL) {
2887  DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2888  return(err);
2889  }
2890 
2891  /* Try then pessimistic descent to the B-tree */
2892 
2893  log_free_check();
2894 
 /* NOTE(review): listing line 2895 is absent; presumably
 `return(<same low-level call>(`, matching the doubled closing
 parenthesis on the next line. */
2896  0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr));
2897 }
2898 
2899 /***************************************************************/
/* NOTE(review): the line carrying this function's name (listing line 2907)
is absent. The three-parameter shape matches the
row_ins_sec_index_entry(index, entry, thr) call made by
row_ins_index_entry() in this file -- confirm against the upstream file.

Checks foreign constraints when the table's foreign_list is non-empty,
then attempts the insert with BTR_MODIFY_LEAF and, on DB_FAIL, retries
with BTR_MODIFY_TREE after emptying `heap`. Both heaps created here are
freed before returning.
  index - index being inserted into; index->table supplies foreign_list
  entry - tuple to insert
  thr   - query thread; its transaction must have an id (debug assertion)
Returns the foreign-constraint error or the result of the last attempt. */
2905 UNIV_INTERN
2906 dberr_t
2908 /*====================*/
2909  dict_index_t* index,
2910  dtuple_t* entry,
2911  que_thr_t* thr)
2912 {
2913  dberr_t err;
 /* NOTE(review): listing line 2914 is absent; it presumably declares
 `offsets_heap`, which is assigned below but not declared in the
 visible lines. */
2915  mem_heap_t* heap;
2916 
2917  if (UT_LIST_GET_FIRST(index->table->foreign_list)) {
2918  err = row_ins_check_foreign_constraints(index->table, index,
2919  entry, thr);
2920  if (err != DB_SUCCESS) {
2921 
2922  return(err);
2923  }
2924  }
2925 
2926  ut_ad(thr_get_trx(thr)->id);
2927 
2928  offsets_heap = mem_heap_create(1024);
2929  heap = mem_heap_create(1024);
2930 
2931  /* Try first optimistic descent to the B-tree */
2932 
2933  log_free_check();
2934 
 /* NOTE(review): listing line 2935 is absent; presumably
 `err = <low-level secondary insert>(`. */
2936  0, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry, 0, thr);
2937  if (err == DB_FAIL) {
 /* Discard whatever the first attempt placed in `heap`. */
2938  mem_heap_empty(heap);
2939 
2940  /* Try then pessimistic descent to the B-tree */
2941 
2942  log_free_check();
2943 
 /* NOTE(review): listing line 2944 is absent; presumably the
 same call again, assigning to `err`. */
2945  0, BTR_MODIFY_TREE, index,
2946  offsets_heap, heap, entry, 0, thr);
2947  }
2948 
2949  mem_heap_free(heap);
2950  mem_heap_free(offsets_heap);
2951  return(err);
2952 }
2953 
2954 /***************************************************************/
2960 static
2961 dberr_t
2962 row_ins_index_entry(
2963 /*================*/
2964  dict_index_t* index,
2965  dtuple_t* entry,
2966  que_thr_t* thr)
2967 {
2968  if (dict_index_is_clust(index)) {
2969  return(row_ins_clust_index_entry(index, entry, thr, 0));
2970  } else {
2971  return(row_ins_sec_index_entry(index, entry, thr));
2972  }
2973 }
2974 
2975 /***********************************************************/
/* Points every field of `entry` at the data of the matching column in
`row`, for all dtuple_get_n_fields(entry) fields.
  index - supplies, per field position, the column index in `row` and
          the prefix length
  entry - tuple whose fields are (re)set
  row   - source tuple; only read
For a field with prefix_len > 0 and a non-NULL value, a call is made with
the column's prtype/mbminmaxlen, the prefix length, the length and the
data. A field that is externally stored in `row` is marked external in
`entry`; the debug assertion requires a clustered index in that case. */
2978 static __attribute__((nonnull))
2979 void
2980 row_ins_index_entry_set_vals(
2981 /*=========================*/
2982  dict_index_t* index,
2983  dtuple_t* entry,
2984  const dtuple_t* row)
2985 {
2986  ulint n_fields;
2987  ulint i;
2988 
2989  n_fields = dtuple_get_n_fields(entry);
2990 
2991  for (i = 0; i < n_fields; i++) {
2992  dict_field_t* ind_field;
2993  dfield_t* field;
2994  const dfield_t* row_field;
2995  ulint len;
2996 
2997  field = dtuple_get_nth_field(entry, i);
2998  ind_field = dict_index_get_nth_field(index, i);
 /* The row is addressed by column number, not by field position. */
2999  row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3000  len = dfield_get_len(row_field);
3001 
3002  /* Check column prefix indexes */
3003  if (ind_field->prefix_len > 0
3004  && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3005 
3006  const dict_col_t* col
3007  = dict_field_get_col(ind_field);
3008 
 /* NOTE(review): listing line 3009 is absent; presumably
 `len = <prefix-length helper>(`, so that `len` passed to
 dfield_set_data() below is the truncated length --
 confirm against the upstream file. */
3010  col->prtype, col->mbminmaxlen,
3011  ind_field->prefix_len,
3012  len,
3013  static_cast<const char*>(
3014  dfield_get_data(row_field)));
3015 
3016  ut_ad(!dfield_is_ext(row_field));
3017  }
3018 
3019  dfield_set_data(field, dfield_get_data(row_field), len);
3020  if (dfield_is_ext(row_field)) {
3021  ut_ad(dict_index_is_clust(index));
3022  dfield_set_ext(field);
3023  }
3024  }
3025 }
3026 
3027 /***********************************************************/
3031 static __attribute__((nonnull, warn_unused_result))
3032 dberr_t
3033 row_ins_index_entry_step(
3034 /*=====================*/
3035  ins_node_t* node,
3036  que_thr_t* thr)
3037 {
3038  dberr_t err;
3039 
3040  ut_ad(dtuple_check_typed(node->row));
3041 
3042  row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3043 
3044  ut_ad(dtuple_check_typed(node->entry));
3045 
3046  err = row_ins_index_entry(node->index, node->entry, thr);
3047 
3048 #ifdef UNIV_DEBUG
3049  /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
3050  Once it is fixed, remove the 'ifdef', 'if' and this comment. */
3051  if (!thr_get_trx(thr)->ddl) {
3052  DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3053  "after_row_ins_index_entry_step");
3054  }
3055 #endif /* UNIV_DEBUG */
3056 
3057  return(err);
3058 }
3059 
3060 /***********************************************************/
3062 UNIV_INLINE
3063 void
3064 row_ins_alloc_row_id_step(
3065 /*======================*/
3066  ins_node_t* node)
3067 {
3068  row_id_t row_id;
3069 
3070  ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3071 
3072  if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3073 
3074  /* No row id is stored if the clustered index is unique */
3075 
3076  return;
3077  }
3078 
3079  /* Fill in row id value to row */
3080 
3081  row_id = dict_sys_get_new_row_id();
3082 
3083  dict_sys_write_row_id(node->row_id_buf, row_id);
3084 }
3085 
3086 /***********************************************************/
3088 UNIV_INLINE
3089 void
3090 row_ins_get_row_from_values(
3091 /*========================*/
3092  ins_node_t* node)
3093 {
3094  que_node_t* list_node;
3095  dfield_t* dfield;
3096  dtuple_t* row;
3097  ulint i;
3098 
3099  /* The field values are copied in the buffers of the select node and
3100  it is safe to use them until we fetch from select again: therefore
3101  we can just copy the pointers */
3102 
3103  row = node->row;
3104 
3105  i = 0;
3106  list_node = node->values_list;
3107 
3108  while (list_node) {
3109  eval_exp(list_node);
3110 
3111  dfield = dtuple_get_nth_field(row, i);
3112  dfield_copy_data(dfield, que_node_get_val(list_node));
3113 
3114  i++;
3115  list_node = que_node_get_next(list_node);
3116  }
3117 }
3118 
3119 /***********************************************************/
3121 UNIV_INLINE
3122 void
3123 row_ins_get_row_from_select(
3124 /*========================*/
3125  ins_node_t* node)
3126 {
3127  que_node_t* list_node;
3128  dfield_t* dfield;
3129  dtuple_t* row;
3130  ulint i;
3131 
3132  /* The field values are copied in the buffers of the select node and
3133  it is safe to use them until we fetch from select again: therefore
3134  we can just copy the pointers */
3135 
3136  row = node->row;
3137 
3138  i = 0;
3139  list_node = node->select->select_list;
3140 
3141  while (list_node) {
3142  dfield = dtuple_get_nth_field(row, i);
3143  dfield_copy_data(dfield, que_node_get_val(list_node));
3144 
3145  i++;
3146  list_node = que_node_get_next(list_node);
3147  }
3148 }
3149 
3150 /***********************************************************/
3154 static __attribute__((nonnull, warn_unused_result))
3155 dberr_t
3156 row_ins(
3157 /*====*/
3158  ins_node_t* node,
3159  que_thr_t* thr)
3160 {
3161  dberr_t err;
3162 
3163  if (node->state == INS_NODE_ALLOC_ROW_ID) {
3164 
3165  row_ins_alloc_row_id_step(node);
3166 
3167  node->index = dict_table_get_first_index(node->table);
3168  node->entry = UT_LIST_GET_FIRST(node->entry_list);
3169 
3170  if (node->ins_type == INS_SEARCHED) {
3171 
3172  row_ins_get_row_from_select(node);
3173 
3174  } else if (node->ins_type == INS_VALUES) {
3175 
3176  row_ins_get_row_from_values(node);
3177  }
3178 
3179  node->state = INS_NODE_INSERT_ENTRIES;
3180  }
3181 
3182  ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3183 
3184  while (node->index != NULL) {
3185  if (node->index->type != DICT_FTS) {
3186  err = row_ins_index_entry_step(node, thr);
3187 
3188  if (err != DB_SUCCESS) {
3189 
3190  return(err);
3191  }
3192  }
3193 
3194  node->index = dict_table_get_next_index(node->index);
3195  node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3196 
3197  DBUG_EXECUTE_IF(
3198  "row_ins_skip_sec",
3199  node->index = NULL; node->entry = NULL; break;);
3200 
3201  /* Skip corrupted secondary index and its entry */
3202  while (node->index && dict_index_is_corrupted(node->index)) {
3203 
3204  node->index = dict_table_get_next_index(node->index);
3205  node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3206  }
3207  }
3208 
3209  ut_ad(node->entry == NULL);
3210 
3211  node->state = INS_NODE_ALLOC_ROW_ID;
3212 
3213  return(DB_SUCCESS);
3214 }
3215 
3216 /***********************************************************/
/* Query-graph step for the insert node found in thr->run_node.
  thr - query thread (must be non-NULL, debug assertion); its run_node
        is updated to tell the caller which node to run next
Returns `thr` when execution should continue (with run_node set to the
select node or to the parent), or NULL when `err` is not DB_SUCCESS. The
error code is stored in trx->error_state whenever the error_handling
label is reached. */
3220 UNIV_INTERN
3221 que_thr_t*
3222 row_ins_step(
3223 /*=========*/
3224  que_thr_t* thr)
3225 {
3226  ins_node_t* node;
3227  que_node_t* parent;
3228  sel_node_t* sel_node;
3229  trx_t* trx;
3230  dberr_t err;
3231 
3232  ut_ad(thr);
3233 
3234  trx = thr_get_trx(thr);
3235 
3236  trx_start_if_not_started_xa(trx);
3237 
3238  node = static_cast<ins_node_t*>(thr->run_node);
3239 
3240  ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3241 
3242  parent = que_node_get_parent(node);
3243  sel_node = node->select;
3244 
 /* Control arriving from the parent node restarts the state machine
 at the IX-lock state. */
3245  if (thr->prev_node == parent) {
3246  node->state = INS_NODE_SET_IX_LOCK;
3247  }
3248 
3249  /* If this is the first time this node is executed (or when
3250  execution resumes after wait for the table IX lock), set an
3251  IX lock on the table and reset the possible select node. MySQL's
3252  partitioned table code may also call an insert within the same
3253  SQL statement AFTER it has used this table handle to do a search.
3254  This happens, for example, when a row update moves it to another
3255  partition. In that case, we have already set the IX lock on the
3256  table during the search operation, and there is no need to set
3257  it again here. But we must write trx->id to node->trx_id_buf. */
3258 
3259  trx_write_trx_id(node->trx_id_buf, trx->id);
3260 
3261  if (node->state == INS_NODE_SET_IX_LOCK) {
3262 
 /* The state is advanced before the lock request is made. */
3263  node->state = INS_NODE_ALLOC_ROW_ID;
3264 
3265  /* It may be that the current session has not yet started
3266  its transaction, or it has been committed: */
3267 
3268  if (trx->id == node->trx_id) {
3269  /* No need to do IX-locking */
3270 
3271  goto same_trx;
3272  }
3273 
3274  err = lock_table(0, node->table, LOCK_IX, thr);
3275 
3276  DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3277  err = DB_LOCK_WAIT;);
3278 
3279  if (err != DB_SUCCESS) {
3280 
3281  goto error_handling;
3282  }
3283 
 /* Remember which transaction holds the IX lock so the request
 above is skipped next time for the same trx id. */
3284  node->trx_id = trx->id;
3285 same_trx:
3286  if (node->ins_type == INS_SEARCHED) {
3287  /* Reset the cursor */
3288  sel_node->state = SEL_NODE_OPEN;
3289 
3290  /* Fetch a row to insert */
3291 
3292  thr->run_node = sel_node;
3293 
3294  return(thr);
3295  }
3296  }
3297 
3298  if ((node->ins_type == INS_SEARCHED)
3299  && (sel_node->state != SEL_NODE_FETCH)) {
3300 
3301  ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3302 
3303  /* No more rows to insert */
3304  thr->run_node = parent;
3305 
3306  return(thr);
3307  }
3308 
3309  /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3310 
3311  err = row_ins(node, thr);
3312 
 /* Reached both by falling through after row_ins() and by the goto
 taken when lock_table() does not return DB_SUCCESS. */
3313 error_handling:
3314  trx->error_state = err;
3315 
3316  if (err != DB_SUCCESS) {
3317  /* err == DB_LOCK_WAIT or SQL error detected */
3318  return(NULL);
3319  }
3320 
3321  /* DO THE TRIGGER ACTIONS HERE */
3322 
3323  if (node->ins_type == INS_SEARCHED) {
3324  /* Fetch a row to insert */
3325 
3326  thr->run_node = sel_node;
3327  } else {
3328  thr->run_node = que_node_get_parent(node);
3329  }
3330 
3331  return(thr);
3332 }
3332 }