MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sql_insert.cc
1 /*
2  Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 /* Insert of records */
19 
20 /*
21  INSERT DELAYED
22 
23  Insert delayed is distinguished from a normal insert by lock_type ==
24  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
25  "delayed" table (delayed_get_table()), but falls back to
26  open_and_lock_tables() on error and proceeds as normal insert then.
27 
28  Opening a "delayed" table means to find a delayed insert thread that
29  has the table open already. If this fails, a new thread is created and
30  waited for to open and lock the table.
31 
32  If accessing the thread succeeded, in
33  Delayed_insert::get_local_table() the table of the thread is copied
34  for local use. A copy is required because the normal insert logic
35  works on a target table, but the other threads table object must not
36  be used. The insert logic uses the record buffer to create a record.
37  And the delayed insert thread uses the record buffer to pass the
38  record to the table handler. So there must be different objects. Also
39  the copied table is not included in the lock, so that the statement
40  can proceed even if the real table cannot be accessed at this moment.
41 
42  Copying a table object is not a trivial operation. Besides the TABLE
43  object there are the field pointer array, the field objects and the
44  record buffer. After copying the field objects, their pointers into
45  the record must be "moved" to point to the new record buffer.
46 
47  After this setup the normal insert logic is used. Only that for
48  delayed inserts write_delayed() is called instead of write_record().
49  It inserts the rows into a queue and signals the delayed insert thread
50  instead of writing directly to the table.
51 
52  The delayed insert thread awakes from the signal. It locks the table,
53  inserts the rows from the queue, unlocks the table, and waits for the
54  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
55 
56 */
57 
58 #include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
59 #include "sql_priv.h"
60 #include "unireg.h" // REQUIRED: for other includes
61 #include "sql_insert.h"
62 #include "sql_update.h" // compare_record
63 #include "sql_base.h" // close_thread_tables
64 #include "sql_cache.h" // query_cache_*
65 #include "key.h" // key_copy
66 #include "lock.h" // mysql_unlock_tables
67 #include "sp_head.h"
68 #include "sql_view.h" // check_key_in_view, insert_view_fields
69 #include "sql_table.h" // mysql_create_table_no_lock
70 #include "sql_acl.h" // *_ACL, check_grant_all_columns
71 #include "sql_trigger.h"
72 #include "sql_select.h"
73 #include "sql_show.h"
74 #include "rpl_slave.h"
75 #include "sql_parse.h" // end_active_trans
76 #include "rpl_mi.h"
77 #include "transaction.h"
78 #include "sql_audit.h"
79 #include "debug_sync.h"
80 #include "opt_explain.h"
81 #include "delayable_insert_operation.h"
82 #include "sql_tmp_table.h" // tmp tables
83 #include "sql_optimizer.h" // JOIN
84 #include "global_threads.h"
85 #ifdef WITH_PARTITION_STORAGE_ENGINE
86 #include "sql_partition.h"
87 #include "partition_info.h" // partition_info
88 #endif /* WITH_PARTITION_STORAGE_ENGINE */
89 
90 #include "debug_sync.h"
91 
92 #ifndef EMBEDDED_LIBRARY
93 static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
94  TABLE_LIST *table_list);
95 
96 static bool write_delayed(THD *thd, TABLE *table, LEX_STRING query, bool log_on,
97  COPY_INFO *client_op);
98 
99 static void end_delayed_insert(THD *thd);
100 pthread_handler_t handle_delayed_insert(void *arg);
101 static void unlink_blobs(register TABLE *table);
102 #endif
103 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
104 
105 /*
106  Check that insert/update fields are from the same single table of a view.
107 
108  @param fields The insert/update fields to be checked.
109  @param values The insert/update values to be checked, NULL if
110  checking is not wanted.
111  @param view The view for insert.
112  @param map [in/out] The insert table map.
113 
114  This function is called in 2 cases:
115  1. to check insert fields. In this case *map will be set to 0.
116  Insert fields are checked to be all from the same single underlying
117  table of the given view. Otherwise the error is thrown. Found table
118  map is returned in the map parameter.
119  2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
120  In this case *map contains table_map found on the previous call of
121  the function to check insert fields. Update fields are checked to be
122  from the same table as the insert fields.
123 
124  @returns false if success.
125 */
126 
127 bool check_view_single_update(List<Item> &fields, List<Item> *values,
128  TABLE_LIST *view, table_map *map)
129 {
130  /* it is join view => we need to find the table for update */
131  List_iterator_fast<Item> it(fields);
132  Item *item;
133  TABLE_LIST *tbl= 0; // reset for call to check_single_table()
134  table_map tables= 0;
135 
136  while ((item= it++))
137  tables|= item->used_tables();
138 
139  if (values)
140  {
141  it.init(*values);
142  while ((item= it++))
143  tables|= item->used_tables();
144  }
145 
146  /* Convert to real table bits */
147  tables&= ~PSEUDO_TABLE_BITS;
148 
149 
150  /* Check found map against provided map */
151  if (*map)
152  {
153  if (tables != *map)
154  goto error;
155  return FALSE;
156  }
157 
158  if (view->check_single_table(&tbl, tables, view) || tbl == 0)
159  goto error;
160 
161  view->table= tbl->table;
162  *map= tables;
163 
164  return FALSE;
165 
166 error:
167  my_error(ER_VIEW_MULTIUPDATE, MYF(0),
168  view->view_db.str, view->view_name.str);
169  return TRUE;
170 }
171 
172 
173 /*
174  Check if insert fields are correct.
175 
176  @param thd The current thread.
177  @param table_list The table we are inserting into (may be view)
178  @param fields The insert fields.
179  @param values The insert values.
180  @param check_unique If duplicate values should be rejected.
181  @param fields_and_values_from_different_maps If 'values' are allowed to
182  refer to other tables than those of 'fields'
183  @param map See check_view_single_update
184 
185  @returns 0 if success, -1 if error
186 */
187 
188 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
189  List<Item> &fields, List<Item> &values,
190  bool check_unique,
191  bool fields_and_values_from_different_maps,
192  table_map *map)
193 {
194  TABLE *table= table_list->table;
195 
196  if (!table_list->updatable)
197  {
198  my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
199  return -1;
200  }
201 
202  if (fields.elements == 0 && values.elements != 0)
203  {
204  if (!table)
205  {
206  my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
207  table_list->view_db.str, table_list->view_name.str);
208  return -1;
209  }
210  if (values.elements != table->s->fields)
211  {
212  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
213  return -1;
214  }
215 #ifndef NO_EMBEDDED_ACCESS_CHECKS
216  Field_iterator_table_ref field_it;
217  field_it.set(table_list);
218  if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
219  return -1;
220 #endif
221  /*
222  No fields are provided so all fields must be provided in the values.
223  Thus we set all bits in the write set.
224  */
225  bitmap_set_all(table->write_set);
226  }
227  else
228  { // Part field list
229  SELECT_LEX *select_lex= &thd->lex->select_lex;
230  Name_resolution_context *context= &select_lex->context;
232  int res;
233 
234  if (fields.elements != values.elements)
235  {
236  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
237  return -1;
238  }
239 
240  thd->dup_field= 0;
241  select_lex->no_wrap_view_item= TRUE;
242 
243  /* Save the state of the current name resolution context. */
244  ctx_state.save_state(context, table_list);
245 
246  /*
247  Perform name resolution only in the first table - 'table_list',
248  which is the table that is inserted into.
249  */
250  table_list->next_local= 0;
251  context->resolve_in_table_list_only(table_list);
252  res= setup_fields(thd, Ref_ptr_array(), fields, MARK_COLUMNS_WRITE, 0, 0);
253 
254  /* Restore the current context. */
255  ctx_state.restore_state(context, table_list);
256  thd->lex->select_lex.no_wrap_view_item= FALSE;
257 
258  if (res)
259  return -1;
260 
261  if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
262  {
263  if (check_view_single_update(fields,
264  fields_and_values_from_different_maps ?
265  (List<Item>*) 0 : &values,
266  table_list, map))
267  return -1;
268  table= table_list->table;
269  }
270 
271  if (check_unique && thd->dup_field)
272  {
273  my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
274  return -1;
275  }
276  }
277  // For the values we need select_priv
278 #ifndef NO_EMBEDDED_ACCESS_CHECKS
279  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
280 #endif
281 
282  if (check_key_in_view(thd, table_list) ||
283  (table_list->view &&
284  check_view_insertability(thd, table_list)))
285  {
286  my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
287  return -1;
288  }
289 
290  return 0;
291 }
292 
293 
308 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
309  List<Item> &update_fields,
310  List<Item> &update_values,
311  bool fields_and_values_from_different_maps,
312  table_map *map)
313 {
314  /* Check the fields we are going to modify */
315  if (setup_fields(thd, Ref_ptr_array(),
316  update_fields, MARK_COLUMNS_WRITE, 0, 0))
317  return -1;
318 
319  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
320  check_view_single_update(update_fields,
321  fields_and_values_from_different_maps ?
322  (List<Item>*) 0 : &update_values,
323  insert_table_list, map))
324  return -1;
325  return 0;
326 }
327 
344 bool validate_default_values_of_unset_fields(THD *thd, TABLE *table)
345 {
346  MY_BITMAP *write_set= table->write_set;
347  DBUG_ENTER("validate_default_values_of_unset_fields");
348 
349  for (Field **field= table->field; *field; field++)
350  {
351  if (!bitmap_is_set(write_set, (*field)->field_index) &&
352  !((*field)->flags & NO_DEFAULT_VALUE_FLAG))
353  {
354  if ((*field)->validate_stored_val(thd) && thd->is_error())
355  DBUG_RETURN(true);
356  }
357  }
358 
359  DBUG_RETURN(false);
360 }
361 
362 
363 /*
364  Prepare triggers for INSERT-like statement.
365 
366  SYNOPSIS
367  prepare_triggers_for_insert_stmt()
368  table Table to which insert will happen
369 
370  NOTE
371  Prepare triggers for INSERT-like statement by marking fields
372  used by triggers and inform handlers that batching of UPDATE/DELETE
373  cannot be done if there are BEFORE UPDATE/DELETE triggers.
374 */
375 
376 void prepare_triggers_for_insert_stmt(TABLE *table)
377 {
378  if (table->triggers)
379  {
380  if (table->triggers->has_triggers(TRG_EVENT_DELETE,
381  TRG_ACTION_AFTER))
382  {
383  /*
384  The table has AFTER DELETE triggers that might access to
385  subject table and therefore might need delete to be done
386  immediately. So we turn-off the batching.
387  */
388  (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
389  }
390  if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
391  TRG_ACTION_AFTER))
392  {
393  /*
394  The table has AFTER UPDATE triggers that might access to subject
395  table and therefore might need update to be done immediately.
396  So we turn-off the batching.
397  */
398  (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
399  }
400  }
401  table->mark_columns_needed_for_insert();
402 }
403 
404 
413 static
414 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
415  enum_duplicates duplic)
416 {
417  if (duplic == DUP_UPDATE ||
418  (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
419  {
420  *lock_type= TL_WRITE_DEFAULT;
421  return;
422  }
423 
424  if (*lock_type == TL_WRITE_DELAYED)
425  {
426  /*
427  We do not use delayed threads if:
428  - we're running in skip-new mode -- the feature is disabled
429  in this mode
430  - we're executing this statement on a replication slave --
431  we need to ensure serial execution of queries on the
432  slave
433  - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
434  insert cannot be concurrent
435  - this statement is directly or indirectly invoked from
436  a stored function or trigger (under pre-locking) - to
437  avoid deadlocks, since INSERT DELAYED involves a lock
438  upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
439  attempt while keeping other table level locks.
440  - this statement itself may require pre-locking.
441  We should upgrade the lock even though in most cases
442  delayed functionality may work. Unfortunately, we can't
443  easily identify whether the subject table is not used in
444  the statement indirectly via a stored function or trigger:
445  if it is used, that will lead to a deadlock between the
446  client connection and the delayed thread.
447  - we're running the EXPLAIN INSERT command
448  */
449  if (specialflag & SPECIAL_NO_NEW_FUNC ||
450  thd->variables.max_insert_delayed_threads == 0 ||
451  thd->locked_tables_mode > LTM_LOCK_TABLES ||
452  thd->lex->uses_stored_routines() || thd->lex->describe)
453  {
454  *lock_type= TL_WRITE;
455  return;
456  }
457  if (thd->slave_thread)
458  {
459  /* Try concurrent insert */
460  *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
461  TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
462  return;
463  }
464 
465  bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
466  if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
467  log_on && mysql_bin_log.is_open())
468  {
469  /*
470  Statement-based binary logging does not work in this case, because:
471  a) two concurrent statements may have their rows intermixed in the
472  queue, leading to autoincrement replication problems on slave (because
473  the values generated used for one statement don't depend only on the
474  value generated for the first row of this statement, so are not
475  replicable)
476  b) if first row of the statement has an error the full statement is
477  not binlogged, while next rows of the statement may be inserted.
478  c) if first row succeeds, statement is binlogged immediately with a
479  zero error code (i.e. "no error"), if then second row fails, query
480  will fail on slave too and slave will stop (wrongly believing that the
481  master got no error).
482  So we fallback to non-delayed INSERT.
483  Note that to be fully correct, we should test the "binlog format which
484  the delayed thread is going to use for this row". But in the common case
485  where the global binlog format is not changed and the session binlog
486  format may be changed, that is equal to the global binlog format.
487  We test it without mutex for speed reasons (condition rarely true), and
488  in the common case (global not changed) it is as good as without mutex;
489  if global value is changed, anyway there is uncertainty as the delayed
490  thread may be old and use the before-the-change value.
491  */
492  *lock_type= TL_WRITE;
493  return;
494  }
495  }
496 }
497 
498 
522 static
523 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
524 {
525  MDL_request protection_request;
526  DBUG_ENTER("open_and_lock_for_insert_delayed");
527 
528 #ifndef EMBEDDED_LIBRARY
529  /* INSERT DELAYED is not allowed in a read only transaction. */
530  if (thd->tx_read_only)
531  {
532  my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
533  DBUG_RETURN(true);
534  }
535 
536  /*
537  In order for the deadlock detector to be able to find any deadlocks
538  caused by the handler thread waiting for GRL or this table, we acquire
539  protection against GRL (global IX metadata lock) and metadata lock on
540  table to being inserted into inside the connection thread.
541  If this goes ok, the tickets are cloned and added to the list of granted
542  locks held by the handler thread.
543  */
544  if (thd->global_read_lock.can_acquire_protection())
545  DBUG_RETURN(TRUE);
546 
547  protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE,
548  MDL_STATEMENT);
549 
550  if (thd->mdl_context.acquire_lock(&protection_request,
551  thd->variables.lock_wait_timeout))
552  DBUG_RETURN(TRUE);
553 
554  if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
555  thd->variables.lock_wait_timeout))
556  /*
557  If a lock can't be acquired, it makes no sense to try normal insert.
558  Therefore we just abort the statement.
559  */
560  DBUG_RETURN(TRUE);
561 
562  bool error= FALSE;
563  if (delayed_get_table(thd, &protection_request, table_list))
564  error= TRUE;
565  else if (table_list->table)
566  {
567  /*
568  Open tables used for sub-selects or in stored functions, will also
569  cache these functions.
570  */
571  if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
572  {
573  end_delayed_insert(thd);
574  error= TRUE;
575  }
576  else
577  {
578  /*
579  First table was not processed by open_and_lock_tables(),
580  we need to set updatability flag "by hand".
581  */
582  if (!table_list->derived && !table_list->view)
583  table_list->updatable= 1; // usual table
584  }
585  }
586 
587  /*
588  We can't release protection against GRL and metadata lock on the table
589  being inserted into here. These locks might be required, for example,
590  because this INSERT DELAYED calls functions which may try to update
591  this or another tables (updating the same table is of course illegal,
592  but such an attempt can be discovered only later during statement
593  execution).
594  */
595 
596  /*
597  Reset the ticket in case we end up having to use normal insert and
598  therefore will reopen the table and reacquire the metadata lock.
599  */
600  table_list->mdl_request.ticket= NULL;
601 
602  if (error || table_list->table)
603  DBUG_RETURN(error);
604 #endif
605  /*
606  * This is embedded library and we don't have auxiliary
607  threads OR
608  * a lock upgrade was requested inside delayed_get_table
609  because
610  - there are too many delayed insert threads OR
611  - the table has triggers.
612  Use a normal insert.
613  */
614  table_list->lock_type= TL_WRITE;
615  DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
616 }
617 
618 
630 static int
631 create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
632 {
633  /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
634  if (buf->append(thd->query()) ||
635  buf->replace(thd->lex->keyword_delayed_begin_offset,
636  thd->lex->keyword_delayed_end_offset -
637  thd->lex->keyword_delayed_begin_offset, 0))
638  return 1;
639  return 0;
640 }
641 
642 
651 bool mysql_insert(THD *thd,TABLE_LIST *table_list,
652  List<Item> &fields,
653  List<List_item> &values_list,
654  List<Item> &update_fields,
655  List<Item> &update_values,
656  enum_duplicates duplic,
657  bool ignore)
658 {
659  int error, res;
660  bool err= true;
661  bool transactional_table, joins_freed= FALSE;
662  bool changed;
663  bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
664  bool is_locked= false;
665  ulong counter = 1;
666  ulonglong id;
667  /*
668  We have three alternative syntax rules for the INSERT statement:
669  1) "INSERT (columns) VALUES ...", so non-listed columns need a default
670  2) "INSERT VALUES (), ..." so all columns need a default;
671  note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
672  emptiness of the first row is enough
673  3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
674  expr_i is "DEFAULT" (in which case the column is set by
675  Item_default_value::save_in_field()).
676  */
677  const bool manage_defaults=
678  fields.elements != 0 || // 1)
679  values_list.head()->elements == 0; // 2)
680  COPY_INFO info(COPY_INFO::INSERT_OPERATION,
681  &fields,
682  manage_defaults,
683  duplic,
684  ignore);
685  COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &update_fields, &update_values);
686  Name_resolution_context *context;
688 #ifndef EMBEDDED_LIBRARY
689  char *query= thd->query();
690  /*
691  log_on is about delayed inserts only.
692  By default, both logs are enabled (this won't cause problems if the server
693  runs without --log-bin).
694  */
695  bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
696 #endif
697  Item *unused_conds= 0;
698 #ifdef WITH_PARTITION_STORAGE_ENGINE
699  uint num_partitions= 0;
700  enum partition_info::enum_can_prune can_prune_partitions=
701  partition_info::PRUNE_NO;
702  MY_BITMAP used_partitions;
703  bool prune_needs_default_values;
704 #endif /* WITH_PARITITION_STORAGE_ENGINE */
705  DBUG_ENTER("mysql_insert");
706 
707  /*
708  Upgrade lock type if the requested lock is incompatible with
709  the current connection mode or table operation.
710  */
711  upgrade_lock_type(thd, &table_list->lock_type, duplic);
712 
713  /*
714  We can't write-delayed into a table locked with LOCK TABLES:
715  this will lead to a deadlock, since the delayed thread will
716  never be able to get a lock on the table. QQQ: why not
717  upgrade the lock here instead?
718  */
719  if (table_list->lock_type == TL_WRITE_DELAYED &&
720  thd->locked_tables_mode &&
721  find_locked_table(thd->open_tables, table_list->db,
722  table_list->table_name))
723  {
724  my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
725  table_list->table_name);
726  DBUG_RETURN(TRUE);
727  }
728 
729  if (table_list->lock_type == TL_WRITE_DELAYED)
730  {
731  if (open_and_lock_for_insert_delayed(thd, table_list))
732  DBUG_RETURN(TRUE);
733  is_locked= true;
734  }
735  else
736  {
737  if (open_normal_and_derived_tables(thd, table_list, 0))
738  DBUG_RETURN(true);
739  }
740 
741  const thr_lock_type lock_type= table_list->lock_type;
742 
743  THD_STAGE_INFO(thd, stage_init);
744  thd->lex->used_tables=0;
745 
746  List_iterator_fast<List_item> its(values_list);
747  List_item *values= its++;
748  const uint value_count= values->elements;
749  TABLE *table= NULL;
750  if (mysql_prepare_insert(thd, table_list, table, fields, values,
751  update_fields, update_values, duplic, &unused_conds,
752  FALSE,
753  (fields.elements || !value_count ||
754  table_list->view != 0),
755  !ignore && thd->is_strict_mode()))
756  goto exit_without_my_ok;
757 
758  /* mysql_prepare_insert set table_list->table if it was not set */
759  table= table_list->table;
760 
761  /* Must be done before can_prune_insert, due to internal initialization. */
762  if (info.add_function_default_columns(table, table->write_set))
763  goto exit_without_my_ok;
764  if (duplic == DUP_UPDATE &&
765  update.add_function_default_columns(table, table->write_set))
766  goto exit_without_my_ok;
767 
768  context= &thd->lex->select_lex.context;
769  /*
770  These three asserts test the hypothesis that the resetting of the name
771  resolution context below is not necessary at all since the list of local
772  tables for INSERT always consists of one table.
773  */
774  DBUG_ASSERT(!table_list->next_local);
775  DBUG_ASSERT(!context->table_list->next_local);
776  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
777 
778  /* Save the state of the current name resolution context. */
779  ctx_state.save_state(context, table_list);
780 
781  /*
782  Perform name resolution only in the first table - 'table_list',
783  which is the table that is inserted into.
784  */
785  table_list->next_local= 0;
786  context->resolve_in_table_list_only(table_list);
787 
788 #ifdef WITH_PARTITION_STORAGE_ENGINE
789  if (!is_locked && table->part_info)
790  {
791  if (table->part_info->can_prune_insert(thd,
792  duplic,
793  update,
794  update_fields,
795  fields,
796  !test(values->elements),
797  &can_prune_partitions,
798  &prune_needs_default_values,
799  &used_partitions))
800  goto exit_without_my_ok;
801 
802  if (can_prune_partitions != partition_info::PRUNE_NO)
803  {
804  num_partitions= table->part_info->lock_partitions.n_bits;
805  /*
806  Pruning probably possible, all partitions is unmarked for read/lock,
807  and we must now add them on row by row basis.
808 
809  Check the first INSERT value.
810  Do not fail here, since that would break MyISAM behavior of inserting
811  all rows before the failing row.
812 
813  PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
814  values, so we only need to check the first INSERT value, since all the
815  rest will be in the same partition.
816  */
817  if (table->part_info->set_used_partition(fields,
818  *values,
819  info,
820  prune_needs_default_values,
821  &used_partitions))
822  can_prune_partitions= partition_info::PRUNE_NO;
823  }
824  }
825 #endif /* WITH_PARTITION_STORAGE_ENGINE */
826 
827  while ((values= its++))
828  {
829  counter++;
830  if (values->elements != value_count)
831  {
832  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
833  goto exit_without_my_ok;
834  }
835  if (setup_fields(thd, Ref_ptr_array(), *values, MARK_COLUMNS_READ, 0, 0))
836  goto exit_without_my_ok;
837 
838 #ifdef WITH_PARTITION_STORAGE_ENGINE
839  /*
840  To make it possible to increase concurrency on table level locking
841  engines such as MyISAM, we check pruning for each row until we will use
842  all partitions, Even if the number of rows is much higher than the
843  number of partitions.
844  TODO: Cache the calculated part_id and reuse in
845  ha_partition::write_row() if possible.
846  */
847  if (can_prune_partitions == partition_info::PRUNE_YES)
848  {
849  if (table->part_info->set_used_partition(fields,
850  *values,
851  info,
852  prune_needs_default_values,
853  &used_partitions))
854  can_prune_partitions= partition_info::PRUNE_NO;
855  if (!(counter % num_partitions))
856  {
857  /*
858  Check if we using all partitions in table after adding partition
859  for current row to the set of used partitions. Do it only from
860  time to time to avoid overhead from bitmap_is_set_all() call.
861  */
862  if (bitmap_is_set_all(&used_partitions))
863  can_prune_partitions= partition_info::PRUNE_NO;
864  }
865  }
866 #endif /* WITH_PARTITION_STORAGE_ENGINE */
867  }
868  table->auto_increment_field_not_null= false;
869  its.rewind ();
870 
871  /* Restore the current context. */
872  ctx_state.restore_state(context, table_list);
873 
874  if (thd->lex->describe)
875  {
876  /*
877  Send "No tables used" and stop execution here since
878  there is no SELECT to explain.
879  */
880 
881  err= explain_no_table(thd, "No tables used");
882  goto exit_without_my_ok;
883  }
884 
885 #ifdef WITH_PARTITION_STORAGE_ENGINE
886  if (can_prune_partitions != partition_info::PRUNE_NO)
887  {
888  /*
889  Only lock the partitions we will insert into.
890  And also only read from those partitions (duplicates etc.).
891  If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
892  the new set of read/lock partitions is the intersection of read/lock
893  partitions and used partitions, i.e only the partitions that exists in
894  both sets will be marked for read/lock.
895  It is also safe for REPLACE, since all potentially conflicting records
896  always belong to the same partition as the one which we try to
897  insert a row. This is because ALL unique/primary keys must
898  include ALL partitioning columns.
899  */
900  bitmap_intersect(&table->part_info->read_partitions,
901  &used_partitions);
902  bitmap_intersect(&table->part_info->lock_partitions,
903  &used_partitions);
904  }
905 #endif /* WITH_PARTITION_STORAGE_ENGINE */
906 
907  /* Lock the tables now if not delayed/already locked. */
908  if (!is_locked &&
909  lock_tables(thd, table_list, thd->lex->table_count, 0))
910  DBUG_RETURN(true);
911 
912  /*
913  Count warnings for all inserts.
914  For single line insert, generate an error if try to set a NOT NULL field
915  to NULL.
916  */
917  thd->count_cuted_fields= ((values_list.elements == 1 &&
918  !ignore) ?
919  CHECK_FIELD_ERROR_FOR_NULL :
920  CHECK_FIELD_WARN);
921  thd->cuted_fields = 0L;
922  table->next_number_field=table->found_next_number_field;
923 
924 #ifdef HAVE_REPLICATION
925  if (thd->slave_thread)
926  {
927  DBUG_ASSERT(active_mi != NULL);
928  if(info.get_duplicate_handling() == DUP_UPDATE &&
929  table->next_number_field != NULL &&
930  rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
931  goto exit_without_my_ok;
932  }
933 #endif
934 
935  error=0;
936  THD_STAGE_INFO(thd, stage_update);
937  if (duplic == DUP_REPLACE &&
938  (!table->triggers || !table->triggers->has_delete_triggers()))
939  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
940  if (duplic == DUP_UPDATE)
941  table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
942  /*
943  let's *try* to start bulk inserts. It won't necessary
944  start them as values_list.elements should be greater than
945  some - handler dependent - threshold.
946  We should not start bulk inserts if this statement uses
947  functions or invokes triggers since they may access
948  to the same table and therefore should not see its
949  inconsistent state created by this optimization.
950  So we call start_bulk_insert to perform nesessary checks on
951  values_list.elements, and - if nothing else - to initialize
952  the code to make the call of end_bulk_insert() below safe.
953  */
954 #ifndef EMBEDDED_LIBRARY
955  if (lock_type != TL_WRITE_DELAYED)
956 #endif /* EMBEDDED_LIBRARY */
957  {
958  if (duplic != DUP_ERROR || ignore)
959  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
967  if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
968  table->file->ha_start_bulk_insert(values_list.elements);
969  }
970 
971  thd->abort_on_warning= (!ignore && thd->is_strict_mode());
972 
973  prepare_triggers_for_insert_stmt(table);
974 
975 
976  if (table_list->prepare_where(thd, 0, TRUE) ||
977  table_list->prepare_check_option(thd))
978  error= 1;
979 
980  while ((values= its++))
981  {
982  if (fields.elements || !value_count)
983  {
984  restore_record(table,s->default_values); // Get empty record
985 
986  /*
987  Check whether default values of the fields not specified in column list
988  are correct or not.
989  */
990  if (validate_default_values_of_unset_fields(thd, table))
991  {
992  error= 1;
993  break;
994  }
995 
996  if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
997  table->triggers,
998  TRG_EVENT_INSERT))
999  {
1000  if (values_list.elements != 1 && ! thd->is_error())
1001  {
1002  info.stats.records++;
1003  continue;
1004  }
1005  /*
1006  TODO: set thd->abort_on_warning if values_list.elements == 1
1007  and check that all items return warning in case of problem with
1008  storing field.
1009  */
1010  error=1;
1011  break;
1012  }
1013  }
1014  else
1015  {
1016  if (thd->lex->used_tables) // Column used in values()
1017  restore_record(table,s->default_values); // Get empty record
1018  else
1019  {
1020  TABLE_SHARE *share= table->s;
1021 
1022  /*
1023  Fix delete marker. No need to restore rest of record since it will
1024  be overwritten by fill_record() anyway (and fill_record() does not
1025  use default values in this case).
1026  */
1027  table->record[0][0]= share->default_values[0];
1028 
1029  /* Fix undefined null_bits. */
1030  if (share->null_bytes > 1 && share->last_null_bit_pos)
1031  {
1032  table->record[0][share->null_bytes - 1]=
1033  share->default_values[share->null_bytes - 1];
1034  }
1035  }
1036  if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
1037  table->triggers,
1038  TRG_EVENT_INSERT))
1039  {
1040  if (values_list.elements != 1 && ! thd->is_error())
1041  {
1042  info.stats.records++;
1043  continue;
1044  }
1045  error=1;
1046  break;
1047  }
1048  }
1049 
1050  if ((res= table_list->view_check_option(thd,
1051  (values_list.elements == 1 ?
1052  0 :
1053  ignore))) ==
1054  VIEW_CHECK_SKIP)
1055  continue;
1056  else if (res == VIEW_CHECK_ERROR)
1057  {
1058  error= 1;
1059  break;
1060  }
1061 #ifndef EMBEDDED_LIBRARY
1062  if (lock_type == TL_WRITE_DELAYED)
1063  {
1064  LEX_STRING const st_query = { query, thd->query_length() };
1065  DEBUG_SYNC(thd, "before_write_delayed");
1066  error= write_delayed(thd, table, st_query, log_on, &info);
1067  DEBUG_SYNC(thd, "after_write_delayed");
1068  query=0;
1069  }
1070  else
1071 #endif
1072  error= write_record(thd, table, &info, &update);
1073  if (error)
1074  break;
1075  thd->get_stmt_da()->inc_current_row_for_warning();
1076  }
1077 
1078  free_underlaid_joins(thd, &thd->lex->select_lex);
1079  joins_freed= TRUE;
1080 
1081  /*
1082  Now all rows are inserted. Time to update logs and sends response to
1083  user
1084  */
1085 #ifndef EMBEDDED_LIBRARY
1086  if (lock_type == TL_WRITE_DELAYED)
1087  {
1088  if (!error)
1089  {
1090  info.stats.copied=values_list.elements;
1091  end_delayed_insert(thd);
1092  }
1093  }
1094  else
1095 #endif
1096  {
1097  /*
1098  Do not do this release if this is a delayed insert, it would steal
1099  auto_inc values from the delayed_insert thread as they share TABLE.
1100  */
1101  table->file->ha_release_auto_increment();
1102  if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
1103  table->file->ha_end_bulk_insert() && !error)
1104  {
1105  table->file->print_error(my_errno,MYF(0));
1106  error=1;
1107  }
1108  if (duplic != DUP_ERROR || ignore)
1109  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1110 
1111  transactional_table= table->file->has_transactions();
1112 
1113  if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
1114  {
1115  /*
1116  Invalidate the table in the query cache if something changed.
1117  For the transactional algorithm to work the invalidation must be
1118  before binlog writing and ha_autocommit_or_rollback
1119  */
1120  query_cache_invalidate3(thd, table_list, 1);
1121  }
1122 
1123  if (error <= 0 ||
1124  thd->transaction.stmt.cannot_safely_rollback() ||
1125  was_insert_delayed)
1126  {
1127  if (mysql_bin_log.is_open())
1128  {
1129  int errcode= 0;
1130  if (error <= 0)
1131  {
1132  /*
1133  [Guilhem wrote] Temporary errors may have filled
1134  thd->net.last_error/errno. For example if there has
1135  been a disk full error when writing the row, and it was
1136  MyISAM, then thd->net.last_error/errno will be set to
1137  "disk full"... and the mysql_file_pwrite() will wait until free
1138  space appears, and so when it finishes then the
1139  write_row() was entirely successful
1140  */
1141  /* todo: consider removing */
1142  thd->clear_error();
1143  }
1144  else
1145  errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
1146 
1147  /* bug#22725:
1148 
1149  A query which per-row-loop can not be interrupted with
1150  KILLED, like INSERT, and that does not invoke stored
1151  routines can be binlogged with neglecting the KILLED error.
1152 
1153  If there was no error (error == zero) until after the end of
1154  inserting loop the KILLED flag that appeared later can be
1155  disregarded since previously possible invocation of stored
1156  routines did not result in any error due to the KILLED. In
1157  such case the flag is ignored for constructing binlog event.
1158  */
1159  DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
1160  if (was_insert_delayed && table_list->lock_type == TL_WRITE)
1161  {
1162  /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1163  String log_query;
1164  if (create_insert_stmt_from_insert_delayed(thd, &log_query))
1165  {
1166  sql_print_error("Event Error: An error occurred while creating query string"
1167  "for INSERT DELAYED stmt, before writing it into binary log.");
1168 
1169  error= 1;
1170  }
1171  else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1172  log_query.c_ptr(), log_query.length(),
1173  transactional_table, FALSE, FALSE,
1174  errcode))
1175  error= 1;
1176  }
1177  else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1178  thd->query(), thd->query_length(),
1179  transactional_table, FALSE, FALSE,
1180  errcode))
1181  error= 1;
1182  }
1183  }
1184  DBUG_ASSERT(transactional_table || !changed ||
1185  thd->transaction.stmt.cannot_safely_rollback());
1186  }
1187  THD_STAGE_INFO(thd, stage_end);
1188  /*
1189  We'll report to the client this id:
1190  - if the table contains an autoincrement column and we successfully
1191  inserted an autogenerated value, the autogenerated value.
1192  - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
1193  called, X.
1194  - if the table contains an autoincrement column, and some rows were
1195  inserted, the id of the last "inserted" row (if IGNORE, that value may not
1196  have been really inserted but ignored).
1197  */
1198  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1199  thd->first_successful_insert_id_in_cur_stmt :
1200  (thd->arg_of_last_insert_id_function ?
1201  thd->first_successful_insert_id_in_prev_stmt :
1202  ((table->next_number_field && info.stats.copied) ?
1203  table->next_number_field->val_int() : 0));
1204  table->next_number_field=0;
1205  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1206  table->auto_increment_field_not_null= FALSE;
1207  if (duplic == DUP_REPLACE &&
1208  (!table->triggers || !table->triggers->has_delete_triggers()))
1209  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1210 
1211  if (error)
1212  goto exit_without_my_ok;
1213  if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
1214  !thd->cuted_fields))
1215  {
1216  my_ok(thd, info.stats.copied + info.stats.deleted +
1217  ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1218  info.stats.touched : info.stats.updated),
1219  id);
1220  }
1221  else
1222  {
1223  char buff[160];
1224  ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1225  info.stats.touched : info.stats.updated);
1226  if (ignore)
1227  my_snprintf(buff, sizeof(buff),
1228  ER(ER_INSERT_INFO), (long) info.stats.records,
1229  (lock_type == TL_WRITE_DELAYED) ? (long) 0 :
1230  (long) (info.stats.records - info.stats.copied),
1231  (long) thd->get_stmt_da()->current_statement_warn_count());
1232  else
1233  my_snprintf(buff, sizeof(buff),
1234  ER(ER_INSERT_INFO), (long) info.stats.records,
1235  (long) (info.stats.deleted + updated),
1236  (long) thd->get_stmt_da()->current_statement_warn_count());
1237  my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
1238  }
1239  thd->abort_on_warning= 0;
1240  DBUG_RETURN(FALSE);
1241 
1242 exit_without_my_ok:
1243 #ifndef EMBEDDED_LIBRARY
1244  if (lock_type == TL_WRITE_DELAYED)
1245  end_delayed_insert(thd);
1246 #endif
1247  if (!joins_freed)
1248  free_underlaid_joins(thd, &thd->lex->select_lex);
1249  thd->abort_on_warning= 0;
1250  DBUG_RETURN(err);
1251 }
1252 
1253 
1254 /*
1255  Additional check for insertability for VIEW
1256 
1257  SYNOPSIS
1258  check_view_insertability()
1259  thd - thread handler
1260  view - reference on VIEW
1261 
1262  IMPLEMENTATION
1263  A view is insertable if the folloings are true:
1264  - All columns in the view are columns from a table
1265  - All not used columns in table have a default values
1266  - All field in view are unique (not referring to the same column)
1267 
1268  RETURN
1269  FALSE - OK
1270  view->contain_auto_increment is 1 if and only if the view contains an
1271  auto_increment field
1272 
1273  TRUE - can't be used for insert
1274 */
1275 
1276 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1277 {
1278  uint num= view->view->select_lex.item_list.elements;
1279  TABLE *table= view->table;
1280  Field_translator *trans_start= view->field_translation,
1281  *trans_end= trans_start + num;
1282  Field_translator *trans;
1283  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1284  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1285  MY_BITMAP used_fields;
1286  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
1287  DBUG_ENTER("check_key_in_view");
1288 
1289  if (!used_fields_buff)
1290  DBUG_RETURN(TRUE); // EOM
1291 
1292  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1293 
1294  (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1295  bitmap_clear_all(&used_fields);
1296 
1297  view->contain_auto_increment= 0;
1298  /*
1299  we must not set query_id for fields as they're not
1300  really used in this context
1301  */
1302  thd->mark_used_columns= MARK_COLUMNS_NONE;
1303  /* check simplicity and prepare unique test of view */
1304  for (trans= trans_start; trans != trans_end; trans++)
1305  {
1306  if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1307  {
1308  thd->mark_used_columns= save_mark_used_columns;
1309  DBUG_RETURN(TRUE);
1310  }
1311  Item_field *field;
1312  /* simple SELECT list entry (field without expression) */
1313  if (!(field= trans->item->field_for_view_update()))
1314  {
1315  thd->mark_used_columns= save_mark_used_columns;
1316  DBUG_RETURN(TRUE);
1317  }
1318  if (field->field->unireg_check == Field::NEXT_NUMBER)
1319  view->contain_auto_increment= 1;
1320  /* prepare unique test */
1321  /*
1322  remove collation (or other transparent for update function) if we have
1323  it
1324  */
1325  trans->item= field;
1326  }
1327  thd->mark_used_columns= save_mark_used_columns;
1328  /* unique test */
1329  for (trans= trans_start; trans != trans_end; trans++)
1330  {
1331  /* Thanks to test above, we know that all columns are of type Item_field */
1332  Item_field *field= (Item_field *)trans->item;
1333  /* check fields belong to table in which we are inserting */
1334  if (field->field->table == table &&
1335  bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1336  DBUG_RETURN(TRUE);
1337  }
1338 
1339  DBUG_RETURN(FALSE);
1340 }
1341 
1342 
1343 /*
1344  Check if table can be updated
1345 
1346  SYNOPSIS
1347  mysql_prepare_insert_check_table()
1348  thd Thread handle
1349  table_list Table list
1350  fields List of fields to be updated
1351  where Pointer to where clause
1352  select_insert Check is making for SELECT ... INSERT
1353 
1354  RETURN
1355  FALSE ok
1356  TRUE ERROR
1357 */
1358 
1359 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1360  List<Item> &fields,
1361  bool select_insert)
1362 {
1363  bool insert_into_view= (table_list->view != 0);
1364  DBUG_ENTER("mysql_prepare_insert_check_table");
1365 
1366  if (!table_list->updatable)
1367  {
1368  my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
1369  DBUG_RETURN(TRUE);
1370  }
1371  /*
1372  first table in list is the one we'll INSERT into, requires INSERT_ACL.
1373  all others require SELECT_ACL only. the ACL requirement below is for
1374  new leaves only anyway (view-constituents), so check for SELECT rather
1375  than INSERT.
1376  */
1377 
1378  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
1379  &thd->lex->select_lex.top_join_list,
1380  table_list,
1381  &thd->lex->select_lex.leaf_tables,
1382  select_insert, INSERT_ACL, SELECT_ACL))
1383  DBUG_RETURN(TRUE);
1384 
1385  if (insert_into_view && !fields.elements)
1386  {
1387  thd->lex->empty_field_list_on_rset= 1;
1388  if (!table_list->table)
1389  {
1390  my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1391  table_list->view_db.str, table_list->view_name.str);
1392  DBUG_RETURN(TRUE);
1393  }
1394  DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1395  }
1396 
1397  DBUG_RETURN(FALSE);
1398 }
1399 
1400 
1401 /*
1402  Get extra info for tables we insert into
1403 
1404  @param table table(TABLE object) we insert into,
1405  might be NULL in case of view
1406  @param table(TABLE_LIST object) or view we insert into
1407 */
1408 
1409 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1410 {
1411  if (table)
1412  {
1413  if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1414  table->prepare_for_position();
1415  return;
1416  }
1417 
1418  DBUG_ASSERT(tables->view);
1419  List_iterator<TABLE_LIST> it(*tables->view_tables);
1420  TABLE_LIST *tbl;
1421  while ((tbl= it++))
1422  prepare_for_positional_update(tbl->table, tbl);
1423 
1424  return;
1425 }
1426 
1427 
1428 /*
1429  Prepare items in INSERT statement
1430 
1431  SYNOPSIS
1432  mysql_prepare_insert()
1433  thd Thread handler
1434  table_list Global/local table list
1435  table Table to insert into (can be NULL if table should
1436  be taken from table_list->table)
1437  where Where clause (for insert ... select)
1438  select_insert TRUE if INSERT ... SELECT statement
1439  check_fields TRUE if need to check that all INSERT fields are
1440  given values.
1441  abort_on_warning whether to report if some INSERT field is not
1442  assigned as an error (TRUE) or as a warning (FALSE).
1443 
1444  TODO (in far future)
1445  In cases of:
1446  INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1447  ON DUPLICATE KEY ...
1448  we should be able to refer to sum1 in the ON DUPLICATE KEY part
1449 
1450  WARNING
1451  You MUST set table->insert_values to 0 after calling this function
1452  before releasing the table object.
1453 
1454  RETURN VALUE
1455  FALSE OK
1456  TRUE error
1457 */
1458 
1459 bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1460  TABLE *table, List<Item> &fields, List_item *values,
1461  List<Item> &update_fields, List<Item> &update_values,
1462  enum_duplicates duplic,
1463  Item **where, bool select_insert,
1464  bool check_fields, bool abort_on_warning)
1465 {
1466  SELECT_LEX *select_lex= &thd->lex->select_lex;
1467  Name_resolution_context *context= &select_lex->context;
1469  bool insert_into_view= (table_list->view != 0);
1470  bool res= 0;
1471  table_map map= 0;
1472  DBUG_ENTER("mysql_prepare_insert");
1473  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
1474  (ulong)table_list, (ulong)table,
1475  (int)insert_into_view));
1476  /* INSERT should have a SELECT or VALUES clause */
1477  DBUG_ASSERT (!select_insert || !values);
1478 
1479  /*
1480  For subqueries in VALUES() we should not see the table in which we are
1481  inserting (for INSERT ... SELECT this is done by changing table_list,
1482  because INSERT ... SELECT share SELECT_LEX it with SELECT.
1483  */
1484  if (!select_insert)
1485  {
1486  for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1487  un;
1488  un= un->next_unit())
1489  {
1490  for (SELECT_LEX *sl= un->first_select();
1491  sl;
1492  sl= sl->next_select())
1493  {
1494  sl->context.outer_context= 0;
1495  }
1496  }
1497  }
1498 
1499  if (duplic == DUP_UPDATE)
1500  {
1501  /* it should be allocated before Item::fix_fields() */
1502  if (table_list->set_insert_values(thd->mem_root))
1503  DBUG_RETURN(TRUE);
1504  }
1505 
1506  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1507  DBUG_RETURN(TRUE);
1508 
1509 
1510  /* Prepare the fields in the statement. */
1511  if (values)
1512  {
1513  /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1514  DBUG_ASSERT (!select_lex->group_list.elements);
1515 
1516  /* Save the state of the current name resolution context. */
1517  ctx_state.save_state(context, table_list);
1518 
1519  /*
1520  Perform name resolution only in the first table - 'table_list',
1521  which is the table that is inserted into.
1522  */
1523  table_list->next_local= 0;
1524  context->resolve_in_table_list_only(table_list);
1525 
1526  res= (setup_fields(thd, Ref_ptr_array(),
1527  *values, MARK_COLUMNS_READ, 0, 0) ||
1528  check_insert_fields(thd, context->table_list, fields, *values,
1529  !insert_into_view, 0, &map));
1530 
1531  if (!res && check_fields)
1532  {
1533  bool saved_abort_on_warning= thd->abort_on_warning;
1534  thd->abort_on_warning= abort_on_warning;
1535  res= check_that_all_fields_are_given_values(thd,
1536  table ? table :
1537  context->table_list->table,
1538  context->table_list);
1539  thd->abort_on_warning= saved_abort_on_warning;
1540  }
1541 
1542  if (!res)
1543  res= setup_fields(thd, Ref_ptr_array(),
1544  update_values, MARK_COLUMNS_READ, 0, 0);
1545 
1546  if (!res && duplic == DUP_UPDATE)
1547  {
1548  select_lex->no_wrap_view_item= TRUE;
1549  res= check_update_fields(thd, context->table_list, update_fields,
1550  update_values, false, &map);
1551  select_lex->no_wrap_view_item= FALSE;
1552  }
1553 
1554  /* Restore the current context. */
1555  ctx_state.restore_state(context, table_list);
1556  }
1557 
1558  if (res)
1559  DBUG_RETURN(res);
1560 
1561  if (!table)
1562  table= table_list->table;
1563 
1564  if (!select_insert)
1565  {
1566  Item *fake_conds= 0;
1567  TABLE_LIST *duplicate;
1568  if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1569  {
1570  update_non_unique_table_error(table_list, "INSERT", duplicate);
1571  DBUG_RETURN(TRUE);
1572  }
1573  select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1574  select_lex->first_execution= 0;
1575  }
1576  /*
1577  Only call prepare_for_posistion() if we are not performing a DELAYED
1578  operation. It will instead be executed by delayed insert thread.
1579  */
1580  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1581  prepare_for_positional_update(table, table_list);
1582  DBUG_RETURN(FALSE);
1583 }
1584 
1585 
1586  /* Check if there is more uniq keys after field */
1587 
1588 static int last_uniq_key(TABLE *table,uint keynr)
1589 {
1590  /*
1591  When an underlying storage engine informs that the unique key
1592  conflicts are not reported in the ascending order by setting
1593  the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1594  information to determine the last key conflict.
1595 
1596  The information about the last key conflict will be used to
1597  do a replace of the new row on the conflicting row, rather
1598  than doing a delete (of old row) + insert (of new row).
1599 
1600  Hence check for this flag and disable replacing the last row
1601  by returning 0 always. Returning 0 will result in doing
1602  a delete + insert always.
1603  */
1604  if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1605  return 0;
1606 
1607  while (++keynr < table->s->keys)
1608  if (table->key_info[keynr].flags & HA_NOSAME)
1609  return 0;
1610  return 1;
1611 }
1612 
1613 
1643 int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
1644 {
1645  int error, trg_error= 0;
1646  char *key=0;
1647  MY_BITMAP *save_read_set, *save_write_set;
1648  ulonglong prev_insert_id= table->file->next_insert_id;
1649  ulonglong insert_id_for_cur_row= 0;
1650  DBUG_ENTER("write_record");
1651 
1652  info->stats.records++;
1653  save_read_set= table->read_set;
1654  save_write_set= table->write_set;
1655 
1656  info->set_function_defaults(table);
1657 
1658  const enum_duplicates duplicate_handling= info->get_duplicate_handling();
1659  const bool ignore_errors= info->get_ignore_errors();
1660 
1661  if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
1662  {
1663  DBUG_ASSERT(duplicate_handling != DUP_UPDATE || update != NULL);
1664  while ((error=table->file->ha_write_row(table->record[0])))
1665  {
1666  uint key_nr;
1667  /*
1668  If we do more than one iteration of this loop, from the second one the
1669  row will have an explicit value in the autoinc field, which was set at
1670  the first call of handler::update_auto_increment(). So we must save
1671  the autogenerated value to avoid thd->insert_id_for_cur_row to become
1672  0.
1673  */
1674  if (table->file->insert_id_for_cur_row > 0)
1675  insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1676  else
1677  table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1678  bool is_duplicate_key_error;
1679  if (table->file->is_fatal_error(error, HA_CHECK_DUP))
1680  goto err;
1681  is_duplicate_key_error= table->file->is_fatal_error(error, 0);
1682  if (!is_duplicate_key_error)
1683  {
1684  /*
1685  We come here when we had an ignorable error which is not a duplicate
1686  key error. In this we ignore error if ignore flag is set, otherwise
1687  report error as usual. We will not do any duplicate key processing.
1688  */
1689  if (ignore_errors)
1690  goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1691  goto err;
1692  }
1693  if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
1694  {
1695  error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1696  goto err;
1697  }
1698  DEBUG_SYNC(thd, "write_row_replace");
1699 
1700  /* Read all columns for the row we are going to replace */
1701  table->use_all_columns();
1702  /*
1703  Don't allow REPLACE to replace a row when a auto_increment column
1704  was used. This ensures that we don't get a problem when the
1705  whole range of the key has been used.
1706  */
1707  if (duplicate_handling == DUP_REPLACE &&
1708  table->next_number_field &&
1709  key_nr == table->s->next_number_index &&
1710  (insert_id_for_cur_row > 0))
1711  goto err;
1712  if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1713  {
1714  if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1715  goto err;
1716  }
1717  else
1718  {
1719  if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1720  {
1721  error=my_errno;
1722  goto err;
1723  }
1724 
1725  if (!key)
1726  {
1727  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
1728  MAX_KEY_LENGTH)))
1729  {
1730  error=ENOMEM;
1731  goto err;
1732  }
1733  }
1734  key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1735  if ((error=(table->file->ha_index_read_idx_map(table->record[1],key_nr,
1736  (uchar*) key, HA_WHOLE_KEY,
1737  HA_READ_KEY_EXACT))))
1738  goto err;
1739  }
1740  if (duplicate_handling == DUP_UPDATE)
1741  {
1742  int res= 0;
1743  /*
1744  We don't check for other UNIQUE keys - the first row
1745  that matches, is updated. If update causes a conflict again,
1746  an error is returned
1747  */
1748  DBUG_ASSERT(table->insert_values != NULL);
1749  store_record(table,insert_values);
1750  restore_record(table,record[1]);
1751  DBUG_ASSERT(update->get_changed_columns()->elements ==
1752  update->update_values->elements);
1753  if (fill_record_n_invoke_before_triggers(thd,
1754  *update->get_changed_columns(),
1755  *update->update_values,
1756  ignore_errors,
1757  table->triggers,
1758  TRG_EVENT_UPDATE))
1759  goto before_trg_err;
1760 
1761  bool insert_id_consumed= false;
1762  if (// UPDATE clause specifies a value for the auto increment field
1763  table->auto_increment_field_not_null &&
1764  // An auto increment value has been generated for this row
1765  (insert_id_for_cur_row > 0))
1766  {
1767  // After-update value:
1768  const ulonglong auto_incr_val= table->next_number_field->val_int();
1769  if (auto_incr_val == insert_id_for_cur_row)
1770  {
1771  // UPDATE wants to use the generated value
1772  insert_id_consumed= true;
1773  }
1774  else if (table->file->auto_inc_interval_for_cur_row.
1775  in_range(auto_incr_val))
1776  {
1777  /*
1778  UPDATE wants to use one auto generated value which we have already
1779  reserved for another (previous or following) row. That may cause
1780  a duplicate key error if we later try to insert the reserved
1781  value. Such conflicts on auto generated values would be strange
1782  behavior, so we return a clear error now.
1783  */
1784  my_error(ER_AUTO_INCREMENT_CONFLICT, MYF(0));
1785  goto before_trg_err;
1786  }
1787  }
1788 
1789  if (!insert_id_consumed)
1790  table->file->restore_auto_increment(prev_insert_id);
1791 
1792  /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1793  {
1794  const TABLE_LIST *inserted_view=
1795  table->pos_in_table_list->belong_to_view;
1796  if (inserted_view != NULL)
1797  {
1798  res= inserted_view->view_check_option(thd, ignore_errors);
1799  if (res == VIEW_CHECK_SKIP)
1800  goto ok_or_after_trg_err;
1801  if (res == VIEW_CHECK_ERROR)
1802  goto before_trg_err;
1803  }
1804  }
1805 
1806  info->stats.touched++;
1807  if (!records_are_comparable(table) || compare_records(table))
1808  {
1809  // Handle the INSERT ON DUPLICATE KEY UPDATE operation
1810  update->set_function_defaults(table);
1811 
1812  if ((error=table->file->ha_update_row(table->record[1],
1813  table->record[0])) &&
1814  error != HA_ERR_RECORD_IS_THE_SAME)
1815  {
1816  if (ignore_errors &&
1817  !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1818  {
1819  goto ok_or_after_trg_err;
1820  }
1821  goto err;
1822  }
1823 
1824  if (error != HA_ERR_RECORD_IS_THE_SAME)
1825  info->stats.updated++;
1826  else
1827  error= 0;
1828  /*
1829  If ON DUP KEY UPDATE updates a row instead of inserting one, it's
1830  like a regular UPDATE statement: it should not affect the value of a
1831  next SELECT LAST_INSERT_ID() or mysql_insert_id().
1832  Except if LAST_INSERT_ID(#) was in the INSERT query, which is
1833  handled separately by THD::arg_of_last_insert_id_function.
1834  */
1835  insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1836  trg_error= (table->triggers &&
1837  table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1838  TRG_ACTION_AFTER, TRUE));
1839  info->stats.copied++;
1840  }
1841 
1842  goto ok_or_after_trg_err;
1843  }
1844  else /* DUP_REPLACE */
1845  {
1846  /*
1847  The manual defines the REPLACE semantics that it is either
1848  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1849  InnoDB do not function in the defined way if we allow MySQL
1850  to convert the latter operation internally to an UPDATE.
1851  We also should not perform this conversion if we have
1852  timestamp field with ON UPDATE which is different from DEFAULT.
1853  Another case when conversion should not be performed is when
1854  we have ON DELETE trigger on table so user may notice that
1855  we cheat here. Note that it is ok to do such conversion for
1856  tables which have ON UPDATE but have no ON DELETE triggers,
1857  we just should not expose this fact to users by invoking
1858  ON UPDATE triggers.
1859  */
1860  if (last_uniq_key(table,key_nr) &&
1861  !table->file->referenced_by_foreign_key() &&
1862  (!table->triggers || !table->triggers->has_delete_triggers()))
1863  {
1864  if ((error=table->file->ha_update_row(table->record[1],
1865  table->record[0])) &&
1866  error != HA_ERR_RECORD_IS_THE_SAME)
1867  goto err;
1868  if (error != HA_ERR_RECORD_IS_THE_SAME)
1869  info->stats.deleted++;
1870  else
1871  error= 0;
1872  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1873  /*
1874  Since we pretend that we have done insert we should call
1875  its after triggers.
1876  */
1877  goto after_trg_n_copied_inc;
1878  }
1879  else
1880  {
1881  if (table->triggers &&
1882  table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1883  TRG_ACTION_BEFORE, TRUE))
1884  goto before_trg_err;
1885  if ((error=table->file->ha_delete_row(table->record[1])))
1886  goto err;
1887  info->stats.deleted++;
1888  if (!table->file->has_transactions())
1889  thd->transaction.stmt.mark_modified_non_trans_table();
1890  if (table->triggers &&
1891  table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1892  TRG_ACTION_AFTER, TRUE))
1893  {
1894  trg_error= 1;
1895  goto ok_or_after_trg_err;
1896  }
1897  /* Let us attempt do write_row() once more */
1898  }
1899  }
1900  }
1901 
1902  /*
1903  If more than one iteration of the above while loop is done, from the second
1904  one the row being inserted will have an explicit value in the autoinc field,
1905  which was set at the first call of handler::update_auto_increment(). This
1906  value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
1907  autoinc value.
1908  */
1909  if (table->file->insert_id_for_cur_row == 0)
1910  table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1911 
1912  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1913  /*
1914  Restore column maps if they where replaced during an duplicate key
1915  problem.
1916  */
1917  if (table->read_set != save_read_set ||
1918  table->write_set != save_write_set)
1919  table->column_bitmaps_set(save_read_set, save_write_set);
1920  }
1921  else if ((error=table->file->ha_write_row(table->record[0])))
1922  {
1923  DEBUG_SYNC(thd, "write_row_noreplace");
1924  if (!ignore_errors ||
1925  table->file->is_fatal_error(error, HA_CHECK_DUP))
1926  goto err;
1927  table->file->restore_auto_increment(prev_insert_id);
1928  goto ok_or_after_trg_err;
1929  }
1930 
1931 after_trg_n_copied_inc:
1932  info->stats.copied++;
1933  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1934  trg_error= (table->triggers &&
1935  table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
1936  TRG_ACTION_AFTER, TRUE));
1937 
1938 ok_or_after_trg_err:
1939  if (key)
1940  my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1941  if (!table->file->has_transactions())
1942  thd->transaction.stmt.mark_modified_non_trans_table();
1943  DBUG_RETURN(trg_error);
1944 
1945 err:
1946  info->last_errno= error;
1947  /* current_select is NULL if this is a delayed insert */
1948  if (thd->lex->current_select)
1949  thd->lex->current_select->no_error= 0; // Give error
1950  table->file->print_error(error,MYF(0));
1951 
1952 before_trg_err:
1953  table->file->restore_auto_increment(prev_insert_id);
1954  if (key)
1955  my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1956  table->column_bitmaps_set(save_read_set, save_write_set);
1957  DBUG_RETURN(1);
1958 }
1959 
1960 
1961 /******************************************************************************
1962  Check that all fields with arn't null_fields are used
1963 ******************************************************************************/
1964 
1965 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
1966  TABLE_LIST *table_list)
1967 {
1968  int err= 0;
1969  MY_BITMAP *write_set= entry->write_set;
1970 
1971  for (Field **field=entry->field ; *field ; field++)
1972  {
1973  if (!bitmap_is_set(write_set, (*field)->field_index) &&
1974  ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1975  ((*field)->real_type() != MYSQL_TYPE_ENUM))
1976  {
1977  bool view= FALSE;
1978  if (table_list)
1979  {
1980  table_list= table_list->top_table();
1981  view= test(table_list->view);
1982  }
1983  if (view)
1984  {
1985  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1986  ER_NO_DEFAULT_FOR_VIEW_FIELD,
1987  ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
1988  table_list->view_db.str,
1989  table_list->view_name.str);
1990  }
1991  else
1992  {
1993  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1994  ER_NO_DEFAULT_FOR_FIELD,
1995  ER(ER_NO_DEFAULT_FOR_FIELD),
1996  (*field)->field_name);
1997  }
1998  err= 1;
1999  }
2000  }
2001  return thd->abort_on_warning ? err : 0;
2002 }
2003 
2004 /*****************************************************************************
2005  Handling of delayed inserts
2006  A thread is created for each table that one uses with the DELAYED attribute.
2007 *****************************************************************************/
2008 
2009 #ifndef EMBEDDED_LIBRARY
2010 
2011 
2020 class delayed_row :public ilink<delayed_row> {
2021 public:
2022  char *record;
2023  enum_duplicates dup;
2024  time_t start_time;
2025  sql_mode_t sql_mode;
2026  bool auto_increment_field_not_null;
2027  bool query_start_used, ignore, log_query, binlog_rows_query_log_events;
2028  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2029  MY_BITMAP write_set;
2030  ulonglong first_successful_insert_id_in_prev_stmt;
2031  ulonglong forced_insert_id;
2032  ulong auto_increment_increment;
2033  ulong auto_increment_offset;
2034  LEX_STRING query;
2035  Time_zone *time_zone;
2036 
2044  delayed_row(LEX_STRING const query_arg, const COPY_INFO *insert_operation,
2045  bool log_query_arg)
2046  : record(NULL),
2047  dup(insert_operation->get_duplicate_handling()),
2048  ignore(insert_operation->get_ignore_errors()),
2049  log_query(log_query_arg),
2050  binlog_rows_query_log_events(false),
2051  forced_insert_id(0),
2052  query(query_arg),
2053  time_zone(0)
2054  {
2055  memset(&write_set, 0, sizeof(write_set));
2056  }
2057 
2074  bool copy_context(THD *thd, TABLE *client_table, TABLE *local_table);
2075 
2076  ~delayed_row()
2077  {
2078  my_free(query.str);
2079  my_free(record);
2080  bitmap_free(&write_set);
2081  }
2082 };
2083 
2084 
2085 bool delayed_row::copy_context(THD *thd, TABLE *client_table,
2086  TABLE *local_table)
2087 {
2088  if (!(record= (char*) my_malloc(client_table->s->reclength, MYF(MY_WME))))
2089  return true;
2090 
2091  memcpy(record, client_table->record[0], client_table->s->reclength);
2092  start_time= thd->start_time.tv_sec;
2093  query_start_used= thd->query_start_used;
2094 
2095  /*
2096  those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2097  time, so record does not need it, but statement-based binlogging of the
2098  INSERT will need when the row is actually inserted.
2099  As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2100  */
2101  stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2102  thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2103  first_successful_insert_id_in_prev_stmt=
2104  thd->first_successful_insert_id_in_prev_stmt;
2105 
2106  /*
2107  Add session variable timezone Time_zone object will not be freed even the
2108  thread is ended. So we can get time_zone object from thread which
2109  handling delayed statement. See the comment of my_tz_find() for detail.
2110  */
2111  if (thd->time_zone_used)
2112  {
2113  time_zone= thd->variables.time_zone;
2114  }
2115  else
2116  {
2117  time_zone= NULL;
2118  }
2119  /* Copy session variables. */
2120  auto_increment_increment= thd->variables.auto_increment_increment;
2121  auto_increment_offset= thd->variables.auto_increment_offset;
2122  sql_mode= thd->variables.sql_mode;
2123  auto_increment_field_not_null= client_table->auto_increment_field_not_null;
2124  binlog_rows_query_log_events= thd->variables.binlog_rows_query_log_events;
2125 
2126  /* Copy the next forced auto increment value, if any. */
2127  const Discrete_interval *forced_auto_inc=
2128  thd->auto_inc_intervals_forced.get_next();
2129  if (forced_auto_inc != NULL)
2130  {
2131  forced_insert_id= forced_auto_inc->minimum();
2132  DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2133  (ulong) forced_insert_id));
2134  }
2135 
2136  /*
2137  Since insert delayed has its own thread and table, we
2138  need to copy the user thread session write_set.
2139  */
2140  my_bitmap_map *bitmaps=
2141  (my_bitmap_map*)
2142  my_malloc(bitmap_buffer_size(client_table->write_set->n_bits), MYF(0));
2143 
2144  if (bitmaps == NULL)
2145  return true;
2146 
2147  bitmap_init(&write_set, bitmaps, client_table->write_set->n_bits, false);
2148  bitmap_union(&write_set, client_table->write_set);
2149 
2150  return false;
2151 }
2152 
2153 
2162 class Delayed_insert :public ilink<Delayed_insert> {
2163  uint locks_in_memory;
2164  thr_lock_type delayed_lock;
2165 public:
2166  THD thd;
2167  TABLE *table;
2168  mysql_mutex_t mutex;
2169  mysql_cond_t cond, cond_client;
2170  volatile uint tables_in_use,stacked_inserts;
2171  volatile bool status;
2184 
2187 
2188  I_List<delayed_row> rows;
2189  ulong group_count;
2190  TABLE_LIST table_list; // Argument
2196 
2199  :locks_in_memory(0), table(0),tables_in_use(0),stacked_inserts(0),
2200  status(0), handler_thread_initialized(FALSE), group_count(0)
2201  {
2202  DBUG_ENTER("Delayed_insert constructor");
2203  thd.security_ctx->user=(char*) delayed_user;
2204  thd.security_ctx->set_host(my_localhost);
2205  strmake(thd.security_ctx->priv_user, thd.security_ctx->user,
2206  USERNAME_LENGTH);
2207  thd.current_tablenr=0;
2208  thd.set_command(COM_DELAYED_INSERT);
2209  thd.lex->current_select= 0; // for my_message_sql
2210  thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
2211 
2212  /*
2213  Prevent changes to global.lock_wait_timeout from affecting
2214  delayed insert threads as any timeouts in delayed inserts
2215  are not communicated to the client.
2216  */
2217  thd.variables.lock_wait_timeout= LONG_TIMEOUT;
2218 
2219  memset(&thd.net, 0, sizeof(thd.net)); // Safety
2220  memset(&table_list, 0, sizeof(table_list)); // Safety
2221  thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
2222  thd.security_ctx->host_or_ip= "";
2223  mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
2224  mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
2225  mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
2226  mysql_mutex_lock(&LOCK_thread_count);
2227  delayed_insert_threads++;
2228  delayed_lock= global_system_variables.low_priority_updates ?
2229  TL_WRITE_LOW_PRIORITY : TL_WRITE;
2230  mysql_mutex_unlock(&LOCK_thread_count);
2231  DBUG_VOID_RETURN;
2232  }
2233  ~Delayed_insert()
2234  {
2235  /* The following is not really needed, but just for safety */
2236  delayed_row *row;
2237  while ((row=rows.get()))
2238  delete row;
2239  if (table)
2240  {
2241  close_thread_tables(&thd);
2242  thd.mdl_context.release_transactional_locks();
2243  }
2244  thd.release_resources();
2245  mysql_mutex_lock(&LOCK_thread_count);
2246  mysql_mutex_destroy(&mutex);
2247  mysql_cond_destroy(&cond);
2248  mysql_cond_destroy(&cond_client);
2249  remove_global_thread(&thd); // Must be removed under lock
2250  my_free(table_list.table_name);
2251  thd.security_ctx->set_host("");
2252  thd.security_ctx->user= 0;
2253  delayed_insert_threads--;
2254  mysql_mutex_unlock(&LOCK_thread_count);
2255  }
2256 
2257  /* The following is for checking when we can delete ourselves */
2258  inline void lock()
2259  {
2260  locks_in_memory++; // Assume LOCK_delay_insert
2261  }
2262  void unlock()
2263  {
2264  mysql_mutex_lock(&LOCK_delayed_insert);
2265  if (!--locks_in_memory)
2266  {
2267  mysql_mutex_lock(&mutex);
2268  if (thd.killed && ! stacked_inserts && ! tables_in_use)
2269  {
2270  mysql_cond_signal(&cond);
2271  status=1;
2272  }
2273  mysql_mutex_unlock(&mutex);
2274  }
2275  mysql_mutex_unlock(&LOCK_delayed_insert);
2276  }
2277  inline uint lock_count() { return locks_in_memory; }
2278 
2279  TABLE* get_local_table(THD* client_thd);
2280  bool open_and_lock_table();
2281  bool handle_inserts(void);
2282 };
2283 
2284 
2285 I_List<Delayed_insert> delayed_threads;
2286 
2287 
2293 static
2294 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
2295 {
2296  THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
2297  mysql_mutex_lock(&LOCK_delayed_insert); // Protect master list
2298  I_List_iterator<Delayed_insert> it(delayed_threads);
2299  Delayed_insert *di;
2300  while ((di= it++))
2301  {
2302  if (!strcmp(table_list->db, di->table_list.db) &&
2303  !strcmp(table_list->table_name, di->table_list.table_name))
2304  {
2305  di->lock();
2306  break;
2307  }
2308  }
2309  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2310  return di;
2311 }
2312 
2313 
2363 static
2364 bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
2365  TABLE_LIST *table_list)
2366 {
2367  int error;
2368  Delayed_insert *di;
2369  DBUG_ENTER("delayed_get_table");
2370 
2371  /* Must be set in the parser */
2372  DBUG_ASSERT(table_list->db);
2373 
2374  /* Find the thread which handles this table. */
2375  if (!(di= find_handler(thd, table_list)))
2376  {
2377  /*
2378  No match. Create a new thread to handle the table, but
2379  no more than max_insert_delayed_threads.
2380  */
2381  if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2382  DBUG_RETURN(0);
2383  THD_STAGE_INFO(thd, stage_creating_delayed_handler);
2384  mysql_mutex_lock(&LOCK_delayed_create);
2385  /*
2386  The first search above was done without LOCK_delayed_create.
2387  Another thread might have created the handler in between. Search again.
2388  */
2389  if (! (di= find_handler(thd, table_list)))
2390  {
2391  if (!(di= new Delayed_insert()))
2392  goto end_create;
2393  di->table_list= *table_list; // Needed to open table
2394  /* Replace volatile strings with local copies */
2395  di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
2396  di->table_list.alias= di->table_list.table_name=
2397  my_strdup(table_list->table_name, MYF(MY_WME | ME_FATALERROR));
2398  di->table_list.db= di->thd.db;
2399  di->thd.set_query(di->table_list.table_name, 0, system_charset_info);
2400  if (di->thd.db == NULL || di->thd.query() == NULL)
2401  {
2402  /* The error is reported */
2403  delete di;
2404  goto end_create;
2405  }
2406  /* We need the tickets so that they can be cloned in handle_delayed_insert */
2407  di->grl_protection.init(MDL_key::GLOBAL, "", "",
2408  MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
2409  di->grl_protection.ticket= grl_protection_request->ticket;
2410  init_mdl_requests(&di->table_list);
2411  di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2412 
2413  di->lock();
2414  mysql_mutex_lock(&di->mutex);
2415  if ((error= mysql_thread_create(key_thread_delayed_insert,
2416  &di->thd.real_id, &connection_attrib,
2417  handle_delayed_insert, (void*) di)))
2418  {
2419  DBUG_PRINT("error",
2420  ("Can't create thread to handle delayed insert (error %d)",
2421  error));
2422  mysql_mutex_unlock(&di->mutex);
2423  di->unlock();
2424  delete di;
2425  my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
2426  goto end_create;
2427  }
2428 
2429  /*
2430  Wait until table is open unless the handler thread or the connection
2431  thread has been killed. Note that we in all cases must wait until the
2432  handler thread has been properly initialized before exiting. Otherwise
2433  we risk doing clone_ticket() on a ticket that is no longer valid.
2434  */
2435  THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2436  while (!di->handler_thread_initialized ||
2437  (!di->thd.killed && !di->table && !thd->killed))
2438  {
2439  mysql_cond_wait(&di->cond_client, &di->mutex);
2440  }
2441  mysql_mutex_unlock(&di->mutex);
2442  THD_STAGE_INFO(thd, stage_got_old_table);
2443  if (thd->killed)
2444  {
2445  di->unlock();
2446  goto end_create;
2447  }
2448  if (di->thd.killed)
2449  {
2450  if (di->thd.is_error())
2451  {
2452  /*
2453  Copy the error message. Note that we don't treat fatal
2454  errors in the delayed thread as fatal errors in the
2455  main thread. If delayed thread was killed, we don't
2456  want to send "Server shutdown in progress" in the
2457  INSERT THREAD.
2458  */
2459  if (di->thd.get_stmt_da()->sql_errno() == ER_SERVER_SHUTDOWN)
2460  my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
2461  else
2462  my_message(di->thd.get_stmt_da()->sql_errno(),
2463  di->thd.get_stmt_da()->message(),
2464  MYF(0));
2465  }
2466  di->unlock();
2467  goto end_create;
2468  }
2469  mysql_mutex_lock(&LOCK_delayed_insert);
2470  delayed_threads.push_front(di);
2471  mysql_mutex_unlock(&LOCK_delayed_insert);
2472  }
2473  mysql_mutex_unlock(&LOCK_delayed_create);
2474  }
2475 
2476  mysql_mutex_lock(&di->mutex);
2477  table_list->table= di->get_local_table(thd);
2478  mysql_mutex_unlock(&di->mutex);
2479  if (table_list->table)
2480  {
2481  DBUG_ASSERT(! thd->is_error());
2482  thd->di= di;
2483  }
2484  /* Unlock the delayed insert object after its last access. */
2485  di->unlock();
2486  DBUG_RETURN((table_list->table == NULL));
2487 
2488 end_create:
2489  mysql_mutex_unlock(&LOCK_delayed_create);
2490  DBUG_RETURN(thd->is_error());
2491 }
2492 
2493 
2511 {
2512  my_ptrdiff_t adjust_ptrs;
2513  Field **field,**org_field, *found_next_number_field;
2514  TABLE *copy;
2515  TABLE_SHARE *share;
2516  uchar *bitmap;
2517  DBUG_ENTER("Delayed_insert::get_local_table");
2518 
2519  /* First request insert thread to get a lock */
2520  status=1;
2521  tables_in_use++;
2522  if (!thd.lock) // Table is not locked
2523  {
2524  THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
2525  mysql_cond_signal(&cond); // Tell handler to lock table
2526  while (!thd.killed && !thd.lock && ! client_thd->killed)
2527  {
2528  mysql_cond_wait(&cond_client, &mutex);
2529  }
2530  THD_STAGE_INFO(client_thd, stage_got_handler_lock);
2531  if (client_thd->killed)
2532  goto error;
2533  if (thd.killed)
2534  {
2535  /*
2536  Copy the error message. Note that we don't treat fatal
2537  errors in the delayed thread as fatal errors in the
2538  main thread. If delayed thread was killed, we don't
2539  want to send "Server shutdown in progress" in the
2540  INSERT THREAD.
2541 
2542  The thread could be killed with an error message if
2543  di->handle_inserts() or di->open_and_lock_table() fails.
2544  The thread could be killed without an error message if
2545  killed using THD::notify_shared_lock() or
2546  kill_delayed_threads_for_table().
2547  */
2548  if (!thd.is_error() ||
2549  thd.get_stmt_da()->sql_errno() == ER_SERVER_SHUTDOWN)
2550  my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
2551  else
2552  my_message(thd.get_stmt_da()->sql_errno(),
2553  thd.get_stmt_da()->message(), MYF(0));
2554  goto error;
2555  }
2556  }
2557  share= table->s;
2558 
2559  /*
2560  Allocate memory for the TABLE object, the field pointers array, and
2561  one record buffer of reclength size. Normally a table has three
2562  record buffers of rec_buff_length size, which includes alignment
2563  bytes. Since the table copy is used for creating one record only,
2564  the other record buffers and alignment are unnecessary.
2565  */
2566  THD_STAGE_INFO(client_thd, stage_allocating_local_table);
2567  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
2568  (share->fields+1)*sizeof(Field**)+
2569  share->reclength +
2570  share->column_bitmap_size*2);
2571  if (!copy)
2572  goto error;
2573 
2574  /* Copy the TABLE object. */
2575  *copy= *table;
2576  /* We don't need to change the file handler here */
2577  /* Assign the pointers for the field pointers array and the record. */
2578  field= copy->field= (Field**) (copy + 1);
2579  bitmap= (uchar*) (field + share->fields + 1);
2580  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
2581  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2582  /*
2583  Make a copy of all fields.
2584  The copied fields need to point into the copied record. This is done
2585  by copying the field objects with their old pointer values and then
2586  "move" the pointers by the distance between the original and copied
2587  records. That way we preserve the relative positions in the records.
2588  */
2589  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
2590  found_next_number_field= table->found_next_number_field;
2591  for (org_field= table->field; *org_field; org_field++, field++)
2592  {
2593  if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
2594  goto error;
2595  (*field)->orig_table= copy; // Remove connection
2596  (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
2597  if (*org_field == found_next_number_field)
2598  (*field)->table->found_next_number_field= *field;
2599 
2600  /*
2601  The Field::new_field() method does not transfer unireg_check values to
2602  the new Field object, and function defaults needed to be copied
2603  here. Hence this must be done manually.
2604  */
2605  if ((*org_field)->has_insert_default_function() ||
2606  (*org_field)->has_update_default_function())
2607  (*field)->unireg_check= (*org_field)->unireg_check;
2608  }
2609  *field=0;
2610 
2611  /* Adjust in_use for pointing to client thread */
2612  copy->in_use= client_thd;
2613  /* Adjust lock_count. This table object is not part of a lock. */
2614  copy->lock_count= 0;
2615 
2616  /* Adjust bitmaps */
2617  bitmap_init(&copy->def_read_set,
2618  reinterpret_cast<my_bitmap_map*>(bitmap),
2619  table->def_read_set.n_bits,
2620  false);
2621  bitmap_init(&copy->def_write_set,
2622  reinterpret_cast<my_bitmap_map*>
2623  (bitmap + share->column_bitmap_size),
2624  table->def_write_set.n_bits,
2625  false);
2626  copy->tmp_set.bitmap= 0; // To catch errors
2627  copy->read_set= &copy->def_read_set;
2628  copy->write_set= &copy->def_write_set;
2629 
2630  DBUG_RETURN(copy);
2631 
2632  /* Got fatal error */
2633  error:
2634  tables_in_use--;
2635  status=1;
2636  mysql_cond_signal(&cond); // Inform thread about abort
2637  DBUG_RETURN(0);
2638 }
2639 
2640 
2654 static bool write_delayed(THD *thd, TABLE *table, LEX_STRING query, bool log_on,
2655  COPY_INFO *client_op)
2656 {
2657  delayed_row *row= 0;
2658  Delayed_insert *di=thd->di;
2659  DBUG_ENTER("write_delayed");
2660  DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2661  (ulong) query.length));
2662 
2663  THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
2664  mysql_mutex_lock(&di->mutex);
2665  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2666  mysql_cond_wait(&di->cond_client, &di->mutex);
2667  THD_STAGE_INFO(thd, stage_storing_row_into_queue);
2668 
2669  if (thd->killed)
2670  goto err;
2671 
2672  /*
2673  Take a copy of the query string, if there is any. The string will
2674  be free'ed when the row is destroyed. If there is no query string,
2675  we don't do anything special.
2676  */
2677 
2678  if (query.str)
2679  {
2680  char *str;
2681  if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2682  goto err;
2683  query.str= str;
2684  }
2685 
2686  client_op->set_function_defaults(table);
2687  row= new delayed_row(query, client_op, log_on);
2688  if (row->copy_context(thd, table, di->table))
2689  goto err;
2690 
2691  di->rows.push_back(row);
2692  di->stacked_inserts++;
2693  di->status=1;
2694  if (table->s->blob_fields)
2695  unlink_blobs(table);
2696  mysql_cond_signal(&di->cond);
2697 
2698  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2699  mysql_mutex_unlock(&di->mutex);
2700  DBUG_RETURN(false);
2701 
2702  err:
2703  delete row;
2704  mysql_mutex_unlock(&di->mutex);
2705  DBUG_RETURN(true);
2706 }
2707 
2713 static void end_delayed_insert(THD *thd)
2714 {
2715  DBUG_ENTER("end_delayed_insert");
2716  Delayed_insert *di=thd->di;
2717  mysql_mutex_lock(&di->mutex);
2718  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2719  if (!--di->tables_in_use || di->thd.killed)
2720  { // Unlock table
2721  di->status=1;
2722  mysql_cond_signal(&di->cond);
2723  }
2724  mysql_mutex_unlock(&di->mutex);
2725  DBUG_VOID_RETURN;
2726 }
2727 
2728 
2729 /* We kill all delayed threads when doing flush-tables */
2730 
2731 void kill_delayed_threads(void)
2732 {
2733  mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
2734 
2735  I_List_iterator<Delayed_insert> it(delayed_threads);
2736  Delayed_insert *di;
2737  while ((di= it++))
2738  {
2739  di->thd.killed= THD::KILL_CONNECTION;
2740  if (di->thd.mysys_var)
2741  {
2742  mysql_mutex_lock(&di->thd.mysys_var->mutex);
2743  if (di->thd.mysys_var->current_cond)
2744  {
2745  /*
2746  We need the following test because the main mutex may be locked
2747  in handle_delayed_insert()
2748  */
2749  if (&di->mutex != di->thd.mysys_var->current_mutex)
2750  mysql_mutex_lock(di->thd.mysys_var->current_mutex);
2751  mysql_cond_broadcast(di->thd.mysys_var->current_cond);
2752  if (&di->mutex != di->thd.mysys_var->current_mutex)
2753  mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
2754  }
2755  mysql_mutex_unlock(&di->thd.mysys_var->mutex);
2756  }
2757  }
2758  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2759 }
2760 
2761 
2773 {
2774 public:
2775  virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2776  Sroutine_hash_entry *rt, sp_head *sp,
2777  bool *need_prelocking);
2778  virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
2779  TABLE_LIST *table_list, bool *need_prelocking);
2780  virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
2781  TABLE_LIST *table_list, bool *need_prelocking);
2782 };
2783 
2784 
2785 bool Delayed_prelocking_strategy::
2786 handle_table(THD *thd, Query_tables_list *prelocking_ctx,
2787  TABLE_LIST *table_list, bool *need_prelocking)
2788 {
2789  DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
2790 
2791  if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2792  {
2793  my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name);
2794  return TRUE;
2795  }
2796  return FALSE;
2797 }
2798 
2799 
2800 bool Delayed_prelocking_strategy::
2801 handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2802  Sroutine_hash_entry *rt, sp_head *sp,
2803  bool *need_prelocking)
2804 {
2805  /* LEX used by the delayed insert thread has no routines. */
2806  DBUG_ASSERT(0);
2807  return FALSE;
2808 }
2809 
2810 
2811 bool Delayed_prelocking_strategy::
2812 handle_view(THD *thd, Query_tables_list *prelocking_ctx,
2813  TABLE_LIST *table_list, bool *need_prelocking)
2814 {
2815  /* We don't open views in the delayed insert thread. */
2816  DBUG_ASSERT(0);
2817  return FALSE;
2818 }
2819 
2820 
2830 {
2831  DBUG_ENTER("Delayed_insert::open_and_lock_table");
2832  Delayed_prelocking_strategy prelocking_strategy;
2833 
2834  /*
2835  Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
2836  error for tables with engines which don't support delayed inserts.
2837  */
2838  if (!(table= open_n_lock_single_table(&thd, &table_list,
2839  TL_WRITE_DELAYED,
2840  MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK,
2841  &prelocking_strategy)))
2842  {
2843  thd.fatal_error(); // Abort waiting inserts
2844  DBUG_RETURN(true);
2845  }
2846 
2847  if (table->triggers)
2848  {
2849  /*
2850  Table has triggers. This is not an error, but we do
2851  not support triggers with delayed insert. Terminate the delayed
2852  thread without an error and thus request lock upgrade.
2853  */
2854  DBUG_RETURN(true);
2855  }
2856  table->copy_blobs= 1;
2857  DBUG_RETURN(false);
2858 }
2859 
2860 
2861 /*
2862  * Create a new delayed insert thread
2863 */
2864 
2865 pthread_handler_t handle_delayed_insert(void *arg)
2866 {
2867  Delayed_insert *di=(Delayed_insert*) arg;
2868  THD *thd= &di->thd;
2869 
2870  pthread_detach_this_thread();
2871  /* Add thread to THD list so that's it's visible in 'show processlist' */
2872  mysql_mutex_lock(&LOCK_thread_count);
2873  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
2874  thd->set_current_time();
2875  add_global_thread(thd);
2876  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
2877  mysql_mutex_unlock(&LOCK_thread_count);
2878 
2879  mysql_thread_set_psi_id(thd->thread_id);
2880 
2881  /*
2882  Wait until the client runs into mysql_cond_wait(),
2883  where we free it after the table is opened and di linked in the list.
2884  If we did not wait here, the client might detect the opened table
2885  before it is linked to the list. It would release LOCK_delayed_create
2886  and allow another thread to create another handler for the same table,
2887  since it does not find one in the list.
2888  */
2889  mysql_mutex_lock(&di->mutex);
2890  if (my_thread_init())
2891  {
2892  /* Can't use my_error since store_globals has not yet been called */
2893  thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
2894  di->handler_thread_initialized= TRUE;
2895  }
2896  else
2897  {
2898  DBUG_ENTER("handle_delayed_insert");
2899  thd->thread_stack= (char*) &thd;
2900  if (init_thr_lock() || thd->store_globals())
2901  {
2902  /* Can't use my_error since store_globals has perhaps failed */
2903  thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
2904  di->handler_thread_initialized= TRUE;
2905  thd->fatal_error();
2906  goto err;
2907  }
2908 
2909  thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
2910 
2911  /*
2912  INSERT DELAYED has to go to row-based format because the time
2913  at which rows are inserted cannot be determined in mixed mode.
2914  */
2915  thd->set_current_stmt_binlog_format_row_if_mixed();
2916 
2917  /*
2918  Clone tickets representing protection against GRL and the lock on
2919  the target table for the insert and add them to the list of granted
2920  metadata locks held by the handler thread. This is safe since the
2921  handler thread is not holding nor waiting on any metadata locks.
2922  */
2923  if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
2924  thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
2925  {
2926  thd->mdl_context.release_transactional_locks();
2927  di->handler_thread_initialized= TRUE;
2928  goto err;
2929  }
2930 
2931  /*
2932  Now that the ticket has been cloned, it is safe for the connection
2933  thread to exit.
2934  */
2935  di->handler_thread_initialized= TRUE;
2936  di->table_list.mdl_request.ticket= NULL;
2937 
2938  if (di->open_and_lock_table())
2939  goto err;
2940 
2941  /* Tell client that the thread is initialized */
2942  mysql_cond_signal(&di->cond_client);
2943 
2944  /* Now wait until we get an insert or lock to handle */
2945  /* We will not abort as long as a client thread uses this thread */
2946 
2947  for (;;)
2948  {
2949  if (thd->killed)
2950  {
2951  uint lock_count;
2952  /*
2953  Remove this from delay insert list so that no one can request a
2954  table from this
2955  */
2956  mysql_mutex_unlock(&di->mutex);
2957  mysql_mutex_lock(&LOCK_delayed_insert);
2958  di->unlink();
2959  lock_count=di->lock_count();
2960  mysql_mutex_unlock(&LOCK_delayed_insert);
2961  mysql_mutex_lock(&di->mutex);
2962  if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
2963  break; // Time to die
2964  }
2965 
2966  /* Shouldn't wait if killed or an insert is waiting. */
2967  if (!thd->killed && !di->status && !di->stacked_inserts)
2968  {
2969  struct timespec abstime;
2970  set_timespec(abstime, delayed_insert_timeout);
2971 
2972  /* Information for pthread_kill */
2973  di->thd.mysys_var->current_mutex= &di->mutex;
2974  di->thd.mysys_var->current_cond= &di->cond;
2975  THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);
2976 
2977  DBUG_PRINT("info",("Waiting for someone to insert rows"));
2978  while (!thd->killed && !di->status)
2979  {
2980  int error;
2981  mysql_audit_release(thd);
2982 #if defined(HAVE_BROKEN_COND_TIMEDWAIT)
2983  error= mysql_cond_wait(&di->cond, &di->mutex);
2984 #else
2985  error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
2986 #ifdef EXTRA_DEBUG
2987  if (error && error != EINTR && error != ETIMEDOUT)
2988  {
2989  fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
2990  DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
2991  error));
2992  }
2993 #endif
2994 #endif
2995  if (error == ETIMEDOUT || error == ETIME)
2996  thd->killed= THD::KILL_CONNECTION;
2997  }
2998  /* We can't lock di->mutex and mysys_var->mutex at the same time */
2999  mysql_mutex_unlock(&di->mutex);
3000  mysql_mutex_lock(&di->thd.mysys_var->mutex);
3001  di->thd.mysys_var->current_mutex= 0;
3002  di->thd.mysys_var->current_cond= 0;
3003  mysql_mutex_unlock(&di->thd.mysys_var->mutex);
3004  mysql_mutex_lock(&di->mutex);
3005  }
3006 
3007  if (di->tables_in_use && ! thd->lock && !thd->killed)
3008  {
3009  /*
3010  Request for new delayed insert.
3011  Lock the table, but avoid to be blocked by a global read lock.
3012  If we got here while a global read lock exists, then one or more
3013  inserts started before the lock was requested. These are allowed
3014  to complete their work before the server returns control to the
3015  client which requested the global read lock. The delayed insert
3016  handler will close the table and finish when the outstanding
3017  inserts are done.
3018  */
3019  if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
3020  {
3021  /* Fatal error */
3022  thd->killed= THD::KILL_CONNECTION;
3023  }
3024  mysql_cond_broadcast(&di->cond_client);
3025  }
3026  if (di->stacked_inserts)
3027  {
3028  if (di->handle_inserts())
3029  {
3030  /* Some fatal error */
3031  thd->killed= THD::KILL_CONNECTION;
3032  }
3033  }
3034  di->status=0;
3035  if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
3036  {
3037  /*
3038  No one is doing a insert delayed
3039  Unlock table so that other threads can use it
3040  */
3041  MYSQL_LOCK *lock=thd->lock;
3042  thd->lock=0;
3043  mysql_mutex_unlock(&di->mutex);
3044  /*
3045  We need to release next_insert_id before unlocking. This is
3046  enforced by handler::ha_external_lock().
3047  */
3048  di->table->file->ha_release_auto_increment();
3049  mysql_unlock_tables(thd, lock);
3050  trans_commit_stmt(thd);
3051  di->group_count=0;
3052  mysql_audit_release(thd);
3053  mysql_mutex_lock(&di->mutex);
3054  }
3055  if (di->tables_in_use)
3056  mysql_cond_broadcast(&di->cond_client); // If waiting clients
3057  }
3058 
3059  err:
3060  DBUG_LEAVE;
3061  }
3062 
3063  close_thread_tables(thd); // Free the table
3064  thd->mdl_context.release_transactional_locks();
3065  di->table=0;
3066  thd->killed= THD::KILL_CONNECTION; // If error
3067  mysql_cond_broadcast(&di->cond_client); // Safety
3068  mysql_mutex_unlock(&di->mutex);
3069 
3070  mysql_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
3071  mysql_mutex_lock(&LOCK_delayed_insert);
3072  /*
3073  di should be unlinked from the thread handler list and have no active
3074  clients
3075  */
3076  delete di;
3077  mysql_mutex_unlock(&LOCK_delayed_insert);
3078  mysql_mutex_unlock(&LOCK_delayed_create);
3079 
3080  my_thread_end();
3081  pthread_exit(0);
3082 
3083  return 0;
3084 }
3085 
3086 
3087 /* Remove pointers from temporary fields to allocated values */
3088 
3089 static void unlink_blobs(register TABLE *table)
3090 {
3091  for (Field **ptr=table->field ; *ptr ; ptr++)
3092  {
3093  if ((*ptr)->flags & BLOB_FLAG)
3094  ((Field_blob *) (*ptr))->clear_temporary();
3095  }
3096 }
3097 
3098 /* Free blobs stored in current row */
3099 
3100 static void free_delayed_insert_blobs(register TABLE *table)
3101 {
3102  for (Field **ptr=table->field ; *ptr ; ptr++)
3103  {
3104  if ((*ptr)->flags & BLOB_FLAG)
3105  {
3106  uchar *str;
3107  ((Field_blob *) (*ptr))->get_ptr(&str);
3108  my_free(str);
3109  ((Field_blob *) (*ptr))->reset();
3110  }
3111  }
3112 }
3113 
3114 
3115 bool Delayed_insert::handle_inserts(void)
3116 {
3117  int error;
3118  ulong max_rows;
3119  bool has_trans = TRUE;
3120  bool using_ignore= 0, using_opt_replace= 0,
3121  using_bin_log= mysql_bin_log.is_open();
3122  delayed_row *row;
3123  DBUG_ENTER("Delayed_insert::handle_inserts");
3124 
3125  /* Allow client to insert new rows */
3126  mysql_mutex_unlock(&mutex);
3127 
3128  table->next_number_field=table->found_next_number_field;
3129 
3130  THD_STAGE_INFO(&thd, stage_upgrading_lock);
3131  if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
3132  thd.variables.lock_wait_timeout))
3133  {
3134  /*
3135  This can happen if thread is killed either by a shutdown
3136  or if another thread is removing the current table definition
3137  from the table cache.
3138  */
3139  my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
3140  table->s->table_name.str);
3141  goto err;
3142  }
3143 
3144  THD_STAGE_INFO(&thd, stage_insert);
3145  max_rows= delayed_insert_limit;
3146  if (thd.killed || table->s->has_old_version())
3147  {
3148  thd.killed= THD::KILL_CONNECTION;
3149  max_rows= ULONG_MAX; // Do as much as possible
3150  }
3151 
3152  /*
3153  We can't use row caching when using the binary log because if
3154  we get a crash, then binary log will contain rows that are not yet
3155  written to disk, which will cause problems in replication.
3156  */
3157  if (!using_bin_log)
3158  table->file->extra(HA_EXTRA_WRITE_CACHE);
3159  mysql_mutex_lock(&mutex);
3160 
3161  bitmap_set_all(table->read_set);
3162  while ((row=rows.get()))
3163  {
3164  stacked_inserts--;
3165  mysql_mutex_unlock(&mutex);
3166  memcpy(table->record[0],row->record,table->s->reclength);
3167 
3168  thd.start_time.tv_sec= row->start_time;
3169  thd.start_time.tv_usec= 0;
3170  thd.query_start_used=row->query_start_used;
3171 
3172  /*
3173  Copy to the DI table hander the row write set
3174  which in its turn is a copy of the user thread's table
3175  write set at the time the delayed insert was issued.
3176  */
3177  bitmap_clear_all(table->write_set);
3178  bitmap_union(table->write_set, &row->write_set);
3179  table->file->column_bitmaps_signal();
3180 
3181  /*
3182  To get the exact auto_inc interval to store in the binlog we must not
3183  use values from the previous interval (of the previous rows).
3184  */
3185  bool log_query= (row->log_query && row->query.str != NULL);
3186  DBUG_PRINT("delayed", ("query: '%s' length: %lu", row->query.str ?
3187  row->query.str : "[NULL]",
3188  (ulong) row->query.length));
3189  if (log_query)
3190  {
3191  /*
3192  Guaranteed that the INSERT DELAYED STMT will not be here
3193  in SBR when mysql binlog is enabled.
3194  */
3195  DBUG_ASSERT(!(mysql_bin_log.is_open() &&
3196  !thd.is_current_stmt_binlog_format_row()));
3197 
3198  if (mysql_bin_log.is_open())
3199  {
3200  /* Flush rows of previous statement*/
3201  if (thd.binlog_flush_pending_rows_event(TRUE, FALSE))
3202  {
3203  delete row;
3204  goto err;
3205  }
3206  /* Set query for Rows_query_log event in RBR*/
3207  thd.set_query(row->query.str, row->query.length);
3208  thd.variables.binlog_rows_query_log_events= row->binlog_rows_query_log_events;
3209  }
3210 
3211  /*
3212  This is the first value of an INSERT statement.
3213  It is the right place to clear a forced insert_id.
3214  This is usually done after the last value of an INSERT statement,
3215  but we won't know this in the insert delayed thread. But before
3216  the first value is sufficiently equivalent to after the last
3217  value of the previous statement.
3218  */
3219  table->file->ha_release_auto_increment();
3220  thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
3221  }
3222  thd.first_successful_insert_id_in_prev_stmt=
3223  row->first_successful_insert_id_in_prev_stmt;
3224  thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
3225  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
3226  table->auto_increment_field_not_null= row->auto_increment_field_not_null;
3227 
3228  /* Copy the session variables. */
3229  thd.variables.auto_increment_increment= row->auto_increment_increment;
3230  thd.variables.auto_increment_offset= row->auto_increment_offset;
3231  thd.variables.sql_mode= row->sql_mode;
3232 
3233  /* Copy a forced insert_id, if any. */
3234  if (row->forced_insert_id)
3235  {
3236  DBUG_PRINT("delayed", ("received auto_inc: %lu",
3237  (ulong) row->forced_insert_id));
3238  thd.force_one_auto_inc_interval(row->forced_insert_id);
3239  }
3240 
3241  d_info.set_dup_and_ignore(row->dup, row->ignore);
3242 
3243  const enum_duplicates duplicate_handling= d_info.get_duplicate_handling();
3244  if (d_info.get_ignore_errors() || duplicate_handling != DUP_ERROR)
3245  {
3246  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3247  using_ignore=1;
3248  }
3249  if (duplicate_handling == DUP_REPLACE &&
3250  (!table->triggers ||
3251  !table->triggers->has_delete_triggers()))
3252  {
3253  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3254  using_opt_replace= 1;
3255  }
3256  if (duplicate_handling == DUP_UPDATE)
3257  table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3258  thd.clear_error(); // reset error for binlog
3259  if (write_record(&thd, table, &d_info, NULL))
3260  {
3261  d_info.stats.error_count++; // Ignore errors
3262  thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
3263  row->log_query = 0;
3264  }
3265 
3266  if (using_ignore)
3267  {
3268  using_ignore=0;
3269  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3270  }
3271  if (using_opt_replace)
3272  {
3273  using_opt_replace= 0;
3274  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3275  }
3276 
3277  if (table->s->blob_fields)
3278  free_delayed_insert_blobs(table);
3279  thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
3280  thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
3281  mysql_mutex_lock(&mutex);
3282 
3283  /*
3284  Reset the table->auto_increment_field_not_null as it is valid for
3285  only one row.
3286  */
3287  table->auto_increment_field_not_null= FALSE;
3288 
3289  if (log_query && mysql_bin_log.is_open())
3290  thd.set_query(NULL, 0);
3291  delete row;
3292  /*
3293  Let READ clients do something once in a while
3294  We should however not break in the middle of a multi-line insert
3295  if we have binary logging enabled as we don't want other commands
3296  on this table until all entries has been processed
3297  */
3298  if (group_count++ >= max_rows && (row= rows.head()) &&
3299  (!(row->log_query & using_bin_log)))
3300  {
3301  group_count=0;
3302  if (stacked_inserts || tables_in_use) // Let these wait a while
3303  {
3304  if (tables_in_use)
3305  mysql_cond_broadcast(&cond_client); // If waiting clients
3306  THD_STAGE_INFO(&thd, stage_reschedule);
3307  mysql_mutex_unlock(&mutex);
3308  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
3309  {
3310  /* This should never happen */
3311  table->file->print_error(error,MYF(0));
3312  sql_print_error("%s", thd.get_stmt_da()->message());
3313  DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
3314  goto err;
3315  }
3316  query_cache_invalidate3(&thd, table, 1);
3317  if (thr_reschedule_write_lock(*thd.lock->locks,
3318  thd.variables.lock_wait_timeout))
3319  {
3320  /* This is not known to happen. */
3321  my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
3322  table->s->table_name.str);
3323  goto err;
3324  }
3325  if (!using_bin_log)
3326  table->file->extra(HA_EXTRA_WRITE_CACHE);
3327  mysql_mutex_lock(&mutex);
3328  THD_STAGE_INFO(&thd, stage_insert);
3329  }
3330  if (tables_in_use)
3331  mysql_cond_broadcast(&cond_client); // If waiting clients
3332  }
3333  }
3334  mysql_mutex_unlock(&mutex);
3335 
3336  /*
3337  We need to flush the pending event when using row-based
3338  replication since the flushing normally done in binlog_query() is
3339  not done last in the statement: for delayed inserts, the insert
3340  statement is logged *before* all rows are inserted.
3341 
3342  We can flush the pending event without checking the thd->lock
3343  since the delayed insert *thread* is not inside a stored function
3344  or trigger.
3345 
3346  TODO: Move the logging to last in the sequence of rows.
3347  */
3348  has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
3349  table->file->has_transactions();
3350  if (mysql_bin_log.is_open() &&
3351  thd.binlog_flush_pending_rows_event(TRUE, has_trans))
3352  goto err;
3353 
3354  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
3355  { // This shouldn't happen
3356  table->file->print_error(error,MYF(0));
3357  sql_print_error("%s", thd.get_stmt_da()->message());
3358  DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
3359  goto err;
3360  }
3361  query_cache_invalidate3(&thd, table, 1);
3362  DBUG_EXECUTE_IF("after_handle_inserts",
3363  {
3364  const char act[]=
3365  "now "
3366  "signal inserts_handled";
3367  DBUG_ASSERT(opt_debug_sync_timeout > 0);
3368  DBUG_ASSERT(!debug_sync_set_action(&thd,
3369  STRING_WITH_LEN(act)));
3370  };);
3371  mysql_mutex_lock(&mutex);
3372  DBUG_RETURN(0);
3373 
3374  err:
3375  max_rows= 0;
3376  mysql_mutex_lock(&mutex);
3377  /* Remove all not used rows */
3378  while ((row=rows.get()))
3379  {
3380  if (table->s->blob_fields)
3381  {
3382  memcpy(table->record[0],row->record,table->s->reclength);
3383  free_delayed_insert_blobs(table);
3384  }
3385  delete row;
3386  stacked_inserts--;
3387  max_rows++;
3388  }
3389  mysql_mutex_unlock(&mutex);
3390  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
3391  for (; max_rows > 0; max_rows--)
3392  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
3393  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
3394  mysql_mutex_lock(&mutex);
3395  DBUG_RETURN(1);
3396 }
3397 #endif /* EMBEDDED_LIBRARY */
3398 
3399 /***************************************************************************
3400  Store records in INSERT ... SELECT *
3401 ***************************************************************************/
3402 
3403 
3404 /*
3405  make insert specific preparation and checks after opening tables
3406 
3407  SYNOPSIS
3408  mysql_insert_select_prepare()
3409  thd thread handler
3410 
3411  RETURN
3412  FALSE OK
3413  TRUE Error
3414 */
3415 
3416 bool mysql_insert_select_prepare(THD *thd)
3417 {
3418  LEX *lex= thd->lex;
3419  SELECT_LEX *select_lex= &lex->select_lex;
3420  TABLE_LIST *first_select_leaf_table;
3421  DBUG_ENTER("mysql_insert_select_prepare");
3422 
3423  /*
3424  SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
3425  clause if table is VIEW
3426  */
3427 
3428  if (mysql_prepare_insert(thd, lex->query_tables,
3429  lex->query_tables->table, lex->field_list, 0,
3430  lex->update_list, lex->value_list,
3431  lex->duplicates,
3432  &select_lex->where, TRUE, FALSE, FALSE))
3433  DBUG_RETURN(TRUE);
3434 
3435  /*
3436  exclude first table from leaf tables list, because it belong to
3437  INSERT
3438  */
3439  DBUG_ASSERT(select_lex->leaf_tables != 0);
3440  lex->leaf_tables_insert= select_lex->leaf_tables;
3441  /* skip all leaf tables belonged to view where we are insert */
3442  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
3443  first_select_leaf_table &&
3444  first_select_leaf_table->belong_to_view &&
3445  first_select_leaf_table->belong_to_view ==
3446  lex->leaf_tables_insert->belong_to_view;
3447  first_select_leaf_table= first_select_leaf_table->next_leaf)
3448  {}
3449  select_lex->leaf_tables= first_select_leaf_table;
3450  DBUG_RETURN(FALSE);
3451 }
3452 
3453 
3454 int
3455 select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3456 {
3457  LEX *lex= thd->lex;
3458  int res;
3459  table_map map= 0;
3460  SELECT_LEX *lex_current_select_save= lex->current_select;
3461  DBUG_ENTER("select_insert::prepare");
3462 
3463  const enum_duplicates duplicate_handling= info.get_duplicate_handling();
3464  const bool ignore_errors= info.get_ignore_errors();
3465 
3466  unit= u;
3467 
3468  /*
3469  Since table in which we are going to insert is added to the first
3470  select, LEX::current_select should point to the first select while
3471  we are fixing fields from insert list.
3472  */
3473  lex->current_select= &lex->select_lex;
3474 
3475  /* Errors during check_insert_fields() should not be ignored. */
3476  lex->current_select->no_error= FALSE;
3477  res= (setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, 0) ||
3478  check_insert_fields(thd, table_list, *fields, values,
3479  !insert_into_view, 1, &map));
3480 
3481  if (!res && fields->elements)
3482  {
3483  bool saved_abort_on_warning= thd->abort_on_warning;
3484 
3485  thd->abort_on_warning= !ignore_errors && thd->is_strict_mode();
3486 
3487  res= check_that_all_fields_are_given_values(thd, table_list->table,
3488  table_list);
3489  thd->abort_on_warning= saved_abort_on_warning;
3490  }
3491 
3492  if (duplicate_handling == DUP_UPDATE && !res)
3493  {
3494  Name_resolution_context *context= &lex->select_lex.context;
3496 
3497  /* Save the state of the current name resolution context. */
3498  ctx_state.save_state(context, table_list);
3499 
3500  /* Perform name resolution only in the first table - 'table_list'. */
3501  table_list->next_local= 0;
3502  context->resolve_in_table_list_only(table_list);
3503 
3504  lex->select_lex.no_wrap_view_item= TRUE;
3505  res= res ||
3506  check_update_fields(thd, context->table_list,
3507  *update.get_changed_columns(),
3508  *update.update_values,
3509  /*
3510  In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
3511  'x' can legally refer to a non-inserted table.
3512  'x' is not even resolved yet.
3513  */
3514  true,
3515  &map);
3516  lex->select_lex.no_wrap_view_item= FALSE;
3517  /*
3518  When we are not using GROUP BY and there are no ungrouped aggregate
3519  functions
3520  we can refer to other tables in the ON DUPLICATE KEY part.
3521  We use next_name_resolution_table destructively, so check it first
3522  (views?).
3523  */
3524  DBUG_ASSERT (!table_list->next_name_resolution_table);
3525  if (lex->select_lex.group_list.elements == 0 &&
3526  !lex->select_lex.with_sum_func)
3527  {
3528  /*
3529  We must make a single context out of the two separate name resolution
3530  contexts:
3531  the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
3532  To do that we must concatenate the two lists
3533  */
3534  table_list->next_name_resolution_table=
3535  ctx_state.get_first_name_resolution_table();
3536  }
3537  res= res || setup_fields(thd, Ref_ptr_array(), *update.update_values,
3538  MARK_COLUMNS_READ, 0, 0);
3539  if (!res)
3540  {
3541  /*
3542  Traverse the update values list and substitute fields from the
3543  select for references (Item_ref objects) to them. This is done in
3544  order to get correct values from those fields when the select
3545  employs a temporary table.
3546  */
3547  List_iterator<Item> li(*update.update_values);
3548  Item *item;
3549 
3550  while ((item= li++))
3551  {
3552  item->transform(&Item::update_value_transformer,
3553  (uchar*)lex->current_select);
3554  }
3555  }
3556 
3557  /* Restore the current context. */
3558  ctx_state.restore_state(context, table_list);
3559  }
3560 
3561  lex->current_select= lex_current_select_save;
3562  if (res)
3563  DBUG_RETURN(1);
3564  /*
3565  if it is INSERT into join view then check_insert_fields already found
3566  real table for insert
3567  */
3568  table= table_list->table;
3569 
3570  if (info.add_function_default_columns(table, table->write_set))
3571  DBUG_RETURN(1);
3572  if ((duplicate_handling == DUP_UPDATE) &&
3573  update.add_function_default_columns(table, table->write_set))
3574  DBUG_RETURN(1);
3575 
3576  /*
3577  Is table which we are changing used somewhere in other parts of
3578  query
3579  */
3580  if (unique_table(thd, table_list, table_list->next_global, 0))
3581  {
3582  /* Using same table for INSERT and SELECT */
3583  lex->current_select->options|= OPTION_BUFFER_RESULT;
3584  lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
3585  }
3586  restore_record(table,s->default_values); // Get empty record
3587  table->next_number_field=table->found_next_number_field;
3588 
3589 #ifdef HAVE_REPLICATION
3590  if (thd->slave_thread)
3591  {
3592  DBUG_ASSERT(active_mi != NULL);
3593  if (duplicate_handling == DUP_UPDATE &&
3594  table->next_number_field != NULL &&
3595  rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
3596  DBUG_RETURN(1);
3597  }
3598 #endif
3599 
3600  thd->cuted_fields=0;
3601  if (ignore_errors || duplicate_handling != DUP_ERROR)
3602  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3603  if (duplicate_handling == DUP_REPLACE &&
3604  (!table->triggers || !table->triggers->has_delete_triggers()))
3605  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3606  if (duplicate_handling == DUP_UPDATE)
3607  table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3608  thd->abort_on_warning= (!ignore_errors && thd->is_strict_mode());
3609  res= (table_list->prepare_where(thd, 0, TRUE) ||
3610  table_list->prepare_check_option(thd));
3611 
3612  if (!res)
3613  prepare_triggers_for_insert_stmt(table);
3614 
3615  DBUG_RETURN(res);
3616 }
3617 
3618 
3619 /*
3620  Finish the preparation of the result table.
3621 
3622  SYNOPSIS
3623  select_insert::prepare2()
3624  void
3625 
3626  DESCRIPTION
3627  If the result table is the same as one of the source tables (INSERT SELECT),
3628  the result table is not finally prepared at the join prepair phase.
3629  Do the final preparation now.
3630 
3631  RETURN
3632  0 OK
3633 */
3634 
3635 int select_insert::prepare2(void)
3636 {
3637  DBUG_ENTER("select_insert::prepare2");
3638  if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
3639  !thd->lex->describe)
3640  {
3641  DBUG_ASSERT(!bulk_insert_started);
3642  // TODO: Is there no better estimation than 0 == Unknown number of rows?
3643  table->file->ha_start_bulk_insert((ha_rows) 0);
3644  bulk_insert_started= true;
3645  }
3646  DBUG_RETURN(0);
3647 }
3648 
3649 
3650 void select_insert::cleanup()
3651 {
3652  /* select_insert/select_create are never re-used in prepared statement */
3653  DBUG_ASSERT(0);
3654 }
3655 
3656 select_insert::~select_insert()
3657 {
3658  DBUG_ENTER("~select_insert");
3659  if (table)
3660  {
3661  table->next_number_field=0;
3662  table->auto_increment_field_not_null= FALSE;
3663  table->file->ha_reset();
3664  }
3665  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
3666  thd->abort_on_warning= 0;
3667  DBUG_VOID_RETURN;
3668 }
3669 
3670 
3671 bool select_insert::send_data(List<Item> &values)
3672 {
3673  DBUG_ENTER("select_insert::send_data");
3674  bool error=0;
3675 
3676  if (unit->offset_limit_cnt)
3677  { // using limit offset,count
3678  unit->offset_limit_cnt--;
3679  DBUG_RETURN(0);
3680  }
3681 
3682  thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
3683  store_values(values);
3684  thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
3685  if (thd->is_error())
3686  {
3687  table->auto_increment_field_not_null= FALSE;
3688  DBUG_RETURN(1);
3689  }
3690  if (table_list) // Not CREATE ... SELECT
3691  {
3692  switch (table_list->view_check_option(thd, info.get_ignore_errors())) {
3693  case VIEW_CHECK_SKIP:
3694  DBUG_RETURN(0);
3695  case VIEW_CHECK_ERROR:
3696  DBUG_RETURN(1);
3697  }
3698  }
3699 
3700  // Release latches in case bulk insert takes a long time
3702 
3703  error= write_record(thd, table, &info, &update);
3704  table->auto_increment_field_not_null= FALSE;
3705 
3706  if (!error)
3707  {
3708  if (table->triggers || info.get_duplicate_handling() == DUP_UPDATE)
3709  {
3710  /*
3711  Restore fields of the record since it is possible that they were
3712  changed by ON DUPLICATE KEY UPDATE clause.
3713 
3714  If triggers exist then whey can modify some fields which were not
3715  originally touched by INSERT ... SELECT, so we have to restore
3716  their original values for the next row.
3717  */
3718  restore_record(table, s->default_values);
3719  }
3720  if (table->next_number_field)
3721  {
3722  /*
3723  If no value has been autogenerated so far, we need to remember the
3724  value we just saw, we may need to send it to client in the end.
3725  */
3726  if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
3727  autoinc_value_of_last_inserted_row=
3728  table->next_number_field->val_int();
3729  /*
3730  Clear auto-increment field for the next record, if triggers are used
3731  we will clear it twice, but this should be cheap.
3732  */
3733  table->next_number_field->reset();
3734  }
3735  }
3736  DBUG_RETURN(error);
3737 }
3738 
3739 
3740 void select_insert::store_values(List<Item> &values)
3741 {
3742  const bool ignore_err= true;
3743  if (fields->elements)
3744  {
3745  restore_record(table, s->default_values);
3746  if (!validate_default_values_of_unset_fields(thd, table))
3747  fill_record_n_invoke_before_triggers(thd, *fields, values, ignore_err,
3748  table->triggers, TRG_EVENT_INSERT);
3749  }
3750  else
3751  fill_record_n_invoke_before_triggers(thd, table->field, values, ignore_err,
3752  table->triggers, TRG_EVENT_INSERT);
3753 }
3754 
3755 void select_insert::send_error(uint errcode,const char *err)
3756 {
3757  DBUG_ENTER("select_insert::send_error");
3758 
3759  my_message(errcode, err, MYF(0));
3760 
3761  DBUG_VOID_RETURN;
3762 }
3763 
3764 
3765 bool select_insert::send_eof()
3766 {
3767  int error;
3768  bool const trans_table= table->file->has_transactions();
3769  ulonglong id, row_count;
3770  bool changed;
3771  THD::killed_state killed_status= thd->killed;
3772  DBUG_ENTER("select_insert::send_eof");
3773  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
3774  trans_table, table->file->table_type()));
3775 
3776  error= (bulk_insert_started ?
3777  table->file->ha_end_bulk_insert() : 0);
3778  if (!error && thd->is_error())
3779  error= thd->get_stmt_da()->sql_errno();
3780 
3781  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3782  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3783 
3784  changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
3785  if (changed)
3786  {
3787  /*
3788  We must invalidate the table in the query cache before binlog writing
3789  and ha_autocommit_or_rollback.
3790  */
3791  query_cache_invalidate3(thd, table, 1);
3792  }
3793 
3794  DBUG_ASSERT(trans_table || !changed ||
3795  thd->transaction.stmt.cannot_safely_rollback());
3796 
3797  /*
3798  Write to binlog before commiting transaction. No statement will
3799  be written by the binlog_query() below in RBR mode. All the
3800  events are in the transaction cache and will be written when
3801  ha_autocommit_or_rollback() is issued below.
3802  */
3803  if (mysql_bin_log.is_open() &&
3804  (!error || thd->transaction.stmt.cannot_safely_rollback()))
3805  {
3806  int errcode= 0;
3807  if (!error)
3808  thd->clear_error();
3809  else
3810  errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
3811  if (thd->binlog_query(THD::ROW_QUERY_TYPE,
3812  thd->query(), thd->query_length(),
3813  trans_table, FALSE, FALSE, errcode))
3814  {
3815  table->file->ha_release_auto_increment();
3816  DBUG_RETURN(1);
3817  }
3818  }
3819  table->file->ha_release_auto_increment();
3820 
3821  if (error)
3822  {
3823  table->file->print_error(error,MYF(0));
3824  DBUG_RETURN(1);
3825  }
3826  char buff[160];
3827  if (info.get_ignore_errors())
3828  my_snprintf(buff, sizeof(buff),
3829  ER(ER_INSERT_INFO), (long) info.stats.records,
3830  (long) (info.stats.records - info.stats.copied),
3831  (long) thd->get_stmt_da()->current_statement_warn_count());
3832  else
3833  my_snprintf(buff, sizeof(buff),
3834  ER(ER_INSERT_INFO), (long) info.stats.records,
3835  (long) (info.stats.deleted+info.stats.updated),
3836  (long) thd->get_stmt_da()->current_statement_warn_count());
3837  row_count= info.stats.copied + info.stats.deleted +
3838  ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
3839  info.stats.touched : info.stats.updated);
3840  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
3841  thd->first_successful_insert_id_in_cur_stmt :
3842  (thd->arg_of_last_insert_id_function ?
3843  thd->first_successful_insert_id_in_prev_stmt :
3844  (info.stats.copied ? autoinc_value_of_last_inserted_row : 0));
3845  my_ok(thd, row_count, id, buff);
3846  DBUG_RETURN(0);
3847 }
3848 
3849 void select_insert::abort_result_set() {
3850 
3851  DBUG_ENTER("select_insert::abort_result_set");
3852  /*
3853  If the creation of the table failed (due to a syntax error, for
3854  example), no table will have been opened and therefore 'table'
3855  will be NULL. In that case, we still need to execute the rollback
3856  and the end of the function.
3857  */
3858  if (table)
3859  {
3860  bool changed, transactional_table;
3861  /*
3862  Try to end the bulk insert which might have been started before.
3863  We don't need to do this if we are in prelocked mode (since we
3864  don't use bulk insert in this case). Also we should not do this
3865  if tables are not locked yet (bulk insert is not started yet
3866  in this case).
3867  */
3868  if (bulk_insert_started)
3869  table->file->ha_end_bulk_insert();
3870 
3871  /*
3872  If at least one row has been inserted/modified and will stay in
3873  the table (the table doesn't have transactions) we must write to
3874  the binlog (and the error code will make the slave stop).
3875 
3876  For many errors (example: we got a duplicate key error while
3877  inserting into a MyISAM table), no row will be added to the table,
3878  so passing the error to the slave will not help since there will
3879  be an error code mismatch (the inserts will succeed on the slave
3880  with no error).
3881 
3882  If table creation failed, the number of rows modified will also be
3883  zero, so no check for that is made.
3884  */
3885  changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
3886  transactional_table= table->file->has_transactions();
3887  if (thd->transaction.stmt.cannot_safely_rollback())
3888  {
3889  if (mysql_bin_log.is_open())
3890  {
3891  int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3892  /* error of writing binary log is ignored */
3893  (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
3894  thd->query_length(),
3895  transactional_table, FALSE, FALSE, errcode);
3896  }
3897  if (changed)
3898  query_cache_invalidate3(thd, table, 1);
3899  }
3900  DBUG_ASSERT(transactional_table || !changed ||
3901  thd->transaction.stmt.cannot_safely_rollback());
3902  table->file->ha_release_auto_increment();
3903  }
3904 
3905  DBUG_VOID_RETURN;
3906 }
3907 
3908 
3909 /***************************************************************************
3910  CREATE TABLE (SELECT) ...
3911 ***************************************************************************/
3912 
3962 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
3964  Alter_info *alter_info,
3965  List<Item> *items)
3966 {
3967  TABLE tmp_table; // Used during 'Create_field()'
3968  TABLE_SHARE share;
3969  TABLE *table= 0;
3970  uint select_field_count= items->elements;
3971  /* Add selected items to field list */
3972  List_iterator_fast<Item> it(*items);
3973  Item *item;
3974  Field *tmp_field;
3975  DBUG_ENTER("create_table_from_items");
3976 
3977  tmp_table.alias= 0;
3978  tmp_table.s= &share;
3979  init_tmp_table_share(thd, &share, "", 0, "", "");
3980 
3981  tmp_table.s->db_create_options=0;
3982  tmp_table.s->db_low_byte_first=
3983  test(create_info->db_type == myisam_hton ||
3984  create_info->db_type == heap_hton);
3985  tmp_table.null_row=tmp_table.maybe_null=0;
3986 
3987  if (!thd->variables.explicit_defaults_for_timestamp)
3988  promote_first_timestamp_column(&alter_info->create_list);
3989 
3990  while ((item=it++))
3991  {
3992  Create_field *cr_field;
3993  Field *field, *def_field;
3994  if (item->type() == Item::FUNC_ITEM)
3995  if (item->result_type() != STRING_RESULT)
3996  field= item->tmp_table_field(&tmp_table);
3997  else
3998  field= item->tmp_table_field_from_field_type(&tmp_table, 0);
3999  else
4000  field= create_tmp_field(thd, &tmp_table, item, item->type(),
4001  (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0);
4002  if (!field ||
4003  !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
4004  ((Item_field *)item)->field :
4005  (Field*) 0))))
4006  DBUG_RETURN(0);
4007 
4008  if (item->maybe_null)
4009  cr_field->flags &= ~NOT_NULL_FLAG;
4010  alter_info->create_list.push_back(cr_field);
4011  }
4012 
4013  DEBUG_SYNC(thd,"create_table_select_before_create");
4014 
4015  /*
4016  Create and lock table.
4017 
4018  Note that we either creating (or opening existing) temporary table or
4019  creating base table on which name we have exclusive lock. So code below
4020  should not cause deadlocks or races.
4021 
4022  We don't log the statement, it will be logged later.
4023 
4024  If this is a HEAP table, the automatic DELETE FROM which is written to the
4025  binlog when a HEAP table is opened for the first time since startup, must
4026  not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
4027  don't want to delete from it) 2) it would be written before the CREATE
4028  TABLE, which is a wrong order. So we keep binary logging disabled when we
4029  open_table().
4030  */
4031  {
4032  if (!mysql_create_table_no_lock(thd, create_table->db,
4033  create_table->table_name,
4034  create_info, alter_info,
4035  select_field_count, NULL))
4036  {
4037  DEBUG_SYNC(thd,"create_table_select_before_open");
4038 
4039  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
4040  {
4041  Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
4042  /*
4043  Here we open the destination table, on which we already have
4044  an exclusive metadata lock.
4045  */
4046  if (open_table(thd, create_table, &ot_ctx))
4047  {
4048  quick_rm_table(thd, create_info->db_type, create_table->db,
4049  table_case_name(create_info, create_table->table_name),
4050  0);
4051  }
4052  else
4053  table= create_table->table;
4054  }
4055  else
4056  {
4057  if (open_temporary_table(thd, create_table))
4058  {
4059  /*
4060  This shouldn't happen as creation of temporary table should make
4061  it preparable for open. Anyway we can't drop temporary table if
4062  we are unable to fint it.
4063  */
4064  DBUG_ASSERT(0);
4065  }
4066  else
4067  {
4068  table= create_table->table;
4069  }
4070  }
4071  }
4072  if (!table) // open failed
4073  DBUG_RETURN(0);
4074  }
4075  DBUG_RETURN(table);
4076 }
4077 
4078 
4090 int
4091 select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
4092 {
4093  DBUG_ENTER("select_create::prepare");
4094 
4095  unit= u;
4096  DBUG_ASSERT(create_table->table == NULL);
4097 
4098  DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
4099 
4100  if (!(table= create_table_from_items(thd, create_info, create_table,
4101  alter_info, &values)))
4102  /* abort() deletes table */
4103  DBUG_RETURN(-1);
4104 
4105  if (table->s->fields < values.elements)
4106  {
4107  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
4108  DBUG_RETURN(-1);
4109  }
4110  /* First field to copy */
4111  field= table->field+table->s->fields - values.elements;
4112 
4113  // Turn off function defaults for columns filled from SELECT list:
4114  const bool retval= info.ignore_last_columns(table, values.elements);
4115  DBUG_RETURN(retval);
4116 }
4117 
4118 
4127 int
4128 select_create::prepare2()
4129 {
4130  DBUG_ENTER("select_create::prepare2");
4131  DEBUG_SYNC(thd,"create_table_select_before_lock");
4132 
4133  MYSQL_LOCK *extra_lock= NULL;
4134  /*
4135  For row-based replication, the CREATE-SELECT statement is written
4136  in two pieces: the first one contain the CREATE TABLE statement
4137  necessary to create the table and the second part contain the rows
4138  that should go into the table.
4139 
4140  For non-temporary tables, the start of the CREATE-SELECT
4141  implicitly commits the previous transaction, and all events
4142  forming the statement will be stored the transaction cache. At end
4143  of the statement, the entire statement is committed as a
4144  transaction, and all events are written to the binary log.
4145 
4146  On the master, the table is locked for the duration of the
4147  statement, but since the CREATE part is replicated as a simple
4148  statement, there is no way to lock the table for accesses on the
4149  slave. Hence, we have to hold on to the CREATE part of the
4150  statement until the statement has finished.
4151  */
4152  class MY_HOOKS : public TABLEOP_HOOKS {
4153  public:
4154  MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
4155  TABLE_LIST *select_tables_arg)
4156  : ptr(x),
4157  create_table(create_table_arg),
4158  select_tables(select_tables_arg)
4159  {
4160  }
4161 
4162  private:
4163  virtual int do_postlock(TABLE **tables, uint count)
4164  {
4165  int error;
4166  THD *thd= const_cast<THD*>(ptr->get_thd());
4167  TABLE_LIST *save_next_global= create_table->next_global;
4168 
4169  create_table->next_global= select_tables;
4170 
4171  error= thd->decide_logging_format(create_table);
4172 
4173  create_table->next_global= save_next_global;
4174 
4175  if (error)
4176  return error;
4177 
4178  TABLE const *const table = *tables;
4179  if (thd->is_current_stmt_binlog_format_row() &&
4180  !table->s->tmp_table)
4181  {
4182  if (int error= ptr->binlog_show_create_table(tables, count))
4183  return error;
4184  }
4185  return 0;
4186  }
4187  select_create *ptr;
4189  TABLE_LIST *select_tables;
4190  };
4191 
4192  MY_HOOKS hooks(this, create_table, select_tables);
4193 
4194  table->reginfo.lock_type=TL_WRITE;
4195  hooks.prelock(&table, 1); // Call prelock hooks
4196  /*
4197  mysql_lock_tables() below should never fail with request to reopen table
4198  since it won't wait for the table lock (we have exclusive metadata lock on
4199  the table) and thus can't get aborted.
4200  */
4201  if (! (extra_lock= mysql_lock_tables(thd, &table, 1, 0)) ||
4202  hooks.postlock(&table, 1))
4203  {
4204  if (extra_lock)
4205  {
4206  mysql_unlock_tables(thd, extra_lock);
4207  extra_lock= 0;
4208  }
4209  drop_open_table(thd, table, create_table->db, create_table->table_name);
4210  table= 0;
4211  DBUG_RETURN(1);
4212  }
4213  if (extra_lock)
4214  {
4215  DBUG_ASSERT(m_plock == NULL);
4216 
4217  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
4218  m_plock= &m_lock;
4219  else
4220  m_plock= &thd->extra_lock;
4221 
4222  *m_plock= extra_lock;
4223  }
4224  /* Mark all fields that are given values */
4225  for (Field **f= field ; *f ; f++)
4226  bitmap_set_bit(table->write_set, (*f)->field_index);
4227 
4228  // Set up an empty bitmap of function defaults
4229  if (info.add_function_default_columns(table, table->write_set))
4230  DBUG_RETURN(1);
4231 
4232  table->next_number_field=table->found_next_number_field;
4233 
4234  restore_record(table,s->default_values); // Get empty record
4235  thd->cuted_fields=0;
4236 
4237  const enum_duplicates duplicate_handling= info.get_duplicate_handling();
4238  const bool ignore_errors= info.get_ignore_errors();
4239 
4240  if (ignore_errors || duplicate_handling != DUP_ERROR)
4241  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
4242  if (duplicate_handling == DUP_REPLACE &&
4243  (!table->triggers || !table->triggers->has_delete_triggers()))
4244  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
4245  if (duplicate_handling == DUP_UPDATE)
4246  table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
4247  if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4248  {
4249  table->file->ha_start_bulk_insert((ha_rows) 0);
4250  bulk_insert_started= true;
4251  }
4252  thd->abort_on_warning= (!ignore_errors && thd->is_strict_mode());
4253  if (check_that_all_fields_are_given_values(thd, table, table_list))
4254  DBUG_RETURN(1);
4255  table->mark_columns_needed_for_insert();
4256  table->file->extra(HA_EXTRA_WRITE_CACHE);
4257  DBUG_RETURN(0);
4258 }
4259 
4260 int
4261 select_create::binlog_show_create_table(TABLE **tables, uint count)
4262 {
4263  /*
4264  Note 1: In RBR mode, we generate a CREATE TABLE statement for the
4265  created table by calling store_create_info() (behaves as SHOW
4266  CREATE TABLE). In the event of an error, nothing should be
4267  written to the binary log, even if the table is non-transactional;
4268  therefore we pretend that the generated CREATE TABLE statement is
4269  for a transactional table. The event will then be put in the
4270  transaction cache, and any subsequent events (e.g., table-map
4271  events and binrow events) will also be put there. We can then use
4272  ha_autocommit_or_rollback() to either throw away the entire
4273  kaboodle of events, or write them to the binary log.
4274 
4275  We write the CREATE TABLE statement here and not in prepare()
4276  since there potentially are sub-selects or accesses to information
4277  schema that will do a close_thread_tables(), destroying the
4278  statement transaction cache.
4279  */
4280  DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
4281  DBUG_ASSERT(tables && *tables && count > 0);
4282 
4283  char buf[2048];
4284  String query(buf, sizeof(buf), system_charset_info);
4285  int result;
4286  TABLE_LIST tmp_table_list;
4287 
4288  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
4289  tmp_table_list.table = *tables;
4290  query.length(0); // Have to zero it since constructor doesn't
4291 
4292  result= store_create_info(thd, &tmp_table_list, &query, create_info,
4293  /* show_database */ TRUE);
4294  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
4295 
4296  if (mysql_bin_log.is_open())
4297  {
4298  int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
4299  result= thd->binlog_query(THD::STMT_QUERY_TYPE,
4300  query.ptr(), query.length(),
4301  /* is_trans */ TRUE,
4302  /* direct */ FALSE,
4303  /* suppress_use */ FALSE,
4304  errcode);
4305  }
4306  return result;
4307 }
4308 
4309 void select_create::store_values(List<Item> &values)
4310 {
4311  const bool ignore_err= true;
4312  fill_record_n_invoke_before_triggers(thd, field, values, ignore_err,
4313  table->triggers, TRG_EVENT_INSERT);
4314 }
4315 
4316 
4317 void select_create::send_error(uint errcode,const char *err)
4318 {
4319  DBUG_ENTER("select_create::send_error");
4320 
4321  DBUG_PRINT("info",
4322  ("Current statement %s row-based",
4323  thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
4324  DBUG_PRINT("info",
4325  ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
4326  (ulong) table,
4327  table && !table->s->tmp_table ? "is NOT" : "is"));
4328  /*
4329  This will execute any rollbacks that are necessary before writing
4330  the transcation cache.
4331 
4332  We disable the binary log since nothing should be written to the
4333  binary log. This disabling is important, since we potentially do
4334  a "roll back" of non-transactional tables by removing the table,
4335  and the actual rollback might generate events that should not be
4336  written to the binary log.
4337 
4338  */
4339  tmp_disable_binlog(thd);
4340  select_insert::send_error(errcode, err);
4341  reenable_binlog(thd);
4342 
4343  DBUG_VOID_RETURN;
4344 }
4345 
4346 
4347 bool select_create::send_eof()
4348 {
4349  /*
4350  The routine that writes the statement in the binary log
4351  is in select_insert::send_eof(). For that reason, we
4352  mark the flag at this point.
4353  */
4354  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
4355  thd->transaction.stmt.mark_created_temp_table();
4356 
4357  bool tmp=select_insert::send_eof();
4358  if (tmp)
4359  abort_result_set();
4360  else
4361  {
4362  /*
4363  Do an implicit commit at end of statement for non-temporary
4364  tables. This can fail, but we should unlock the table
4365  nevertheless.
4366  */
4367  if (!table->s->tmp_table)
4368  {
4369  trans_commit_stmt(thd);
4370  trans_commit_implicit(thd);
4371  }
4372 
4373  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4374  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4375  if (m_plock)
4376  {
4377  mysql_unlock_tables(thd, *m_plock);
4378  *m_plock= NULL;
4379  m_plock= NULL;
4380  }
4381  }
4382  return tmp;
4383 }
4384 
4385 
4386 void select_create::abort_result_set()
4387 {
4388  DBUG_ENTER("select_create::abort_result_set");
4389 
4390  /*
4391  In select_insert::abort_result_set() we roll back the statement, including
4392  truncating the transaction cache of the binary log. To do this, we
4393  pretend that the statement is transactional, even though it might
4394  be the case that it was not.
4395 
4396  We roll back the statement prior to deleting the table and prior
4397  to releasing the lock on the table, since there might be potential
4398  for failure if the rollback is executed after the drop or after
4399  unlocking the table.
4400 
4401  We also roll back the statement regardless of whether the creation
4402  of the table succeeded or not, since we need to reset the binary
4403  log state.
4404  */
4405  tmp_disable_binlog(thd);
4406  select_insert::abort_result_set();
4407  thd->transaction.stmt.reset_unsafe_rollback_flags();
4408  reenable_binlog(thd);
4409  /* possible error of writing binary log is ignored deliberately */
4410  (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
4411 
4412  if (m_plock)
4413  {
4414  mysql_unlock_tables(thd, *m_plock);
4415  *m_plock= NULL;
4416  m_plock= NULL;
4417  }
4418 
4419  if (table)
4420  {
4421  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4422  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4423  table->auto_increment_field_not_null= FALSE;
4424  drop_open_table(thd, table, create_table->db, create_table->table_name);
4425  table=0; // Safety
4426  }
4427  DBUG_VOID_RETURN;
4428 }