MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ha_ndbcluster_binlog.cc
1 /*
2  Copyright (c) 2006, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "ha_ndbcluster_glue.h"
19 
20 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
21 #include "ha_ndbcluster.h"
22 #include "ha_ndbcluster_connection.h"
23 #include "ndb_local_connection.h"
24 #include "ndb_thd.h"
25 #include "ndb_table_guard.h"
26 #include "ndb_global_schema_lock.h"
27 #include "ndb_global_schema_lock_guard.h"
28 
29 #include "rpl_injector.h"
30 #include "rpl_filter.h"
31 #if MYSQL_VERSION_ID > 50600
32 #include "rpl_slave.h"
33 #else
34 #include "slave.h"
35 #include "log_event.h"
36 #endif
37 #include "global_threads.h"
38 #include "ha_ndbcluster_binlog.h"
39 #include <ndbapi/NdbDictionary.hpp>
40 #include <ndbapi/ndb_cluster_connection.hpp>
41 
/*
  Options and state declared extern here, i.e. defined in another
  translation unit of the ndbcluster storage engine.
  opt_ndb_extra_logging is used below as the switch for extra
  informational log messages.
*/
extern my_bool opt_ndb_log_orig;
extern my_bool opt_ndb_log_bin;
extern my_bool opt_ndb_log_update_as_write;
extern my_bool opt_ndb_log_updated_only;
extern my_bool opt_ndb_log_binlog_index;
extern my_bool opt_ndb_log_apply_status;
extern ulong opt_ndb_extra_logging;
/* Slave state; reset through atResetSlave() on RESET SLAVE */
extern st_ndb_slave_state g_ndb_slave_state;

bool ndb_log_empty_epochs(void);
52 
53 /*
54  defines for cluster replication table names
55 */
56 #include "ha_ndbcluster_tables.h"
57 
58 #include "ndb_dist_priv_util.h"
59 
/*
  Timeout for syncing schema events between
  mysql servers, and between mysql server and the binlog
  NOTE(review): the unit is not visible here, presumably seconds - confirm.
*/
static const int DEFAULT_SYNC_TIMEOUT= 120;

/*
  Flag showing if the ndb injector thread is running, if so == 1
  -1 if it was started but later stopped for some reason
  0 if never started
*/
static int ndb_binlog_thread_running= 0;
/*
  Flag showing if the ndb binlog should be created, if so == TRUE
  FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
/*
  TRUE once the ndb_* system tables have been set up; while FALSE,
  ndb_binlog_is_read_only() reports read-only.
*/
static my_bool ndb_binlog_tables_inited= FALSE;
/*
  TRUE when the binlog thread is ready to write the binlog; FALSE while
  it is still initializing or after it has lost its connection.
*/
static my_bool ndb_binlog_is_ready= FALSE;
79 
80 bool
81 ndb_binlog_is_read_only(void)
82 {
83  if(!ndb_binlog_tables_inited)
84  {
85  /* the ndb_* system tables not setup yet */
86  return true;
87  }
88 
89  if (ndb_binlog_running && !ndb_binlog_is_ready)
90  {
91  /*
92  The binlog thread is supposed to write to binlog
93  but not ready (still initializing or has lost connection)
94  */
95  return true;
96  }
97  return false;
98 }
99 
/*
  Global reference to the ndb injector thread THD object

  Has one sole purpose, for setting the in_use table member variable
  in get_share(...)
*/
extern THD * injector_thd; // Declared in ha_ndbcluster.cc

/*
  Global reference to the Ndb object used by the ndb injector thread.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason; to setup the events operations for a table
  to enable ndb injector thread receiving events.

  Must therefore always be used with a surrounding
  pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;

/*
  Non-zero while the binlog part is initialized. ndbcluster_binlog_end()
  clears it, waits for the injector thread and destroys injector_mutex,
  injector_cond and ndb_schema_share_mutex only when it is set.
*/
static int ndbcluster_binlog_inited= 0;
/*
  Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
  Server main loop should call handlerton function:

  ndbcluster_hton->binlog_func ==
  ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
  ndbcluster_binlog_end

  at shutdown, which sets the flag. And then server needs to wait for it
  to complete. Otherwise binlog will not be complete.

  ndbcluster_hton->panic == ndbcluster_end() will not return until
  ndb binlog is completed
*/
static int ndbcluster_binlog_terminating= 0;

/*
  Mutex and condition used for interacting between client sql thread
  and injector thread
*/
pthread_t ndb_binlog_thread;
pthread_mutex_t injector_mutex;
pthread_cond_t injector_cond;

/*
  NDB Injector thread (used for binlog creation) epoch progress.
  ndbcluster_binlog_wait() reads ndb_latest_handled_binlog_epoch while
  holding injector_mutex.
*/
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;

/*
  Shares of mysql.ndb_apply_status and mysql.ndb_schema; while they are
  0, ndbcluster_binlog_init_share() forces event operations for those
  tables.
*/
NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
/* NOTE(review): presumably protects ndb_schema_share - confirm */
pthread_mutex_t ndb_schema_share_mutex;

extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;

/* Schema object distribution handling */
HASH ndb_schema_objects;
typedef struct st_ndb_schema_object {
  pthread_mutex_t mutex;
  char *key;
  uint key_length;
  uint use_count;
  MY_BITMAP slock_bitmap;
  /*
    256 bits for lock status of table; same size as the BINARY(32)
    "slock" column of mysql.ndb_schema. NOTE(review): presumably one
    bit per node id - confirm.
  */
  uint32 slock[256/32];
  uint32 table_id;
  uint32 table_version;
} NDB_SCHEMA_OBJECT;
static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                my_bool create_if_not_exists,
                                                my_bool have_lock);
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                   bool have_lock);
175 
176 #ifndef DBUG_OFF
177 /* purecov: begin deadcode */
178 static void print_records(TABLE *table, const uchar *record)
179 {
180  for (uint j= 0; j < table->s->fields; j++)
181  {
182  char buf[40];
183  int pos= 0;
184  Field *field= table->field[j];
185  const uchar* field_ptr= field->ptr - table->record[0] + record;
186  int pack_len= field->pack_length();
187  int n= pack_len < 10 ? pack_len : 10;
188 
189  for (int i= 0; i < n && pos < 20; i++)
190  {
191  pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
192  }
193  buf[pos]= 0;
194  DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
195  }
196 }
197 /* purecov: end */
198 #else
199 #define print_records(a,b)
200 #endif
201 
202 
#ifndef DBUG_OFF
/*
  Debug helper: DBUG-print a summary of 'table' (db and table name,
  field count, record lengths, record[0]/record[1] addresses) followed
  by one line per field with its name, flags, real type, pack length
  and the offsets of its data and null bit inside record[0]. BIT fields
  get an additional line describing their bit storage.

  'info' is a caller supplied label printed first; a NULL 'table' is
  reported as "(null)". Compiles to nothing when DBUG_OFF is defined.
*/
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d "
              "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
              "record[1]: 0x%lx",
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
              (long) table->record[0],
              (long) table->record[1]));

  for (unsigned int i= 0; i < table->s->fields; i++)
  {
    Field *f= table->field[i];
    /* Flags are decoded into a short comma separated attribute list */
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
                "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
                (long) f->flags,
                (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
                (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG) ? ",blob" : "",
                (f->flags & BINARY_FLAG) ? ",binary" : "",
                f->real_type(),
                f->pack_length(),
                (long) f->ptr, (int) (f->ptr - table->record[0]),
                f->null_bit,
                (long) f->null_ptr,
                (int) ((uchar*) f->null_ptr - table->record[0])));
    if (f->type() == MYSQL_TYPE_BIT)
    {
      /* BIT fields may keep some bits outside the field data (bit_ptr) */
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
                                   "bit_ofs: %d bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
                                   (int) ((uchar*) g->bit_ptr -
                                          table->record[0]),
                                   g->bit_ofs, g->bit_len));
    }
  }
}
#else
#define dbug_print_table(a,b)
#endif
260 
261 
262 static inline void
263 print_warning_list(const char* prefix, THD* thd)
264 {
266  it(thd->get_stmt_da()->sql_conditions());
267 
268  const Sql_condition *err;
269  while ((err= it++))
270  {
271  sql_print_warning("%s: (%d)%s",
272  prefix,
273  err->get_sql_errno(),
274  err->get_message_text());
275  }
276 }
277 
278 
279 static void run_query(THD *thd, char *buf, char *end,
280  const int *no_print_error)
281 {
282  /*
283  NOTE! Don't use this function for new implementation, backward
284  compat. only
285  */
286 
287  Ndb_local_connection mysqld(thd);
288 
289  /*
290  Run the query, suppress some errors from being printed
291  to log and ignore any error returned
292  */
293  (void)mysqld.raw_run_query(buf, (end - buf),
294  no_print_error);
295 }
296 
297 static void
298 ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
299 {
300  DBUG_ENTER("ndbcluster_binlog_close_table");
301  Ndb_event_data *event_data= share->event_data;
302  if (event_data)
303  {
304  delete event_data;
305  share->event_data= 0;
306  }
307  DBUG_VOID_RETURN;
308 }
309 
310 
311 /*
312  Open a shadow table for the table given in share.
313  - The shadow table is (mainly) used when an event is
314  recieved from the data nodes which need to be written
315  to the binlog injector.
316 */
317 
318 static int
319 ndb_binlog_open_shadow_table(THD *thd, NDB_SHARE *share)
320 {
321  int error;
322  DBUG_ASSERT(share->event_data == 0);
323  Ndb_event_data *event_data= share->event_data= new Ndb_event_data(share);
324  DBUG_ENTER("ndb_binlog_open_shadow_table");
325 
326  MEM_ROOT **root_ptr=
327  my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
328  MEM_ROOT *old_root= *root_ptr;
329  init_sql_alloc(&event_data->mem_root, 1024, 0);
330  *root_ptr= &event_data->mem_root;
331 
332  TABLE_SHARE *shadow_table_share=
333  (TABLE_SHARE*)alloc_root(&event_data->mem_root, sizeof(TABLE_SHARE));
334  TABLE *shadow_table=
335  (TABLE*)alloc_root(&event_data->mem_root, sizeof(TABLE));
336 
337  init_tmp_table_share(thd, shadow_table_share,
338  share->db, 0,
339  share->table_name,
340  share->key);
341  if ((error= open_table_def(thd, shadow_table_share, 0)) ||
342  (error= open_table_from_share(thd, shadow_table_share, "", 0,
343  (uint) (OPEN_FRM_FILE_ONLY | DELAYED_OPEN | READ_ALL),
344  0, shadow_table,
345 #ifdef NDB_WITHOUT_ONLINE_ALTER
346  false
347 #else
348  OTM_OPEN
349 #endif
350  )))
351  {
352  DBUG_PRINT("error", ("failed to open shadow table, error: %d my_errno: %d",
353  error, my_errno));
354  free_table_share(shadow_table_share);
355  delete event_data;
356  share->event_data= 0;
357  *root_ptr= old_root;
358  DBUG_RETURN(error);
359  }
360  event_data->shadow_table= shadow_table;
361 
363  assign_new_table_id(shadow_table_share);
365 
366  shadow_table->in_use= injector_thd;
367 
368  shadow_table->s->db.str= share->db;
369  shadow_table->s->db.length= strlen(share->db);
370  shadow_table->s->table_name.str= share->table_name;
371  shadow_table->s->table_name.length= strlen(share->table_name);
372  /* We can't use 'use_all_columns()' as the file object is not setup yet */
373  shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
374  &shadow_table->s->all_set);
375 #ifndef DBUG_OFF
376  dbug_print_table("table", shadow_table);
377 #endif
378  *root_ptr= old_root;
379  DBUG_RETURN(0);
380 }
381 
382 
383 /*
384  Initialize the binlog part of the NDB_SHARE
385 */
386 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
387 {
388  MEM_ROOT *mem_root= &share->mem_root;
389  int do_event_op= ndb_binlog_running;
390  int error= 0;
391  DBUG_ENTER("ndbcluster_binlog_init_share");
392 
393  share->connect_count= g_ndb_cluster_connection->get_connect_count();
394 #ifdef HAVE_NDB_BINLOG
395  share->m_cfn_share= NULL;
396 #endif
397 
398  share->op= 0;
399  share->new_op= 0;
400  share->event_data= 0;
401 
402  if (!ndb_schema_share &&
403  strcmp(share->db, NDB_REP_DB) == 0 &&
404  strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
405  do_event_op= 1;
406  else if (!ndb_apply_status_share &&
407  strcmp(share->db, NDB_REP_DB) == 0 &&
408  strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
409  do_event_op= 1;
410 
411  if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
412  share->table_name))
413  {
414  do_event_op= 0;
415  }
416 
417  {
418  int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
419  share->subscriber_bitmap= (MY_BITMAP*)
420  alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
421  for (i= 0; i < no_nodes; i++)
422  {
423  bitmap_init(&share->subscriber_bitmap[i],
424  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
425  max_ndb_nodes, FALSE);
426  bitmap_clear_all(&share->subscriber_bitmap[i]);
427  }
428  }
429 
430  if (!do_event_op)
431  {
432  if (_table)
433  {
434  if (_table->s->primary_key == MAX_KEY)
435  share->flags|= NSF_HIDDEN_PK;
436  if (_table->s->blob_fields != 0)
437  share->flags|= NSF_BLOB_FLAG;
438  }
439  else
440  {
441  share->flags|= NSF_NO_BINLOG;
442  }
443  DBUG_RETURN(error);
444  }
445  while (1)
446  {
447  if ((error= ndb_binlog_open_shadow_table(thd, share)))
448  break;
449  if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
450  share->flags|= NSF_HIDDEN_PK;
451  if (share->event_data->shadow_table->s->blob_fields != 0)
452  share->flags|= NSF_BLOB_FLAG;
453  break;
454  }
455  DBUG_RETURN(error);
456 }
457 
458 /*****************************************************************
459  functions called from master sql client threads
460 ****************************************************************/
461 
462 /*
463  called in mysql_show_binlog_events and reset_logs to make sure we wait for
464  all events originating from this mysql server to arrive in the binlog
465 
466  Wait for the last epoch in which the last transaction is a part of.
467 
468  Wait a maximum of 30 seconds.
469 */
470 static void ndbcluster_binlog_wait(THD *thd)
471 {
472  if (ndb_binlog_running)
473  {
474  DBUG_ENTER("ndbcluster_binlog_wait");
475  ulonglong wait_epoch= ndb_get_latest_trans_gci();
476  /*
477  cluster not connected or no transactions done
478  so nothing to wait for
479  */
480  if (!wait_epoch)
481  DBUG_VOID_RETURN;
482  const char *save_info= thd ? thd->proc_info : 0;
483  int count= 30;
484  if (thd)
485  thd->proc_info= "Waiting for ndbcluster binlog update to "
486  "reach current position";
487  pthread_mutex_lock(&injector_mutex);
488  while (!(thd && thd->killed) && count && ndb_binlog_running &&
489  (ndb_latest_handled_binlog_epoch == 0 ||
490  ndb_latest_handled_binlog_epoch < wait_epoch))
491  {
492  count--;
493  struct timespec abstime;
494  set_timespec(abstime, 1);
495  pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
496  }
497  pthread_mutex_unlock(&injector_mutex);
498  if (thd)
499  thd->proc_info= save_info;
500  DBUG_VOID_RETURN;
501  }
502 }
503 
504 /*
505  Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
506 */
507 static int ndbcluster_reset_logs(THD *thd)
508 {
509  if (!ndb_binlog_running)
510  return 0;
511 
512  /* only reset master should reset logs */
513  if (!((thd->lex->sql_command == SQLCOM_RESET) &&
514  (thd->lex->type & REFRESH_MASTER)))
515  return 0;
516 
517  DBUG_ENTER("ndbcluster_reset_logs");
518 
519  /*
520  Wait for all events originating from this mysql server has
521  reached the binlog before continuing to reset
522  */
523  ndbcluster_binlog_wait(thd);
524 
525  /*
526  Truncate mysql.ndb_binlog_index table, if table does not
527  exist ignore the error as it is a "consistent" behavior
528  */
529  Ndb_local_connection mysqld(thd);
530  const bool ignore_no_such_table = true;
531  if(mysqld.truncate_table(STRING_WITH_LEN("mysql"),
532  STRING_WITH_LEN("ndb_binlog_index"),
533  ignore_no_such_table))
534  {
535  // Failed to truncate table
536  DBUG_RETURN(1);
537  }
538  DBUG_RETURN(0);
539 }
540 
/*
  Setup THD object
  'Inspired' from ha_ndbcluster.cc : ndb_util_thread_func

  Creates a daemon THD (command COM_DAEMON, system thread
  SYSTEM_THREAD_NDBCLUSTER_BINLOG, grants skipped, utf8 as client,
  results and connection character set).

  @param stackptr  address used as the start of the new THD's stack
  @return the new THD, or 0 if it could not be created or its globals
          could not be stored
*/
static THD *
setup_thd(char * stackptr)
{
  DBUG_ENTER("setup_thd");
  THD * thd= new THD; /* note that constructor of THD uses DBUG_ */
  /*
    NOTE(review): a plain 'new' normally throws instead of returning 0,
    unless THD/MySQL overrides operator new - confirm this check is needed.
  */
  if (thd == 0)
  {
    DBUG_RETURN(0);
  }
  THD_CHECK_SENTRY(thd);

  thd->thread_id= 0;
  thd->thread_stack= stackptr; /* remember where our stack is */
  if (thd->store_globals())
  {
    thd->cleanup();
    delete thd;
    DBUG_RETURN(0);
  }

  lex_start(thd);

  thd->init_for_queries();
  thd_set_command(thd, COM_DAEMON);
  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
#ifndef NDB_THD_HAS_NO_VERSION
  thd->version= refresh_version;
#endif
  thd->client_capabilities= 0;
  thd->lex->start_transaction_opt= 0;
  thd->security_ctx->skip_grants();

  /*
    NOTE(review): the result of get_charset_by_csname() is not checked
    for NULL before being installed below.
  */
  CHARSET_INFO *charset_connection= get_charset_by_csname("utf8",
                                                          MY_CS_PRIMARY,
                                                          MYF(MY_WME));
  thd->variables.character_set_client= charset_connection;
  thd->variables.character_set_results= charset_connection;
  thd->variables.collation_connection= charset_connection;
  thd->update_charset();
  DBUG_RETURN(thd);
}
586 
587 /*
588  Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
589  is removed
590 */
591 
592 static int
593 ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
594 {
595  int error = 0;
596  THD * save_thd= thd;
597  DBUG_ENTER("ndbcluster_binlog_index_purge_file");
598  DBUG_PRINT("enter", ("file: %s", file));
599 
600  if (!ndb_binlog_running || (thd && thd->slave_thread))
601  DBUG_RETURN(0);
602 
607  if (thd == 0)
608  {
609  if ((thd = setup_thd((char*)&save_thd)) == 0)
610  {
616  sql_print_warning("NDB: Unable to purge "
617  NDB_REP_DB "." NDB_REP_TABLE
618  " File=%s (failed to setup thd)", file);
619  DBUG_RETURN(0);
620  }
621  }
622 
623  /*
624  delete rows from mysql.ndb_binlog_index table for the given
625  filename, if table does not exist ignore the error as it
626  is a "consistent" behavior
627  */
628  Ndb_local_connection mysqld(thd);
629  const bool ignore_no_such_table = true;
630  if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
631  STRING_WITH_LEN("ndb_binlog_index"),
632  ignore_no_such_table,
633  "File='", file, "'", NULL))
634  {
635  // Failed to delete rows from table
636  error = 1;
637  }
638 
639  if (save_thd == 0)
640  {
641  thd->cleanup();
642  delete thd;
643  }
644 
645  DBUG_RETURN(error);
646 }
647 
648 
649 #ifndef NDB_WITHOUT_DIST_PRIV
650 // Determine if privilege tables are distributed, ie. stored in NDB
651 static bool
652 priv_tables_are_in_ndb(THD *thd)
653 {
654  bool distributed= false;
655  Ndb_dist_priv_util dist_priv;
656  DBUG_ENTER("ndbcluster_distributed_privileges");
657 
658  Ndb *ndb= check_ndb_in_thd(thd);
659  if (!ndb)
660  DBUG_RETURN(false); // MAGNUS, error message?
661 
662  if (ndb->setDatabaseName(dist_priv.database()) != 0)
663  DBUG_RETURN(false);
664 
665  const char* table_name;
666  while((table_name= dist_priv.iter_next_table()))
667  {
668  DBUG_PRINT("info", ("table_name: %s", table_name));
669  Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
670  const NDBTAB *ndbtab= ndbtab_g.get_table();
671  if (ndbtab)
672  {
673  distributed= true;
674  }
675  else if (distributed)
676  {
677  sql_print_error("NDB: Inconsistency detected in distributed "
678  "privilege tables. Table '%s.%s' is not distributed",
679  dist_priv.database(), table_name);
680  DBUG_RETURN(false);
681  }
682  }
683  DBUG_RETURN(distributed);
684 }
685 #endif
686 
687 static void
688 ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
689  const char *query, uint query_length,
690  const char *db, const char *table_name)
691 {
692  DBUG_ENTER("ndbcluster_binlog_log_query");
693  DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
694  db, table_name, query));
695  enum SCHEMA_OP_TYPE type;
696  int log= 0;
697  uint32 table_id= 0, table_version= 0;
698  /*
699  Since databases aren't real ndb schema object
700  they don't have any id/version
701 
702  But since that id/version is used to make sure that event's on SCHEMA_TABLE
703  is correct, we set random numbers
704  */
705  table_id = (uint32)rand();
706  table_version = (uint32)rand();
707  switch (binlog_command)
708  {
709  case LOGCOM_CREATE_TABLE:
710  type= SOT_CREATE_TABLE;
711  DBUG_ASSERT(FALSE);
712  break;
713  case LOGCOM_ALTER_TABLE:
714  type= SOT_ALTER_TABLE_COMMIT;
715  //log= 1;
716  break;
717  case LOGCOM_RENAME_TABLE:
718  type= SOT_RENAME_TABLE;
719  DBUG_ASSERT(FALSE);
720  break;
721  case LOGCOM_DROP_TABLE:
722  type= SOT_DROP_TABLE;
723  DBUG_ASSERT(FALSE);
724  break;
725  case LOGCOM_CREATE_DB:
726  type= SOT_CREATE_DB;
727  log= 1;
728  break;
729  case LOGCOM_ALTER_DB:
730  type= SOT_ALTER_DB;
731  log= 1;
732  break;
733  case LOGCOM_DROP_DB:
734  type= SOT_DROP_DB;
735  DBUG_ASSERT(FALSE);
736  break;
737 #ifndef NDB_WITHOUT_DIST_PRIV
738  case LOGCOM_CREATE_USER:
739  type= SOT_CREATE_USER;
740  if (priv_tables_are_in_ndb(thd))
741  {
742  DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
743  log= 1;
744  }
745  break;
746  case LOGCOM_DROP_USER:
747  type= SOT_DROP_USER;
748  if (priv_tables_are_in_ndb(thd))
749  {
750  DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
751  log= 1;
752  }
753  break;
754  case LOGCOM_RENAME_USER:
755  type= SOT_RENAME_USER;
756  if (priv_tables_are_in_ndb(thd))
757  {
758  DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
759  log= 1;
760  }
761  break;
762  case LOGCOM_GRANT:
763  type= SOT_GRANT;
764  if (priv_tables_are_in_ndb(thd))
765  {
766  DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
767  log= 1;
768  }
769  break;
770  case LOGCOM_REVOKE:
771  type= SOT_REVOKE;
772  if (priv_tables_are_in_ndb(thd))
773  {
774  DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
775  log= 1;
776  }
777  break;
778 #endif
779  }
780  if (log)
781  {
782  ndbcluster_log_schema_op(thd, query, query_length,
783  db, table_name, table_id, table_version, type,
784  NULL, NULL);
785  }
786  DBUG_VOID_RETURN;
787 }
788 
789 
/*
  End use of the NDB Cluster binlog
  - wait for binlog thread to shutdown

  Before that, the Cluster Utility thread and the Cluster Index Stats
  thread are stopped if they are running. When the binlog part is
  initialized, injector_mutex, injector_cond and ndb_schema_share_mutex
  are destroyed at the end. Always returns 0.
*/

int ndbcluster_binlog_end(THD *thd)
{
  DBUG_ENTER("ndbcluster_binlog_end");

  if (ndb_util_thread_running > 0)
  {
    /*
      Wait for util thread to die (as this uses the injector mutex)
      There is a very small chance that ndb_util_thread dies and the
      following mutex is freed before it's accessed. This shouldn't
      however be a likely case as the ndbcluster_binlog_end is supposed to
      be called before ndb_cluster_end().
    */
    sql_print_information("Stopping Cluster Utility thread");
    pthread_mutex_lock(&LOCK_ndb_util_thread);
    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
    ndb_util_thread_running++;
    ndbcluster_terminating= 1;
    pthread_cond_signal(&COND_ndb_util_thread);
    /*
      Wait until only our own increment above remains. NOTE(review):
      presumably the util thread decrements the counter and signals
      COND_ndb_util_ready when it exits - confirm.
    */
    while (ndb_util_thread_running > 1)
      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
    ndb_util_thread_running--;
    pthread_mutex_unlock(&LOCK_ndb_util_thread);
  }

  if (ndb_index_stat_thread_running > 0)
  {
    /*
      Index stats thread blindly imitates util thread. Following actually
      fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
    */
    sql_print_information("Stopping Cluster Index Stats thread");
    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
    ndb_index_stat_thread_running++;
    ndbcluster_terminating= 1;
    pthread_cond_signal(&COND_ndb_index_stat_thread);
    /* Same handshake as for the util thread above */
    while (ndb_index_stat_thread_running > 1)
      pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
    ndb_index_stat_thread_running--;
    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
  }

  if (ndbcluster_binlog_inited)
  {
    ndbcluster_binlog_inited= 0;
    /*
      Also true for -1 ("started but later stopped"); the wait loop below
      then exits immediately since it only waits while the value is > 0.
    */
    if (ndb_binlog_thread_running)
    {
      /* wait for injector thread to finish */
      ndbcluster_binlog_terminating= 1;
      pthread_mutex_lock(&injector_mutex);
      pthread_cond_signal(&injector_cond);
      while (ndb_binlog_thread_running > 0)
        pthread_cond_wait(&injector_cond, &injector_mutex);
      pthread_mutex_unlock(&injector_mutex);
    }
    pthread_mutex_destroy(&injector_mutex);
    pthread_cond_destroy(&injector_cond);
    pthread_mutex_destroy(&ndb_schema_share_mutex);
  }

  DBUG_RETURN(0);
}
858 
859 /*****************************************************************
860  functions called from slave sql client threads
861 ****************************************************************/
862 static void ndbcluster_reset_slave(THD *thd)
863 {
864  int error = 0;
865  if (!ndb_binlog_running)
866  return;
867 
868  DBUG_ENTER("ndbcluster_reset_slave");
869 
870  /*
871  delete all rows from mysql.ndb_apply_status table
872  - if table does not exist ignore the error as it
873  is a consistent behavior
874  */
875  Ndb_local_connection mysqld(thd);
876  const bool ignore_no_such_table = true;
877  if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
878  STRING_WITH_LEN("ndb_apply_status"),
879  ignore_no_such_table,
880  NULL))
881  {
882  // Failed to delete rows from table
883  error = 1;
884  }
885 
886  g_ndb_slave_state.atResetSlave();
887 
888  // pending fix for bug#59844 will make this function return int
889  DBUG_VOID_RETURN;
890 }
891 
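/*
  Binlog related handlerton callbacks. They are installed in the ndb
  handlerton by ndbcluster_binlog_init_handlerton(), which initializes
  the binlog part of the ndb handlerton.
*/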
895 
901 static bool ndbcluster_flush_logs(handlerton *hton)
902 {
903  ndbcluster_binlog_wait(current_thd);
904  return FALSE;
905 }
906 
907 
908 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
909  enum_binlog_func fn,
910  void *arg)
911 {
912  DBUG_ENTER("ndbcluster_binlog_func");
913  int res= 0;
914  switch(fn)
915  {
916  case BFN_RESET_LOGS:
917  res= ndbcluster_reset_logs(thd);
918  break;
919  case BFN_RESET_SLAVE:
920  ndbcluster_reset_slave(thd);
921  break;
922  case BFN_BINLOG_WAIT:
923  ndbcluster_binlog_wait(thd);
924  break;
925  case BFN_BINLOG_END:
926  res= ndbcluster_binlog_end(thd);
927  break;
928  case BFN_BINLOG_PURGE_FILE:
929  res= ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
930  break;
931  }
932  DBUG_RETURN(res);
933 }
934 
935 void ndbcluster_binlog_init_handlerton()
936 {
937  handlerton *h= ndbcluster_hton;
938  h->flush_logs= ndbcluster_flush_logs;
939  h->binlog_func= ndbcluster_binlog_func;
940  h->binlog_log_query= ndbcluster_binlog_log_query;
941 }
942 
943 
944 /*
945  Convert db and table name into a key to use for searching
946  the ndbcluster_open_tables hash
947 */
948 static size_t
949 ndb_open_tables__create_key(char* key_buf, size_t key_buf_length,
950  const char* db, size_t db_length,
951  const char* table, size_t table_length)
952 {
953  size_t key_length = my_snprintf(key_buf, key_buf_length,
954  "./%*s/%*s", db_length, db,
955  table_length, table) - 1;
956  assert(key_length > 0);
957  assert(key_length < key_buf_length);
958 
959  return key_length;
960 }
961 
962 
963 /*
964  Check if table with given name is open, ie. is
965  in ndbcluster_open_tables hash
966 */
967 static bool
968 ndb_open_tables__is_table_open(const char* db, size_t db_length,
969  const char* table, size_t table_length)
970 {
971  char key[FN_REFLEN + 1];
972  size_t key_length = ndb_open_tables__create_key(key, sizeof(key),
973  db, db_length,
974  table, table_length);
975  DBUG_ENTER("ndb_open_tables__is_table_open");
976  DBUG_PRINT("enter", ("db: '%s', table: '%s', key: '%s'",
977  db, table, key));
978 
979  pthread_mutex_lock(&ndbcluster_mutex);
980  bool result = my_hash_search(&ndbcluster_open_tables,
981  (const uchar*)key,
982  key_length) != NULL;
983  pthread_mutex_unlock(&ndbcluster_mutex);
984 
985  DBUG_PRINT("exit", ("result: %d", result));
986  DBUG_RETURN(result);
987 }
988 
989 
990 static bool
991 ndbcluster_check_ndb_schema_share()
992 {
993  return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
994  STRING_WITH_LEN("ndb_schema"));
995 }
996 
997 
998 static bool
999 ndbcluster_check_ndb_apply_status_share()
1000 {
1001  return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
1002  STRING_WITH_LEN("ndb_apply_status"));
1003 }
1004 
1005 
1006 static bool
1007 create_cluster_sys_table(THD *thd, const char* db, size_t db_length,
1008  const char* table, size_t table_length,
1009  const char* create_definitions,
1010  const char* create_options)
1011 {
1012  if (ndb_open_tables__is_table_open(db, db_length, table, table_length))
1013  return false;
1014 
1015  if (g_ndb_cluster_connection->get_no_ready() <= 0)
1016  return false;
1017 
1018  if (opt_ndb_extra_logging)
1019  sql_print_information("NDB: Creating %s.%s", db, table);
1020 
1021  Ndb_local_connection mysqld(thd);
1022 
1023  /*
1024  Check if table exists in MySQL "dictionary"(i.e on disk)
1025  if so, remove it since there is none in Ndb
1026  */
1027  {
1028  char path[FN_REFLEN + 1];
1029  build_table_filename(path, sizeof(path) - 1,
1030  db, table, reg_ext, 0);
1031  if (my_delete(path, MYF(0)) == 0)
1032  {
1033  /*
1034  The .frm file existed and was deleted from disk.
1035  It's possible that someone has tried to use it and thus
1036  it might have been inserted in the table definition cache.
1037  It must be flushed to avoid that it exist only in the
1038  table definition cache.
1039  */
1040  if (opt_ndb_extra_logging)
1041  sql_print_information("NDB: Flushing %s.%s", db, table);
1042 
1043  /* Flush mysql.ndb_apply_status table, ignore all errors */
1044  (void)mysqld.flush_table(db, db_length,
1045  table, table_length);
1046  }
1047  }
1048 
1049  const bool create_if_not_exists = true;
1050  const bool res = mysqld.create_sys_table(db, db_length,
1051  table, table_length,
1052  create_if_not_exists,
1053  create_definitions,
1054  create_options);
1055  return res;
1056 }
1057 
1058 
1059 static bool
1060 ndb_apply_table__create(THD *thd)
1061 {
1062  DBUG_ENTER("ndb_apply_table__create");
1063 
1064  /* NOTE! Updating this table schema must be reflected in ndb_restore */
1065  const bool res =
1066  create_cluster_sys_table(thd,
1067  STRING_WITH_LEN("mysql"),
1068  STRING_WITH_LEN("ndb_apply_status"),
1069  // table_definition
1070  "server_id INT UNSIGNED NOT NULL,"
1071  "epoch BIGINT UNSIGNED NOT NULL, "
1072  "log_name VARCHAR(255) BINARY NOT NULL, "
1073  "start_pos BIGINT UNSIGNED NOT NULL, "
1074  "end_pos BIGINT UNSIGNED NOT NULL, "
1075  "PRIMARY KEY USING HASH (server_id)",
1076  // table_options
1077  "ENGINE=NDB CHARACTER SET latin1");
1078  DBUG_RETURN(res);
1079 }
1080 
1081 
1082 static bool
1083 ndb_schema_table__create(THD *thd)
1084 {
1085  DBUG_ENTER("ndb_schema_table__create");
1086 
1087  /* NOTE! Updating this table schema must be reflected in ndb_restore */
1088  const bool res =
1089  create_cluster_sys_table(thd,
1090  STRING_WITH_LEN("mysql"),
1091  STRING_WITH_LEN("ndb_schema"),
1092  // table_definition
1093  "db VARBINARY(63) NOT NULL,"
1094  "name VARBINARY(63) NOT NULL,"
1095  "slock BINARY(32) NOT NULL,"
1096  "query BLOB NOT NULL,"
1097  "node_id INT UNSIGNED NOT NULL,"
1098  "epoch BIGINT UNSIGNED NOT NULL,"
1099  "id INT UNSIGNED NOT NULL,"
1100  "version INT UNSIGNED NOT NULL,"
1101  "type INT UNSIGNED NOT NULL,"
1102  "PRIMARY KEY USING HASH (db,name)",
1103  // table_options
1104  "ENGINE=NDB CHARACTER SET latin1");
1105  DBUG_RETURN(res);
1106 }
1107 
1108 class Thd_ndb_options_guard
1109 {
1110 public:
1111  Thd_ndb_options_guard(Thd_ndb *thd_ndb)
1112  : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
1113  ~Thd_ndb_options_guard() { m_val= m_save_val; }
1114  void set(uint32 flag) { m_val|= flag; }
1115 private:
1116  uint32 &m_val;
1117  uint32 m_save_val;
1118 };
1119 
1120 extern int ndb_setup_complete;
1121 extern pthread_cond_t COND_ndb_setup_complete;
1122 
1123 /*
1124  ndb_notify_tables_writable
1125 
1126  Called to notify any waiting threads that Ndb tables are
1127  now writable
1128 */
1129 static void ndb_notify_tables_writable()
1130 {
1131  pthread_mutex_lock(&ndbcluster_mutex);
1132  ndb_setup_complete= 1;
1133  pthread_cond_broadcast(&COND_ndb_setup_complete);
1134  pthread_mutex_unlock(&ndbcluster_mutex);
1135 }
1136 
1137 /*
1138  Ndb has no representation of the database schema objects.
1139  The mysql.ndb_schema table contains the latest schema operations
1140  done via a mysqld, and thus reflects databases created/dropped/altered
1141  while a mysqld was disconnected. This function tries to recover
1142  the correct state w.r.t created databases using the information in
1143  that table.
1144 
1145 
1146 */
1147 static int ndbcluster_find_all_databases(THD *thd)
1148 {
1149  Ndb *ndb= check_ndb_in_thd(thd);
1150  Thd_ndb *thd_ndb= get_thd_ndb(thd);
1151  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
1152  NDBDICT *dict= ndb->getDictionary();
1153  NdbTransaction *trans= NULL;
1154  NdbError ndb_error;
1155  int retries= 100;
1156  int retry_sleep= 30; /* 30 milliseconds, transaction */
1157  DBUG_ENTER("ndbcluster_find_all_databases");
1158 
1159  /*
1160  Function should only be called while ndbcluster_global_schema_lock
1161  is held, to ensure that ndb_schema table is not being updated while
1162  scanning.
1163  */
1164  if (!thd_ndb->has_required_global_schema_lock("ndbcluster_find_all_databases"))
1165  DBUG_RETURN(1);
1166 
1167  ndb->setDatabaseName(NDB_REP_DB);
1168  thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
1169  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
1170  while (1)
1171  {
1172  char db_buffer[FN_REFLEN];
1173  char *db= db_buffer+1;
1174  char name[FN_REFLEN];
1175  char query[64000];
1176  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1177  const NDBTAB *ndbtab= ndbtab_g.get_table();
1178  NdbScanOperation *op;
1179  NdbBlob *query_blob_handle;
1180  int r= 0;
1181  if (ndbtab == NULL)
1182  {
1183  ndb_error= dict->getNdbError();
1184  goto error;
1185  }
1186  trans= ndb->startTransaction();
1187  if (trans == NULL)
1188  {
1189  ndb_error= ndb->getNdbError();
1190  goto error;
1191  }
1192  op= trans->getNdbScanOperation(ndbtab);
1193  if (op == NULL)
1194  {
1195  ndb_error= trans->getNdbError();
1196  goto error;
1197  }
1198 
1200  NdbScanOperation::SF_TupScan, 1);
1201 
1202  r|= op->getValue("db", db_buffer) == NULL;
1203  r|= op->getValue("name", name) == NULL;
1204  r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
1205  r|= query_blob_handle->getValue(query, sizeof(query));
1206 
1207  if (r)
1208  {
1209  ndb_error= op->getNdbError();
1210  goto error;
1211  }
1212 
1213  if (trans->execute(NdbTransaction::NoCommit))
1214  {
1215  ndb_error= trans->getNdbError();
1216  goto error;
1217  }
1218 
1219  while ((r= op->nextResult()) == 0)
1220  {
1221  unsigned db_len= db_buffer[0];
1222  unsigned name_len= name[0];
1223  /*
1224  name_len == 0 means no table name, hence the row
1225  is for a database
1226  */
1227  if (db_len > 0 && name_len == 0)
1228  {
1229  /* database found */
1230  db[db_len]= 0;
1231 
1232  /* find query */
1233  Uint64 query_length= 0;
1234  if (query_blob_handle->getLength(query_length))
1235  {
1236  ndb_error= query_blob_handle->getNdbError();
1237  goto error;
1238  }
1239  query[query_length]= 0;
1240  build_table_filename(name, sizeof(name), db, "", "", 0);
1241  int database_exists= !my_access(name, F_OK);
1242  if (strncasecmp("CREATE", query, 6) == 0)
1243  {
1244  /* Database should exist */
1245  if (!database_exists)
1246  {
1247  /* create missing database */
1248  sql_print_information("NDB: Discovered missing database '%s'", db);
1249  const int no_print_error[1]= {0};
1250  run_query(thd, query, query + query_length,
1251  no_print_error);
1252  }
1253  }
1254  else if (strncasecmp("ALTER", query, 5) == 0)
1255  {
1256  /* Database should exist */
1257  if (!database_exists)
1258  {
1259  /* create missing database */
1260  sql_print_information("NDB: Discovered missing database '%s'", db);
1261  const int no_print_error[1]= {0};
1262  name_len= my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
1263  run_query(thd, name, name + name_len,
1264  no_print_error);
1265  run_query(thd, query, query + query_length,
1266  no_print_error);
1267  }
1268  }
1269  else if (strncasecmp("DROP", query, 4) == 0)
1270  {
1271  /* Database should not exist */
1272  if (database_exists)
1273  {
1274  /* drop missing database */
1275  sql_print_information("NDB: Discovered reamining database '%s'", db);
1276  }
1277  }
1278  }
1279  }
1280  if (r == -1)
1281  {
1282  ndb_error= op->getNdbError();
1283  goto error;
1284  }
1285  ndb->closeTransaction(trans);
1286  trans= NULL;
1287  DBUG_RETURN(0); // success
1288  error:
1289  if (trans)
1290  {
1291  ndb->closeTransaction(trans);
1292  trans= NULL;
1293  }
1294  if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
1295  {
1296  if (retries--)
1297  {
1298  sql_print_warning("NDB: ndbcluster_find_all_databases retry: %u - %s",
1299  ndb_error.code,
1300  ndb_error.message);
1301  do_retry_sleep(retry_sleep);
1302  continue; // retry
1303  }
1304  }
1305  if (!thd->killed)
1306  {
1307  sql_print_error("NDB: ndbcluster_find_all_databases fail: %u - %s",
1308  ndb_error.code,
1309  ndb_error.message);
1310  }
1311 
1312  DBUG_RETURN(1); // not temp error or too many retries
1313  }
1314 }
1315 
1316 bool
1317 ndb_binlog_setup(THD *thd)
1318 {
1319  if (ndb_binlog_tables_inited)
1320  return true; // Already setup -> OK
1321 
1322  Ndb_global_schema_lock_guard global_schema_lock_guard(thd);
1323  if (global_schema_lock_guard.lock(false, false))
1324  return false;
1325  if (!ndb_schema_share &&
1326  ndbcluster_check_ndb_schema_share() == 0)
1327  {
1328  ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
1329  if (!ndb_schema_share)
1330  {
1331  ndb_schema_table__create(thd);
1332  // always make sure we create the 'schema' first
1333  if (!ndb_schema_share)
1334  return false;
1335  }
1336  }
1337  if (!ndb_apply_status_share &&
1338  ndbcluster_check_ndb_apply_status_share() == 0)
1339  {
1340  ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
1341  if (!ndb_apply_status_share)
1342  {
1343  ndb_apply_table__create(thd);
1344  if (!ndb_apply_status_share)
1345  return false;
1346  }
1347  }
1348 
1349  if (ndbcluster_find_all_databases(thd))
1350  {
1351  return false;
1352  }
1353 
1354  if (!ndbcluster_find_all_files(thd))
1355  {
1356  ndb_binlog_tables_inited= TRUE;
1357  if (ndb_binlog_tables_inited &&
1358  ndb_binlog_running && ndb_binlog_is_ready)
1359  {
1360  if (opt_ndb_extra_logging)
1361  sql_print_information("NDB Binlog: ndb tables writable");
1362  close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
1363 
1364  /*
1365  Signal any waiting thread that ndb table setup is
1366  now complete
1367  */
1368  ndb_notify_tables_writable();
1369  }
1370  /* Signal injector thread that all is setup */
1371  pthread_cond_signal(&injector_cond);
1372 
1373  return true; // Setup completed -> OK
1374  }
1375  return false;
1376 }
1377 
/*
  Defines and struct for schema table.
  Should reflect table definition above.

  SCHEMA_*_I are the column numbers of mysql.ndb_schema, in the order
  ndb_schema_table__create() declares the columns: db, name, slock,
  query, node_id, epoch, id, version, type.
  SCHEMA_SIZE is the number of columns.
  SCHEMA_SLOCK_SIZE is the byte size of the BINARY(32) slock column.
*/
#define SCHEMA_DB_I 0u
#define SCHEMA_NAME_I 1u
#define SCHEMA_SLOCK_I 2u
#define SCHEMA_QUERY_I 3u
#define SCHEMA_NODE_ID_I 4u
#define SCHEMA_EPOCH_I 5u
#define SCHEMA_ID_I 6u
#define SCHEMA_VERSION_I 7u
#define SCHEMA_TYPE_I 8u
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u

/*
  In-memory copy of one mysql.ndb_schema row; filled in by
  ndbcluster_get_schema().
*/
struct Cluster_schema
{
  uchar db_length;                   /* length of db, excluding the NUL */
  char db[64];                       /* VARBINARY(63) value + NUL terminator */
  uchar name_length;                 /* length of name, excluding the NUL */
  char name[64];                     /* VARBINARY(63) value + NUL terminator */
  uchar slock_length;                /* byte length of the slock column */
  uint32 slock[SCHEMA_SLOCK_SIZE/4]; /* raw slock bitmap bytes */
  unsigned short query_length;       /* length of query in bytes */
  char *query;                       /* copy made with sql_strmake() */
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
  uint32 version;
  uint32 type;                       /* SCHEMA_OP_TYPE (SOT_*) value */
  uint32 any_value;                  /* not set by ndbcluster_get_schema() */
};
1411 
1412 static void
1413 print_could_not_discover_error(THD *thd,
1414  const Cluster_schema *schema)
1415 {
1416  sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
1417  "binlog schema event '%s' from node %d. "
1418  "my_errno: %d",
1419  schema->db, schema->name, schema->query,
1420  schema->node_id, my_errno);
1421  print_warning_list("NDB Binlog", thd);
1422 }
1423 
1424 
1425 /*
1426  Transfer schema table data into corresponding struct
1427 */
1428 static void ndbcluster_get_schema(Ndb_event_data *event_data,
1429  Cluster_schema *s)
1430 {
1431  TABLE *table= event_data->shadow_table;
1432  Field **field;
1433  /* unpack blob values */
1434  uchar* blobs_buffer= 0;
1435  uint blobs_buffer_size= 0;
1436  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1437  {
1438  ptrdiff_t ptrdiff= 0;
1439  int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
1440  blobs_buffer, blobs_buffer_size,
1441  ptrdiff);
1442  if (ret != 0)
1443  {
1444  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1445  DBUG_PRINT("info", ("blob read error"));
1446  DBUG_ASSERT(FALSE);
1447  }
1448  }
1449  /* db varchar 1 length uchar */
1450  field= table->field;
1451  s->db_length= *(uint8*)(*field)->ptr;
1452  DBUG_ASSERT(s->db_length <= (*field)->field_length);
1453  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
1454  memcpy(s->db, (*field)->ptr + 1, s->db_length);
1455  s->db[s->db_length]= 0;
1456  /* name varchar 1 length uchar */
1457  field++;
1458  s->name_length= *(uint8*)(*field)->ptr;
1459  DBUG_ASSERT(s->name_length <= (*field)->field_length);
1460  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
1461  memcpy(s->name, (*field)->ptr + 1, s->name_length);
1462  s->name[s->name_length]= 0;
1463  /* slock fixed length */
1464  field++;
1465  s->slock_length= (*field)->field_length;
1466  DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
1467  memcpy(s->slock, (*field)->ptr, s->slock_length);
1468  /* query blob */
1469  field++;
1470  {
1471  Field_blob *field_blob= (Field_blob*)(*field);
1472  uint blob_len= field_blob->get_length((*field)->ptr);
1473  uchar *blob_ptr= 0;
1474  field_blob->get_ptr(&blob_ptr);
1475  DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1476  s->query_length= blob_len;
1477  s->query= sql_strmake((char*) blob_ptr, blob_len);
1478  }
1479  /* node_id */
1480  field++;
1481  s->node_id= (Uint32)((Field_long *)*field)->val_int();
1482  /* epoch */
1483  field++;
1484  s->epoch= ((Field_long *)*field)->val_int();
1485  /* id */
1486  field++;
1487  s->id= (Uint32)((Field_long *)*field)->val_int();
1488  /* version */
1489  field++;
1490  s->version= (Uint32)((Field_long *)*field)->val_int();
1491  /* type */
1492  field++;
1493  s->type= (Uint32)((Field_long *)*field)->val_int();
1494  /* free blobs buffer */
1495  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1496  dbug_tmp_restore_column_map(table->read_set, old_map);
1497 }
1498 
1499 /*
1500  helper function to pack a ndb varchar
1501 */
1502 char *ndb_pack_varchar(const NDBCOL *col, char *buf,
1503  const char *str, int sz)
1504 {
1505  switch (col->getArrayType())
1506  {
1507  case NDBCOL::ArrayTypeFixed:
1508  memcpy(buf, str, sz);
1509  break;
1510  case NDBCOL::ArrayTypeShortVar:
1511  *(uchar*)buf= (uchar)sz;
1512  memcpy(buf + 1, str, sz);
1513  break;
1514  case NDBCOL::ArrayTypeMediumVar:
1515  int2store(buf, sz);
1516  memcpy(buf + 2, str, sz);
1517  break;
1518  }
1519  return buf;
1520 }
1521 
1522 /*
1523  acknowledge handling of schema operation
1524 */
1525 static int
1526 ndbcluster_update_slock(THD *thd,
1527  const char *db,
1528  const char *table_name,
1529  uint32 table_id,
1530  uint32 table_version)
1531 {
1532  DBUG_ENTER("ndbcluster_update_slock");
1533  if (!ndb_schema_share)
1534  {
1535  DBUG_RETURN(0);
1536  }
1537 
1538  const NdbError *ndb_error= 0;
1539  uint32 node_id= g_ndb_cluster_connection->node_id();
1540  Ndb *ndb= check_ndb_in_thd(thd);
1541  char save_db[FN_HEADLEN];
1542  strcpy(save_db, ndb->getDatabaseName());
1543 
1544  char tmp_buf[FN_REFLEN];
1545  NDBDICT *dict= ndb->getDictionary();
1546  ndb->setDatabaseName(NDB_REP_DB);
1547  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1548  const NDBTAB *ndbtab= ndbtab_g.get_table();
1549  NdbTransaction *trans= 0;
1550  int retries= 100;
1551  int retry_sleep= 30; /* 30 milliseconds, transaction */
1552  const NDBCOL *col[SCHEMA_SIZE];
1553  unsigned sz[SCHEMA_SIZE];
1554 
1555  MY_BITMAP slock;
1556  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
1557  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
1558 
1559  if (ndbtab == 0)
1560  {
1561  if (dict->getNdbError().code != 4009)
1562  abort();
1563  DBUG_RETURN(0);
1564  }
1565 
1566  {
1567  uint i;
1568  for (i= 0; i < SCHEMA_SIZE; i++)
1569  {
1570  col[i]= ndbtab->getColumn(i);
1571  if (i != SCHEMA_QUERY_I)
1572  {
1573  sz[i]= col[i]->getLength();
1574  DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1575  }
1576  }
1577  }
1578 
1579  while (1)
1580  {
1581  if ((trans= ndb->startTransaction()) == 0)
1582  goto err;
1583  {
1584  NdbOperation *op= 0;
1585  int r= 0;
1586 
1587  /* read the bitmap exlusive */
1588  r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1589  DBUG_ASSERT(r == 0);
1590  r|= op->readTupleExclusive();
1591  DBUG_ASSERT(r == 0);
1592 
1593  /* db */
1594  ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1595  r|= op->equal(SCHEMA_DB_I, tmp_buf);
1596  DBUG_ASSERT(r == 0);
1597  /* name */
1598  ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1599  strlen(table_name));
1600  r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1601  DBUG_ASSERT(r == 0);
1602  /* slock */
1603  r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
1604  DBUG_ASSERT(r == 0);
1605  }
1606  if (trans->execute(NdbTransaction::NoCommit))
1607  goto err;
1608 
1609  if (opt_ndb_extra_logging > 19)
1610  {
1611  uint32 copy[SCHEMA_SLOCK_SIZE/4];
1612  memcpy(copy, bitbuf, sizeof(copy));
1613  bitmap_clear_bit(&slock, node_id);
1614  sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
1615  db, table_name,
1616  table_id, table_version,
1617  copy[0], copy[1],
1618  slock.bitmap[0],
1619  slock.bitmap[1]);
1620  }
1621  else
1622  {
1623  bitmap_clear_bit(&slock, node_id);
1624  }
1625 
1626  {
1627  NdbOperation *op= 0;
1628  int r= 0;
1629 
1630  /* now update the tuple */
1631  r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1632  DBUG_ASSERT(r == 0);
1633  r|= op->updateTuple();
1634  DBUG_ASSERT(r == 0);
1635 
1636  /* db */
1637  ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1638  r|= op->equal(SCHEMA_DB_I, tmp_buf);
1639  DBUG_ASSERT(r == 0);
1640  /* name */
1641  ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1642  strlen(table_name));
1643  r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1644  DBUG_ASSERT(r == 0);
1645  /* slock */
1646  r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
1647  DBUG_ASSERT(r == 0);
1648  /* node_id */
1649  r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1650  DBUG_ASSERT(r == 0);
1651  /* type */
1652  r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
1653  DBUG_ASSERT(r == 0);
1654  }
1655  if (trans->execute(NdbTransaction::Commit,
1656  NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
1657  {
1658  DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
1659  node_id, db, table_name));
1660  dict->forceGCPWait(1);
1661  break;
1662  }
1663  err:
1664  const NdbError *this_error= trans ?
1665  &trans->getNdbError() : &ndb->getNdbError();
1666  if (this_error->status == NdbError::TemporaryError && !thd->killed)
1667  {
1668  if (retries--)
1669  {
1670  if (trans)
1671  ndb->closeTransaction(trans);
1672  do_retry_sleep(retry_sleep);
1673  continue; // retry
1674  }
1675  }
1676  ndb_error= this_error;
1677  break;
1678  }
1679 
1680  if (ndb_error)
1681  {
1682  char buf[1024];
1683  my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
1684  db, table_name);
1685  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1686  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1687  ndb_error->code, ndb_error->message, buf);
1688  }
1689  if (trans)
1690  ndb->closeTransaction(trans);
1691  ndb->setDatabaseName(save_db);
1692  DBUG_RETURN(0);
1693 }
1694 
1695 
1696 /*
1697  log query in schema table
1698 */
1699 static void ndb_report_waiting(const char *key,
1700  int the_time,
1701  const char *op,
1702  const char *obj,
1703  const MY_BITMAP * map)
1704 {
1705  ulonglong ndb_latest_epoch= 0;
1706  const char *proc_info= "<no info>";
1707  pthread_mutex_lock(&injector_mutex);
1708  if (injector_ndb)
1709  ndb_latest_epoch= injector_ndb->getLatestGCI();
1710  if (injector_thd)
1711  proc_info= injector_thd->proc_info;
1712  pthread_mutex_unlock(&injector_mutex);
1713  if (map == 0)
1714  {
1715  sql_print_information("NDB %s:"
1716  " waiting max %u sec for %s %s."
1717  " epochs: (%u/%u,%u/%u,%u/%u)"
1718  " injector proc_info: %s"
1719  ,key, the_time, op, obj
1720  ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1721  ,(uint)(ndb_latest_handled_binlog_epoch)
1722  ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1723  ,(uint)(ndb_latest_received_binlog_epoch)
1724  ,(uint)(ndb_latest_epoch >> 32)
1725  ,(uint)(ndb_latest_epoch)
1726  ,proc_info
1727  );
1728  }
1729  else
1730  {
1731  sql_print_information("NDB %s:"
1732  " waiting max %u sec for %s %s."
1733  " epochs: (%u/%u,%u/%u,%u/%u)"
1734  " injector proc_info: %s map: %x%x"
1735  ,key, the_time, op, obj
1736  ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1737  ,(uint)(ndb_latest_handled_binlog_epoch)
1738  ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1739  ,(uint)(ndb_latest_received_binlog_epoch)
1740  ,(uint)(ndb_latest_epoch >> 32)
1741  ,(uint)(ndb_latest_epoch)
1742  ,proc_info
1743  ,map->bitmap[0]
1744  ,map->bitmap[1]
1745  );
1746  }
1747 }
1748 
1749 static
1750 const char*
1751 get_schema_type_name(uint type)
1752 {
1753  switch(type){
1754  case SOT_DROP_TABLE:
1755  return "DROP_TABLE";
1756  case SOT_CREATE_TABLE:
1757  return "CREATE_TABLE";
1758  case SOT_RENAME_TABLE_NEW:
1759  return "RENAME_TABLE_NEW";
1760  case SOT_ALTER_TABLE_COMMIT:
1761  return "ALTER_TABLE_COMMIT";
1762  case SOT_DROP_DB:
1763  return "DROP_DB";
1764  case SOT_CREATE_DB:
1765  return "CREATE_DB";
1766  case SOT_ALTER_DB:
1767  return "ALTER_DB";
1768  case SOT_CLEAR_SLOCK:
1769  return "CLEAR_SLOCK";
1770  case SOT_TABLESPACE:
1771  return "TABLESPACE";
1772  case SOT_LOGFILE_GROUP:
1773  return "LOGFILE_GROUP";
1774  case SOT_RENAME_TABLE:
1775  return "RENAME_TABLE";
1776  case SOT_TRUNCATE_TABLE:
1777  return "TRUNCATE_TABLE";
1778  case SOT_RENAME_TABLE_PREPARE:
1779  return "RENAME_TABLE_PREPARE";
1780  case SOT_ONLINE_ALTER_TABLE_PREPARE:
1781  return "ONLINE_ALTER_TABLE_PREPARE";
1782  case SOT_ONLINE_ALTER_TABLE_COMMIT:
1783  return "ONLINE_ALTER_TABLE_COMMIT";
1784  case SOT_CREATE_USER:
1785  return "CREATE_USER";
1786  case SOT_DROP_USER:
1787  return "DROP_USER";
1788  case SOT_RENAME_USER:
1789  return "RENAME_USER";
1790  case SOT_GRANT:
1791  return "GRANT";
1792  case SOT_REVOKE:
1793  return "REVOKE";
1794  }
1795  return "<unknown>";
1796 }
1797 
1798 extern void update_slave_api_stats(Ndb*);
1799 
1800 int ndbcluster_log_schema_op(THD *thd,
1801  const char *query, int query_length,
1802  const char *db, const char *table_name,
1803  uint32 ndb_table_id,
1804  uint32 ndb_table_version,
1805  enum SCHEMA_OP_TYPE type,
1806  const char *new_db, const char *new_table_name)
1807 {
1808  DBUG_ENTER("ndbcluster_log_schema_op");
1809  Thd_ndb *thd_ndb= get_thd_ndb(thd);
1810  if (!thd_ndb)
1811  {
1812  if (!(thd_ndb= Thd_ndb::seize(thd)))
1813  {
1814  sql_print_error("Could not allocate Thd_ndb object");
1815  DBUG_RETURN(1);
1816  }
1817  thd_set_thd_ndb(thd, thd_ndb);
1818  }
1819 
1820  DBUG_PRINT("enter",
1821  ("query: %s db: %s table_name: %s thd_ndb->options: %d",
1822  query, db, table_name, thd_ndb->options));
1823  if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1824  {
1825  if (thd->slave_thread)
1826  update_slave_api_stats(thd_ndb->ndb);
1827 
1828  DBUG_RETURN(0);
1829  }
1830 
1831  char tmp_buf2[FN_REFLEN];
1832  char quoted_table1[2 + 2 * FN_REFLEN + 1];
1833  char quoted_db1[2 + 2 * FN_REFLEN + 1];
1834  char quoted_db2[2 + 2 * FN_REFLEN + 1];
1835  char quoted_table2[2 + 2 * FN_REFLEN + 1];
1836  int id_length= 0;
1837  const char *type_str;
1838  int also_internal= 0;
1839  uint32 log_type= (uint32)type;
1840  switch (type)
1841  {
1842  case SOT_DROP_TABLE:
1843  /* drop database command, do not log at drop table */
1844  if (thd->lex->sql_command == SQLCOM_DROP_DB)
1845  DBUG_RETURN(0);
1846  /* redo the drop table query as is may contain several tables */
1847  query= tmp_buf2;
1848  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1849  table_name, 0);
1850  quoted_table1[id_length]= '\0';
1851  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1852  db, 0);
1853  quoted_db1[id_length]= '\0';
1854  query_length= (uint) (strxmov(tmp_buf2, "drop table ", quoted_db1, ".",
1855  quoted_table1, NullS) - tmp_buf2);
1856  type_str= "drop table";
1857  break;
1858  case SOT_RENAME_TABLE_PREPARE:
1859  type_str= "rename table prepare";
1860  also_internal= 1;
1861  break;
1862  case SOT_RENAME_TABLE:
1863  /* redo the rename table query as is may contain several tables */
1864  query= tmp_buf2;
1865  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1866  db, 0);
1867  quoted_db1[id_length]= '\0';
1868  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1869  table_name, 0);
1870  quoted_table1[id_length]= '\0';
1871  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1872  new_db, 0);
1873  quoted_db2[id_length]= '\0';
1874  id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1875  new_table_name, 0);
1876  quoted_table2[id_length]= '\0';
1877  query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1878  quoted_db1, ".", quoted_table1, " to ",
1879  quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1880  type_str= "rename table";
1881  break;
1882  case SOT_CREATE_TABLE:
1883  type_str= "create table";
1884  break;
1885  case SOT_ALTER_TABLE_COMMIT:
1886  type_str= "alter table";
1887  also_internal= 1;
1888  break;
1889  case SOT_ONLINE_ALTER_TABLE_PREPARE:
1890  type_str= "online alter table prepare";
1891  also_internal= 1;
1892  break;
1893  case SOT_ONLINE_ALTER_TABLE_COMMIT:
1894  type_str= "online alter table commit";
1895  also_internal= 1;
1896  break;
1897  case SOT_DROP_DB:
1898  type_str= "drop db";
1899  break;
1900  case SOT_CREATE_DB:
1901  type_str= "create db";
1902  break;
1903  case SOT_ALTER_DB:
1904  type_str= "alter db";
1905  break;
1906  case SOT_TABLESPACE:
1907  type_str= "tablespace";
1908  break;
1909  case SOT_LOGFILE_GROUP:
1910  type_str= "logfile group";
1911  break;
1912  case SOT_TRUNCATE_TABLE:
1913  type_str= "truncate table";
1914  break;
1915  case SOT_CREATE_USER:
1916  type_str= "create user";
1917  break;
1918  case SOT_DROP_USER:
1919  type_str= "drop user";
1920  break;
1921  case SOT_RENAME_USER:
1922  type_str= "rename user";
1923  break;
1924  case SOT_GRANT:
1925  type_str= "grant/revoke";
1926  break;
1927  case SOT_REVOKE:
1928  type_str= "revoke all";
1929  break;
1930  default:
1931  abort(); /* should not happen, programming error */
1932  }
1933 
1934  NDB_SCHEMA_OBJECT *ndb_schema_object;
1935  {
1936  char key[FN_REFLEN + 1];
1937  build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1938  ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1939  ndb_schema_object->table_id= ndb_table_id;
1940  ndb_schema_object->table_version= ndb_table_version;
1941  }
1942 
1943  const NdbError *ndb_error= 0;
1944  uint32 node_id= g_ndb_cluster_connection->node_id();
1945  Uint64 epoch= 0;
1946  {
1947  int i;
1948  int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1949 
1950  /* begin protect ndb_schema_share */
1951  pthread_mutex_lock(&ndb_schema_share_mutex);
1952  if (ndb_schema_share == 0)
1953  {
1954  pthread_mutex_unlock(&ndb_schema_share_mutex);
1955  if (ndb_schema_object)
1956  ndb_free_schema_object(&ndb_schema_object, FALSE);
1957  DBUG_RETURN(0);
1958  }
1959  pthread_mutex_lock(&ndb_schema_share->mutex);
1960  for (i= 0; i < no_storage_nodes; i++)
1961  {
1962  bitmap_union(&ndb_schema_object->slock_bitmap,
1963  &ndb_schema_share->subscriber_bitmap[i]);
1964  }
1965  pthread_mutex_unlock(&ndb_schema_share->mutex);
1966  pthread_mutex_unlock(&ndb_schema_share_mutex);
1967  /* end protect ndb_schema_share */
1968 
1969  if (also_internal)
1970  bitmap_set_bit(&ndb_schema_object->slock_bitmap, node_id);
1971  else
1972  bitmap_clear_bit(&ndb_schema_object->slock_bitmap, node_id);
1973 
1974  DBUG_DUMP("schema_subscribers", (uchar*)&ndb_schema_object->slock,
1975  no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1976  }
1977 
1978  Ndb *ndb= thd_ndb->ndb;
1979  char save_db[FN_REFLEN];
1980  strcpy(save_db, ndb->getDatabaseName());
1981 
1982  char tmp_buf[FN_REFLEN];
1983  NDBDICT *dict= ndb->getDictionary();
1984  ndb->setDatabaseName(NDB_REP_DB);
1985  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1986  const NDBTAB *ndbtab= ndbtab_g.get_table();
1987  NdbTransaction *trans= 0;
1988  int retries= 100;
1989  int retry_sleep= 30; /* 30 milliseconds, transaction */
1990  const NDBCOL *col[SCHEMA_SIZE];
1991  unsigned sz[SCHEMA_SIZE];
1992 
1993  if (ndbtab == 0)
1994  {
1995  if (strcmp(NDB_REP_DB, db) != 0 ||
1996  strcmp(NDB_SCHEMA_TABLE, table_name))
1997  {
1998  ndb_error= &dict->getNdbError();
1999  }
2000  goto end;
2001  }
2002 
2003  {
2004  uint i;
2005  for (i= 0; i < SCHEMA_SIZE; i++)
2006  {
2007  col[i]= ndbtab->getColumn(i);
2008  if (i != SCHEMA_QUERY_I)
2009  {
2010  sz[i]= col[i]->getLength();
2011  DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
2012  }
2013  }
2014  }
2015 
2016  while (1)
2017  {
2018  const char *log_db= db;
2019  const char *log_tab= table_name;
2020  const char *log_subscribers= (char*)ndb_schema_object->slock;
2021  if ((trans= ndb->startTransaction()) == 0)
2022  goto err;
2023  while (1)
2024  {
2025  NdbOperation *op= 0;
2026  int r= 0;
2027  r|= (op= trans->getNdbOperation(ndbtab)) == 0;
2028  DBUG_ASSERT(r == 0);
2029  r|= op->writeTuple();
2030  DBUG_ASSERT(r == 0);
2031 
2032  /* db */
2033  ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
2034  r|= op->equal(SCHEMA_DB_I, tmp_buf);
2035  DBUG_ASSERT(r == 0);
2036  /* name */
2037  ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
2038  strlen(log_tab));
2039  r|= op->equal(SCHEMA_NAME_I, tmp_buf);
2040  DBUG_ASSERT(r == 0);
2041  /* slock */
2042  DBUG_ASSERT(sz[SCHEMA_SLOCK_I] ==
2043  no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2044  r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
2045  DBUG_ASSERT(r == 0);
2046  /* query */
2047  {
2048  NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
2049  DBUG_ASSERT(ndb_blob != 0);
2050  uint blob_len= query_length;
2051  const char* blob_ptr= query;
2052  r|= ndb_blob->setValue(blob_ptr, blob_len);
2053  DBUG_ASSERT(r == 0);
2054  }
2055  /* node_id */
2056  r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
2057  DBUG_ASSERT(r == 0);
2058  /* epoch */
2059  r|= op->setValue(SCHEMA_EPOCH_I, epoch);
2060  DBUG_ASSERT(r == 0);
2061  /* id */
2062  r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
2063  DBUG_ASSERT(r == 0);
2064  /* version */
2065  r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
2066  DBUG_ASSERT(r == 0);
2067  /* type */
2068  r|= op->setValue(SCHEMA_TYPE_I, log_type);
2069  DBUG_ASSERT(r == 0);
2070  /* any value */
2071  Uint32 anyValue = 0;
2072  if (! thd->slave_thread)
2073  {
2074  /* Schema change originating from this MySQLD, check SQL_LOG_BIN
2075  * variable and pass 'setting' to all logging MySQLDs via AnyValue
2076  */
2077  if (thd_options(thd) & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */
2078  {
2079  DBUG_PRINT("info", ("Schema event for binlogging"));
2080  ndbcluster_anyvalue_set_normal(anyValue);
2081  }
2082  else
2083  {
2084  DBUG_PRINT("info", ("Schema event not for binlogging"));
2085  ndbcluster_anyvalue_set_nologging(anyValue);
2086  }
2087  }
2088  else
2089  {
2090  /*
2091  Slave propagating replicated schema event in ndb_schema
2092  In case replicated serverId is composite
2093  (server-id-bits < 31) we copy it into the
2094  AnyValue as-is
2095  This is for 'future', as currently Schema operations
2096  do not have composite AnyValues.
2097  In future it may be useful to support *not* mapping composite
2098  AnyValues to/from Binlogged server-ids.
2099  */
2100  DBUG_PRINT("info", ("Replicated schema event with original server id %d",
2101  thd->server_id));
2102  anyValue = thd_unmasked_server_id(thd);
2103  }
2104 
2105 #ifndef DBUG_OFF
2106  /*
2107  MySQLD will set the user-portion of AnyValue (if any) to all 1s
2108  This tests code filtering ServerIds on the value of server-id-bits.
2109  */
2110  const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA");
2111  if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N')
2112  {
2113  dbug_ndbcluster_anyvalue_set_userbits(anyValue);
2114  }
2115 #endif
2116 
2117  r|= op->setAnyValue(anyValue);
2118  DBUG_ASSERT(r == 0);
2119  break;
2120  }
2122  1 /* force send */) == 0)
2123  {
2124  DBUG_PRINT("info", ("logged: %s", query));
2125  dict->forceGCPWait(1);
2126  break;
2127  }
2128 err:
2129  const NdbError *this_error= trans ?
2130  &trans->getNdbError() : &ndb->getNdbError();
2131  if (this_error->status == NdbError::TemporaryError && !thd->killed)
2132  {
2133  if (retries--)
2134  {
2135  if (trans)
2136  ndb->closeTransaction(trans);
2137  do_retry_sleep(retry_sleep);
2138  continue; // retry
2139  }
2140  }
2141  ndb_error= this_error;
2142  break;
2143  }
2144 end:
2145  if (ndb_error)
2146  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
2147  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2148  ndb_error->code,
2149  ndb_error->message,
2150  "Could not log query '%s' on other mysqld's");
2151 
2152  if (trans)
2153  ndb->closeTransaction(trans);
2154  ndb->setDatabaseName(save_db);
2155 
2156  if (opt_ndb_extra_logging > 19)
2157  {
2158  sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
2159  db,
2160  table_name,
2161  ndb_table_id,
2162  ndb_table_version,
2163  get_schema_type_name(log_type),
2164  log_type,
2165  query,
2166  ndb_schema_object->slock_bitmap.bitmap[0],
2167  ndb_schema_object->slock_bitmap.bitmap[1]);
2168  }
2169 
2170  /*
2171  Wait for other mysqld's to acknowledge the table operation
2172  */
2173  if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2174  {
2175  int max_timeout= DEFAULT_SYNC_TIMEOUT;
2176  pthread_mutex_lock(&ndb_schema_object->mutex);
2177  while (1)
2178  {
2179  struct timespec abstime;
2180  int i;
2181  int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
2182  set_timespec(abstime, 1);
2183  int ret= pthread_cond_timedwait(&injector_cond,
2184  &ndb_schema_object->mutex,
2185  &abstime);
2186  if (thd->killed)
2187  break;
2188 
2189  /* begin protect ndb_schema_share */
2190  pthread_mutex_lock(&ndb_schema_share_mutex);
2191  if (ndb_schema_share == 0)
2192  {
2193  pthread_mutex_unlock(&ndb_schema_share_mutex);
2194  break;
2195  }
2196  MY_BITMAP servers;
2197  bitmap_init(&servers, 0, 256, FALSE);
2198  bitmap_clear_all(&servers);
2199  bitmap_set_bit(&servers, node_id); // "we" are always alive
2200  pthread_mutex_lock(&ndb_schema_share->mutex);
2201  for (i= 0; i < no_storage_nodes; i++)
2202  {
2203  /* remove any unsubscribed from schema_subscribers */
2204  MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
2205  bitmap_union(&servers, tmp);
2206  }
2207  pthread_mutex_unlock(&ndb_schema_share->mutex);
2208  pthread_mutex_unlock(&ndb_schema_share_mutex);
2209  /* end protect ndb_schema_share */
2210 
2211  /* remove any unsubscribed from ndb_schema_object->slock */
2212  bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
2213  bitmap_free(&servers);
2214 
2215  if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2216  break;
2217 
2218  if (ret)
2219  {
2220  max_timeout--;
2221  if (max_timeout == 0)
2222  {
2223  sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
2224  type_str, ndb_schema_object->key);
2225  DBUG_ASSERT(false);
2226  break;
2227  }
2228  if (opt_ndb_extra_logging)
2229  ndb_report_waiting(type_str, max_timeout,
2230  "distributing", ndb_schema_object->key,
2231  &ndb_schema_object->slock_bitmap);
2232  }
2233  }
2234  pthread_mutex_unlock(&ndb_schema_object->mutex);
2235  }
2236  else if (ndb_error)
2237  {
2238  sql_print_error("NDB %s: distributing %s err: %u",
2239  type_str, ndb_schema_object->key,
2240  ndb_error->code);
2241  }
2242  else if (opt_ndb_extra_logging > 19)
2243  {
2244  sql_print_information("NDB %s: not waiting for distributing %s",
2245  type_str, ndb_schema_object->key);
2246  }
2247 
2248  if (ndb_schema_object)
2249  ndb_free_schema_object(&ndb_schema_object, FALSE);
2250 
2251  if (opt_ndb_extra_logging > 19)
2252  {
2253  sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
2254  " - complete!",
2255  db,
2256  table_name,
2257  ndb_table_id,
2258  ndb_table_version,
2259  get_schema_type_name(log_type),
2260  log_type,
2261  query);
2262  }
2263 
2264  if (thd->slave_thread)
2265  update_slave_api_stats(ndb);
2266 
2267  DBUG_RETURN(0);
2268 }
2269 
2270 /*
2271  Handle _non_ data events from the storage nodes
2272 */
2273 
2274 int
2275 ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
2276  Ndb_event_data *event_data)
2277 {
2278  DBUG_ENTER("ndb_handle_schema_change");
2279  NDB_SHARE *share= event_data->share;
2280  TABLE *shadow_table= event_data->shadow_table;
2281  const char *tabname= shadow_table->s->table_name.str;
2282  const char *dbname= shadow_table->s->db.str;
2283  bool do_close_cached_tables= FALSE;
2284  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
2285 
2286  if (pOp->getEventType() == NDBEVENT::TE_ALTER)
2287  {
2288  DBUG_RETURN(0);
2289  }
2290  DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP ||
2292  {
2293  Thd_ndb *thd_ndb= get_thd_ndb(thd);
2294  Ndb *ndb= thd_ndb->ndb;
2295  NDBDICT *dict= ndb->getDictionary();
2296  ndb->setDatabaseName(dbname);
2297  Ndb_table_guard ndbtab_g(dict, tabname);
2298  const NDBTAB *ev_tab= pOp->getTable();
2299  const NDBTAB *cache_tab= ndbtab_g.get_table();
2300  if (cache_tab &&
2301  cache_tab->getObjectId() == ev_tab->getObjectId() &&
2302  cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
2303  ndbtab_g.invalidate();
2304  }
2305 
2306  pthread_mutex_lock(&share->mutex);
2307  DBUG_ASSERT(share->state == NSS_DROPPED ||
2308  share->op == pOp || share->new_op == pOp);
2309  if (share->new_op)
2310  {
2311  share->new_op= 0;
2312  }
2313  if (share->op)
2314  {
2315  share->op= 0;
2316  }
2317  // either just us or drop table handling as well
2318 
2319  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
2320  pthread_mutex_unlock(&share->mutex);
2321  (void) pthread_cond_signal(&injector_cond);
2322 
2323  pthread_mutex_lock(&ndbcluster_mutex);
2324  /* ndb_share reference binlog free */
2325  DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
2326  share->key, share->use_count));
2327  free_share(&share, TRUE);
2328  if (is_remote_change && share && share->state != NSS_DROPPED)
2329  {
2330  DBUG_PRINT("info", ("remote change"));
2331  share->state= NSS_DROPPED;
2332  if (share->use_count != 1)
2333  {
2334  /* open handler holding reference */
2335  /* wait with freeing create ndb_share to below */
2336  do_close_cached_tables= TRUE;
2337  }
2338  else
2339  {
2340  /* ndb_share reference create free */
2341  DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
2342  share->key, share->use_count));
2343  free_share(&share, TRUE);
2344  share= 0;
2345  }
2346  }
2347  else
2348  share= 0;
2349  pthread_mutex_unlock(&ndbcluster_mutex);
2350 
2351  if (event_data)
2352  {
2353  delete event_data;
2354  pOp->setCustomData(NULL);
2355  }
2356 
2357  pthread_mutex_lock(&injector_mutex);
2358  is_ndb->dropEventOperation(pOp);
2359  pOp= 0;
2360  pthread_mutex_unlock(&injector_mutex);
2361 
2362  if (do_close_cached_tables)
2363  {
2364  TABLE_LIST table_list;
2365  memset(&table_list, 0, sizeof(table_list));
2366  table_list.db= (char *)dbname;
2367  table_list.alias= table_list.table_name= (char *)tabname;
2368  close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2369  /* ndb_share reference create free */
2370  DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
2371  share->key, share->use_count));
2372  free_share(&share);
2373  }
2374  DBUG_RETURN(0);
2375 }
2376 
2377 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
2378 {
2379  /* any_value == 0 means local cluster sourced change that
2380  * should be logged
2381  */
2382  if (ndbcluster_anyvalue_is_reserved(schema->any_value))
2383  {
2384  /* Originating SQL node did not want this query logged */
2385  if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
2386  sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
2387  "query not logged",
2388  schema->any_value);
2389  return;
2390  }
2391 
2392  Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
2393  /*
2394  Start with serverId as received AnyValue, in case it's a composite
2395  (server_id_bits < 31).
2396  This is for 'future', as currently schema ops do not have composite
2397  AnyValues.
2398  In future it may be useful to support *not* mapping composite
2399  AnyValues to/from Binlogged server-ids.
2400  */
2401  Uint32 loggedServerId = schema->any_value;
2402 
2403  if (queryServerId)
2404  {
2405  /*
2406  AnyValue has non-zero serverId, must be a query applied by a slave
2407  mysqld.
2408  TODO : Assert that we are running in the Binlog injector thread?
2409  */
2410  if (! g_ndb_log_slave_updates)
2411  {
2412  /* This MySQLD does not log slave updates */
2413  return;
2414  }
2415  }
2416  else
2417  {
2418  /* No ServerId associated with this query, mark it as ours */
2419  ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
2420  }
2421 
2422  uint32 thd_server_id_save= thd->server_id;
2423  DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
2424  char *thd_db_save= thd->db;
2425  thd->server_id = loggedServerId;
2426  thd->db= schema->db;
2427  int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
2428  thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
2429  schema->query_length, FALSE,
2430 #ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
2431  TRUE,
2432 #endif
2433  schema->name[0] == 0 || thd->db[0] == 0,
2434  errcode);
2435  thd->server_id= thd_server_id_save;
2436  thd->db= thd_db_save;
2437 }
2438 
2439 static int
2440 ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *s_ndb,
2441  NdbEventOperation *pOp,
2443  *post_epoch_log_list,
2445  *post_epoch_unlock_list,
2446  MEM_ROOT *mem_root)
2447 {
2448  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
2449  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
2450  NDB_SHARE *tmp_share= event_data->share;
2451  if (tmp_share && ndb_schema_share == tmp_share)
2452  {
2453  NDBEVENT::TableEvent ev_type= pOp->getEventType();
2454  DBUG_PRINT("enter", ("%s.%s ev_type: %d",
2455  tmp_share->db, tmp_share->table_name, ev_type));
2456  if (ev_type == NDBEVENT::TE_UPDATE ||
2457  ev_type == NDBEVENT::TE_INSERT)
2458  {
2459  Thd_ndb *thd_ndb= get_thd_ndb(thd);
2460  Ndb *ndb= thd_ndb->ndb;
2461  NDBDICT *dict= ndb->getDictionary();
2462  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
2463  Cluster_schema *schema= (Cluster_schema *)
2464  sql_alloc(sizeof(Cluster_schema));
2465  MY_BITMAP slock;
2466  bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
2467  uint node_id= g_ndb_cluster_connection->node_id();
2468  {
2469  ndbcluster_get_schema(event_data, schema);
2470  schema->any_value= pOp->getAnyValue();
2471  }
2472  enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
2473  DBUG_PRINT("info",
2474  ("%s.%s: log query_length: %d query: '%s' type: %d",
2475  schema->db, schema->name,
2476  schema->query_length, schema->query,
2477  schema_type));
2478 
2479  if (opt_ndb_extra_logging > 19)
2480  {
2481  sql_print_information("NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
2482  schema->db, schema->name,
2483  schema->id, schema->version,
2484  schema->query,
2485  get_schema_type_name(schema_type),
2486  schema_type,
2487  schema->node_id,
2488  slock.bitmap[0], slock.bitmap[1]);
2489  }
2490 
2491  if ((schema->db[0] == 0) && (schema->name[0] == 0))
2492  DBUG_RETURN(0);
2493  switch (schema_type)
2494  {
2495  case SOT_CLEAR_SLOCK:
2496  /*
2497  handle slock after epoch is completed to ensure that
2498  schema events get inserted in the binlog after any data
2499  events
2500  */
2501  post_epoch_log_list->push_back(schema, mem_root);
2502  DBUG_RETURN(0);
2503  case SOT_ALTER_TABLE_COMMIT:
2504  // fall through
2505  case SOT_RENAME_TABLE_PREPARE:
2506  // fall through
2507  case SOT_ONLINE_ALTER_TABLE_PREPARE:
2508  // fall through
2509  case SOT_ONLINE_ALTER_TABLE_COMMIT:
2510  post_epoch_log_list->push_back(schema, mem_root);
2511  post_epoch_unlock_list->push_back(schema, mem_root);
2512  DBUG_RETURN(0);
2513  break;
2514  default:
2515  break;
2516  }
2517 
2518  if (schema->node_id != node_id)
2519  {
2520  int log_query= 0, post_epoch_unlock= 0;
2521  char errmsg[MYSQL_ERRMSG_SIZE];
2522 
2523  switch (schema_type)
2524  {
2525  case SOT_RENAME_TABLE:
2526  // fall through
2527  case SOT_RENAME_TABLE_NEW:
2528  {
2529  uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
2530  "NDB Binlog: Skipping renaming locally "
2531  "defined table '%s.%s' from binlog schema "
2532  "event '%s' from node %d. ",
2533  schema->db, schema->name, schema->query,
2534  schema->node_id);
2535  errmsg[end]= '\0';
2536  }
2537  // fall through
2538  case SOT_DROP_TABLE:
2539  if (schema_type == SOT_DROP_TABLE)
2540  {
2541  uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
2542  "NDB Binlog: Skipping dropping locally "
2543  "defined table '%s.%s' from binlog schema "
2544  "event '%s' from node %d. ",
2545  schema->db, schema->name, schema->query,
2546  schema->node_id);
2547  errmsg[end]= '\0';
2548  }
2549  if (! ndbcluster_check_if_local_table(schema->db, schema->name))
2550  {
2551  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2552  const int no_print_error[2]=
2553  {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
2554  run_query(thd, schema->query,
2555  schema->query + schema->query_length,
2556  no_print_error);
2557  /* binlog dropping table after any table operations */
2558  post_epoch_log_list->push_back(schema, mem_root);
2559  /* acknowledge this query _after_ epoch completion */
2560  post_epoch_unlock= 1;
2561  }
2562  else
2563  {
2564  /* Tables exists as a local table, leave it */
2565  DBUG_PRINT("info", ("%s", errmsg));
2566  sql_print_error("%s", errmsg);
2567  log_query= 1;
2568  }
2569  // Fall through
2570  case SOT_TRUNCATE_TABLE:
2571  {
2572  char key[FN_REFLEN + 1];
2573  build_table_filename(key, sizeof(key) - 1,
2574  schema->db, schema->name, "", 0);
2575  /* ndb_share reference temporary, free below */
2576  NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2577  if (share)
2578  {
2579  DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2580  share->key, share->use_count));
2581  }
2582  // invalidation already handled by binlog thread
2583  if (!share || !share->op)
2584  {
2585  {
2586  ndb->setDatabaseName(schema->db);
2587  Ndb_table_guard ndbtab_g(dict, schema->name);
2588  ndbtab_g.invalidate();
2589  }
2590  TABLE_LIST table_list;
2591  memset(&table_list, 0, sizeof(table_list));
2592  table_list.db= schema->db;
2593  table_list.alias= table_list.table_name= schema->name;
2594  close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2595  }
2596  /* ndb_share reference temporary free */
2597  if (share)
2598  {
2599  DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
2600  share->key, share->use_count));
2601  free_share(&share);
2602  }
2603  }
2604  if (schema_type != SOT_TRUNCATE_TABLE)
2605  break;
2606  // fall through
2607  case SOT_CREATE_TABLE:
2608  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2609  if (ndbcluster_check_if_local_table(schema->db, schema->name))
2610  {
2611  DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
2612  schema->db, schema->name));
2613  sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
2614  "binlog schema event '%s' from node %d. ",
2615  schema->db, schema->name, schema->query,
2616  schema->node_id);
2617  }
2618  else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
2619  {
2620  print_could_not_discover_error(thd, schema);
2621  }
2622  log_query= 1;
2623  break;
2624  case SOT_DROP_DB:
2625  /* Drop the database locally if it only contains ndb tables */
2626  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2627  if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
2628  {
2629  const int no_print_error[1]= {0};
2630  run_query(thd, schema->query,
2631  schema->query + schema->query_length,
2632  no_print_error);
2633  /* binlog dropping database after any table operations */
2634  post_epoch_log_list->push_back(schema, mem_root);
2635  /* acknowledge this query _after_ epoch completion */
2636  post_epoch_unlock= 1;
2637  }
2638  else
2639  {
2640  /* Database contained local tables, leave it */
2641  sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
2642  "binlog schema event '%s' from node %d. ",
2643  schema->db, schema->query,
2644  schema->node_id);
2645  log_query= 1;
2646  }
2647  break;
2648  case SOT_CREATE_DB:
2649  if (opt_ndb_extra_logging > 9)
2650  sql_print_information("SOT_CREATE_DB %s", schema->db);
2651 
2652  /* fall through */
2653  case SOT_ALTER_DB:
2654  {
2655  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2656  const int no_print_error[1]= {0};
2657  run_query(thd, schema->query,
2658  schema->query + schema->query_length,
2659  no_print_error);
2660  log_query= 1;
2661  break;
2662  }
2663  case SOT_CREATE_USER:
2664  case SOT_DROP_USER:
2665  case SOT_RENAME_USER:
2666  case SOT_GRANT:
2667  case SOT_REVOKE:
2668  {
2669  if (opt_ndb_extra_logging > 9)
2670  sql_print_information("Got dist_priv event: %s, "
2671  "flushing privileges",
2672  get_schema_type_name(schema_type));
2673 
2674  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2675  const int no_print_error[1]= {0};
2676  char *cmd= (char *) "flush privileges";
2677  run_query(thd, cmd,
2678  cmd + strlen(cmd),
2679  no_print_error);
2680  log_query= 1;
2681  break;
2682  }
2683  case SOT_TABLESPACE:
2684  case SOT_LOGFILE_GROUP:
2685  log_query= 1;
2686  break;
2687  case SOT_ALTER_TABLE_COMMIT:
2688  case SOT_RENAME_TABLE_PREPARE:
2689  case SOT_ONLINE_ALTER_TABLE_PREPARE:
2690  case SOT_ONLINE_ALTER_TABLE_COMMIT:
2691  case SOT_CLEAR_SLOCK:
2692  abort();
2693  }
2694  if (log_query && ndb_binlog_running)
2695  ndb_binlog_query(thd, schema);
2696  /* signal that schema operation has been handled */
2697  DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
2698  if (bitmap_is_set(&slock, node_id))
2699  {
2700  if (post_epoch_unlock)
2701  post_epoch_unlock_list->push_back(schema, mem_root);
2702  else
2703  ndbcluster_update_slock(thd, schema->db, schema->name,
2704  schema->id, schema->version);
2705  }
2706  }
2707  DBUG_RETURN(0);
2708  }
2709  /*
2710  the normal case of UPDATE/INSERT has already been handled
2711  */
2712  switch (ev_type)
2713  {
2714  case NDBEVENT::TE_DELETE:
2715  // skip
2716  break;
2718  if (opt_ndb_extra_logging)
2719  sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
2720  ndb_schema_share->key,
2721  (uint)(pOp->getGCI() >> 32),
2722  (uint)(pOp->getGCI()));
2723  // fall through
2724  case NDBEVENT::TE_DROP:
2725  if (opt_ndb_extra_logging &&
2726  ndb_binlog_tables_inited && ndb_binlog_running)
2727  sql_print_information("NDB Binlog: ndb tables initially "
2728  "read only on reconnect.");
2729 
2730  /* begin protect ndb_schema_share */
2731  pthread_mutex_lock(&ndb_schema_share_mutex);
2732  /* ndb_share reference binlog extra free */
2733  DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
2734  ndb_schema_share->key,
2735  ndb_schema_share->use_count));
2736  free_share(&ndb_schema_share);
2737  ndb_schema_share= 0;
2738  ndb_binlog_tables_inited= FALSE;
2739  ndb_binlog_is_ready= FALSE;
2740  pthread_mutex_unlock(&ndb_schema_share_mutex);
2741  /* end protect ndb_schema_share */
2742 
2743  close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
2744  // fall through
2745  case NDBEVENT::TE_ALTER:
2746  ndb_handle_schema_change(thd, s_ndb, pOp, event_data);
2747  break;
2749  {
2750  uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2751  DBUG_ASSERT(node_id != 0xFF);
2752  pthread_mutex_lock(&tmp_share->mutex);
2753  bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
2754  DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
2755  if (opt_ndb_extra_logging)
2756  {
2757  sql_print_information("NDB Binlog: Node: %d, down,"
2758  " Subscriber bitmask %x%x",
2759  pOp->getNdbdNodeId(),
2760  tmp_share->subscriber_bitmap[node_id].bitmap[1],
2761  tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2762  }
2763  pthread_mutex_unlock(&tmp_share->mutex);
2764  (void) pthread_cond_signal(&injector_cond);
2765  break;
2766  }
2768  {
2769  uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2770  uint8 req_id= pOp->getReqNodeId();
2771  DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2772  pthread_mutex_lock(&tmp_share->mutex);
2773  bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2774  DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
2775  if (opt_ndb_extra_logging)
2776  {
2777  sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
2778  " Subscriber bitmask %x%x",
2779  pOp->getNdbdNodeId(),
2780  req_id,
2781  tmp_share->subscriber_bitmap[node_id].bitmap[1],
2782  tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2783  }
2784  pthread_mutex_unlock(&tmp_share->mutex);
2785  (void) pthread_cond_signal(&injector_cond);
2786  break;
2787  }
2789  {
2790  uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2791  uint8 req_id= pOp->getReqNodeId();
2792  DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2793  pthread_mutex_lock(&tmp_share->mutex);
2794  bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2795  DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
2796  if (opt_ndb_extra_logging)
2797  {
2798  sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
2799  " Subscriber bitmask %x%x",
2800  pOp->getNdbdNodeId(),
2801  req_id,
2802  tmp_share->subscriber_bitmap[node_id].bitmap[1],
2803  tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2804  }
2805  pthread_mutex_unlock(&tmp_share->mutex);
2806  (void) pthread_cond_signal(&injector_cond);
2807  break;
2808  }
2809  default:
2810  sql_print_error("NDB Binlog: unknown non data event %d for %s. "
2811  "Ignoring...", (unsigned) ev_type, tmp_share->key);
2812  }
2813  }
2814  DBUG_RETURN(0);
2815 }
2816 
2817 /*
2818  process any operations that should be done after
2819  the epoch is complete
2820 */
2821 static void
2822 ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
2824  *post_epoch_log_list,
2826  *post_epoch_unlock_list)
2827 {
2828  if (post_epoch_log_list->elements == 0)
2829  return;
2830  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
2831  Cluster_schema *schema;
2832  Thd_ndb *thd_ndb= get_thd_ndb(thd);
2833  Ndb *ndb= thd_ndb->ndb;
2834  NDBDICT *dict= ndb->getDictionary();
2835  while ((schema= post_epoch_log_list->pop()))
2836  {
2837  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
2838  DBUG_PRINT("info",
2839  ("%s.%s: log query_length: %d query: '%s' type: %d",
2840  schema->db, schema->name,
2841  schema->query_length, schema->query,
2842  schema->type));
2843  int log_query= 0;
2844  {
2845  enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
2846  char key[FN_REFLEN + 1];
2847  build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
2848  if (schema_type == SOT_CLEAR_SLOCK)
2849  {
2850  pthread_mutex_lock(&ndbcluster_mutex);
2851  NDB_SCHEMA_OBJECT *ndb_schema_object=
2852  (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
2853  (const uchar*) key, strlen(key));
2854  if (ndb_schema_object &&
2855  (ndb_schema_object->table_id == schema->id &&
2856  ndb_schema_object->table_version == schema->version))
2857  {
2858  pthread_mutex_lock(&ndb_schema_object->mutex);
2859  if (opt_ndb_extra_logging > 19)
2860  {
2861  sql_print_information("NDB: CLEAR_SLOCK key: %s(%u/%u) from"
2862  " %x%x to %x%x",
2863  key, schema->id, schema->version,
2864  ndb_schema_object->slock[0],
2865  ndb_schema_object->slock[1],
2866  schema->slock[0],
2867  schema->slock[1]);
2868  }
2869  memcpy(ndb_schema_object->slock, schema->slock,
2870  sizeof(ndb_schema_object->slock));
2871  DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
2872  (uchar*)ndb_schema_object->slock_bitmap.bitmap,
2873  no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2874  pthread_mutex_unlock(&ndb_schema_object->mutex);
2875  pthread_cond_signal(&injector_cond);
2876  }
2877  else if (opt_ndb_extra_logging > 19)
2878  {
2879  if (ndb_schema_object == 0)
2880  {
2881  sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
2882  key, schema->id, schema->version);
2883  }
2884  else
2885  {
2886  sql_print_information("NDB: Discarding event...key: %s "
2887  "non matching id/version [%u/%u] != [%u/%u]",
2888  key,
2889  ndb_schema_object->table_id,
2890  ndb_schema_object->table_version,
2891  schema->id,
2892  schema->version);
2893  }
2894  }
2895  pthread_mutex_unlock(&ndbcluster_mutex);
2896  continue;
2897  }
2898  /* ndb_share reference temporary, free below */
2899  NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2900  if (share)
2901  {
2902  DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2903  share->key, share->use_count));
2904  }
2905  switch (schema_type)
2906  {
2907  case SOT_DROP_DB:
2908  log_query= 1;
2909  break;
2910  case SOT_DROP_TABLE:
2911  if (opt_ndb_extra_logging > 9)
2912  sql_print_information("SOT_DROP_TABLE %s.%s", schema->db, schema->name);
2913  log_query= 1;
2914  {
2915  ndb->setDatabaseName(schema->db);
2916  Ndb_table_guard ndbtab_g(dict, schema->name);
2917  ndbtab_g.invalidate();
2918  }
2919  {
2920  TABLE_LIST table_list;
2921  memset(&table_list, 0, sizeof(table_list));
2922  table_list.db= schema->db;
2923  table_list.alias= table_list.table_name= schema->name;
2924  close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2925  }
2926  break;
2927  case SOT_RENAME_TABLE:
2928  if (opt_ndb_extra_logging > 9)
2929  sql_print_information("SOT_RENAME_TABLE %s.%s", schema->db, schema->name);
2930  log_query= 1;
2931  if (share)
2932  {
2933  ndbcluster_rename_share(thd, share);
2934  }
2935  break;
2936  case SOT_RENAME_TABLE_PREPARE:
2937  if (opt_ndb_extra_logging > 9)
2938  sql_print_information("SOT_RENAME_TABLE_PREPARE %s.%s -> %s",
2939  schema->db, schema->name, schema->query);
2940  if (share &&
2941  schema->node_id != g_ndb_cluster_connection->node_id())
2942  ndbcluster_prepare_rename_share(share, schema->query);
2943  break;
2944  case SOT_ALTER_TABLE_COMMIT:
2945  if (opt_ndb_extra_logging > 9)
2946  sql_print_information("SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
2947  if (schema->node_id == g_ndb_cluster_connection->node_id())
2948  break;
2949  log_query= 1;
2950  {
2951  ndb->setDatabaseName(schema->db);
2952  Ndb_table_guard ndbtab_g(dict, schema->name);
2953  ndbtab_g.invalidate();
2954  }
2955  {
2956  TABLE_LIST table_list;
2957  memset(&table_list, 0, sizeof(table_list));
2958  table_list.db= schema->db;
2959  table_list.alias= table_list.table_name= schema->name;
2960  close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2961  }
2962  if (share)
2963  {
2964  if (share->op)
2965  {
2966  Ndb_event_data *event_data= (Ndb_event_data *) share->op->getCustomData();
2967  if (event_data)
2968  delete event_data;
2969  share->op->setCustomData(NULL);
2970  {
2971  Mutex_guard injector_mutex_g(injector_mutex);
2972  injector_ndb->dropEventOperation(share->op);
2973  }
2974  share->op= 0;
2975  free_share(&share);
2976  }
2977  free_share(&share);
2978  }
2979 
2980  if (share)
2981  {
2982  /*
2983  Free the share pointer early, ndb_create_table_from_engine()
2984  may delete what share is pointing to as a sideeffect
2985  */
2986  DBUG_PRINT("NDB_SHARE", ("%s early free, use_count: %u",
2987  share->key, share->use_count));
2988  free_share(&share);
2989  share= 0;
2990  }
2991 
2992  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2993  if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
2994  !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
2995  schema->name))
2996  {
2997  sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' "
2998  "from binlog schema event '%s' from node %d.",
2999  schema->db, schema->name, schema->query,
3000  schema->node_id);
3001  }
3002  else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3003  {
3004  print_could_not_discover_error(thd, schema);
3005  }
3006  break;
3007 
3008  case SOT_ONLINE_ALTER_TABLE_PREPARE:
3009  {
3010  if (opt_ndb_extra_logging > 9)
3011  sql_print_information("SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name);
3012  int error= 0;
3013  ndb->setDatabaseName(schema->db);
3014  {
3015  Ndb_table_guard ndbtab_g(dict, schema->name);
3016  ndbtab_g.get_table();
3017  ndbtab_g.invalidate();
3018  }
3019  Ndb_table_guard ndbtab_g(dict, schema->name);
3020  const NDBTAB *ndbtab= ndbtab_g.get_table();
3021  /*
3022  Refresh local frm file and dictionary cache if
3023  remote on-line alter table
3024  */
3025  TABLE_LIST table_list;
3026  memset(&table_list, 0, sizeof(table_list));
3027  table_list.db= (char *)schema->db;
3028  table_list.alias= table_list.table_name= (char *)schema->name;
3029  close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
3030 
3031  if (schema->node_id != g_ndb_cluster_connection->node_id())
3032  {
3033  char key[FN_REFLEN];
3034  uchar *data= 0, *pack_data= 0;
3035  size_t length, pack_length;
3036 
3037  DBUG_PRINT("info", ("Detected frm change of table %s.%s",
3038  schema->db, schema->name));
3039  log_query= 1;
3040  build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
3041  /*
3042  If the there is no local table shadowing the altered table and
3043  it has an frm that is different than the one on disk then
3044  overwrite it with the new table definition
3045  */
3046  if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
3047  readfrm(key, &data, &length) == 0 &&
3048  packfrm(data, length, &pack_data, &pack_length) == 0 &&
3049  cmp_frm(ndbtab, pack_data, pack_length))
3050  {
3051  DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
3052  ndbtab->getFrmLength());
3053  my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
3054  data= NULL;
3055  if ((error= unpackfrm(&data, &length,
3056  (const uchar*) ndbtab->getFrmData())) ||
3057  (error= writefrm(key, data, length)))
3058  {
3059  sql_print_error("NDB: Failed write frm for %s.%s, error %d",
3060  schema->db, schema->name, error);
3061  }
3062  }
3063  my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
3064  my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
3065  }
3066  if (share)
3067  {
3068  if (opt_ndb_extra_logging > 9)
3069  sql_print_information("NDB Binlog: handeling online alter/rename");
3070 
3071  pthread_mutex_lock(&share->mutex);
3072  ndbcluster_binlog_close_table(thd, share);
3073 
3074  if ((error= ndb_binlog_open_shadow_table(thd, share)))
3075  sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
3076  schema->db, schema->name);
3077  if (error)
3078  pthread_mutex_unlock(&share->mutex);
3079  }
3080  if (!error && share)
3081  {
3082  if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
3083  share->flags|= NSF_HIDDEN_PK;
3084  /*
3085  Refresh share->flags to handle added BLOB columns
3086  */
3087  if (share->event_data->shadow_table->s->blob_fields != 0)
3088  share->flags|= NSF_BLOB_FLAG;
3089 
3090  /*
3091  Start subscribing to data changes to the new table definition
3092  */
3093  String event_name(INJECTOR_EVENT_LEN);
3094  ndb_rep_event_name(&event_name, schema->db, schema->name,
3095  get_binlog_full(share));
3096  NdbEventOperation *tmp_op= share->op;
3097  share->new_op= 0;
3098  share->op= 0;
3099 
3100  if (ndbcluster_create_event_ops(thd, share, ndbtab, event_name.c_ptr()))
3101  {
3102  sql_print_error("NDB Binlog:"
3103  "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
3104  event_name.c_ptr());
3105  }
3106  else
3107  {
3108  share->new_op= share->op;
3109  }
3110  share->op= tmp_op;
3111  pthread_mutex_unlock(&share->mutex);
3112 
3113  if (opt_ndb_extra_logging > 9)
3114  sql_print_information("NDB Binlog: handeling online alter/rename done");
3115  }
3116  break;
3117  }
3118  case SOT_ONLINE_ALTER_TABLE_COMMIT:
3119  {
3120  if (opt_ndb_extra_logging > 9)
3121  sql_print_information("SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
3122  if (share)
3123  {
3124  pthread_mutex_lock(&share->mutex);
3125  if (share->op && share->new_op)
3126  {
3127  Ndb_event_data *event_data= (Ndb_event_data *) share->op->getCustomData();
3128  if (event_data)
3129  delete event_data;
3130  share->op->setCustomData(NULL);
3131  {
3132  Mutex_guard injector_mutex_g(injector_mutex);
3133  injector_ndb->dropEventOperation(share->op);
3134  }
3135  share->op= share->new_op;
3136  share->new_op= 0;
3137  free_share(&share);
3138  }
3139  pthread_mutex_unlock(&share->mutex);
3140  }
3141  break;
3142  }
3143  case SOT_RENAME_TABLE_NEW:
3144  if (opt_ndb_extra_logging > 9)
3145  sql_print_information("SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name);
3146  log_query= 1;
3147  if (ndb_binlog_running && (!share || !share->op))
3148  {
3149  /*
3150  we need to free any share here as command below
3151  may need to call handle_trailing_share
3152  */
3153  if (share)
3154  {
3155  /* ndb_share reference temporary free */
3156  DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
3157  share->key, share->use_count));
3158  free_share(&share);
3159  share= 0;
3160  }
3161  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3162  if (ndbcluster_check_if_local_table(schema->db, schema->name))
3163  {
3164  DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
3165  schema->db, schema->name));
3166  sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
3167  "binlog schema event '%s' from node %d. ",
3168  schema->db, schema->name, schema->query,
3169  schema->node_id);
3170  }
3171  else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3172  {
3173  print_could_not_discover_error(thd, schema);
3174  }
3175  }
3176  break;
3177  default:
3178  DBUG_ASSERT(FALSE);
3179  }
3180  if (share)
3181  {
3182  /* ndb_share reference temporary free */
3183  DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
3184  share->key, share->use_count));
3185  free_share(&share);
3186  share= 0;
3187  }
3188  }
3189  if (ndb_binlog_running && log_query)
3190  ndb_binlog_query(thd, schema);
3191  }
3192  while ((schema= post_epoch_unlock_list->pop()))
3193  {
3194  ndbcluster_update_slock(thd, schema->db, schema->name,
3195  schema->id, schema->version);
3196  }
3197  DBUG_VOID_RETURN;
3198 }
3199 
3200 /*
3201  Timer class for doing performance measurements
3202 */
3203 
/*********************************************************************
  Internal helper functions for handling the cluster replication tables
  - ndb_binlog_index
  - ndb_apply_status
*********************************************************************/
3209 
3210 /*
3211  struct to hold the data to be inserted into the
3212  ndb_binlog_index table
3213 */
3214 struct ndb_binlog_index_row {
3215  ulonglong epoch;
3216  const char *master_log_file;
3217  ulonglong master_log_pos;
3218  ulong n_inserts;
3219  ulong n_updates;
3220  ulong n_deletes;
3221  ulong n_schemaops;
3222 
3223  ulong orig_server_id;
3224  ulonglong orig_epoch;
3225 
3226  ulong gci;
3227 
3228  struct ndb_binlog_index_row *next;
3229 };
3230 
3231 
3232 /*
3233  Open the ndb_binlog_index table for writing
3234 */
3235 static int
3236 ndb_binlog_index_table__open(THD *thd,
3237  TABLE **ndb_binlog_index)
3238 {
3239  const char *save_proc_info=
3240  thd_proc_info(thd, "Opening " NDB_REP_DB "." NDB_REP_TABLE);
3241 
3242  TABLE_LIST tables;
3243  tables.init_one_table(STRING_WITH_LEN(NDB_REP_DB), // db
3244  STRING_WITH_LEN(NDB_REP_TABLE), // name
3245  NDB_REP_TABLE, // alias
3246  TL_WRITE); // for write
3247 
3248  /* Only allow real table to be opened */
3249  tables.required_type= FRMTYPE_TABLE;
3250 
3251  const bool derived = false;
3252  const uint flags =
3253  MYSQL_LOCK_IGNORE_TIMEOUT; /* Wait for lock "infinitely" */
3254  if (open_and_lock_tables(thd, &tables, derived, flags))
3255  {
3256  if (thd->killed)
3257  sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
3258  else
3259  sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
3260  thd->get_stmt_da()->sql_errno(),
3261  thd->get_stmt_da()->message());
3262  thd_proc_info(thd, save_proc_info);
3263  return -1;
3264  }
3265  *ndb_binlog_index= tables.table;
3266  thd_proc_info(thd, save_proc_info);
3267  return 0;
3268 }
3269 
3270 
3271 /*
3272  Write rows to the ndb_binlog_index table
3273 */
3274 static int
3275 ndb_binlog_index_table__write_rows(THD *thd,
3276  ndb_binlog_index_row *row)
3277 {
3278  int error= 0;
3279  ndb_binlog_index_row *first= row;
3280  TABLE *ndb_binlog_index= 0;
3281 
3282  /*
3283  Assume this function is not called with an error set in thd
3284  (but clear for safety in release version)
3285  */
3286  assert(!thd->is_error());
3287  thd->clear_error();
3288 
3289  /*
3290  Turn of binlogging to prevent the table changes to be written to
3291  the binary log.
3292  */
3293  tmp_disable_binlog(thd);
3294 
3295  if (ndb_binlog_index_table__open(thd, &ndb_binlog_index))
3296  {
3297  sql_print_error("NDB Binlog: Unable to open ndb_binlog_index table");
3298  error= -1;
3299  goto add_ndb_binlog_index_err;
3300  }
3301 
3302  // Set all columns to be written
3303  ndb_binlog_index->use_all_columns();
3304 
3305  do
3306  {
3307  ulonglong epoch= 0, orig_epoch= 0;
3308  uint orig_server_id= 0;
3309 
3310  // Intialize ndb_binlog_index->record[0]
3311  empty_record(ndb_binlog_index);
3312 
3313  ndb_binlog_index->field[0]->store(first->master_log_pos, true);
3314  ndb_binlog_index->field[1]->store(first->master_log_file,
3315  strlen(first->master_log_file),
3316  &my_charset_bin);
3317  ndb_binlog_index->field[2]->store(epoch= first->epoch, true);
3318  if (ndb_binlog_index->s->fields > 7)
3319  {
3320  ndb_binlog_index->field[3]->store(row->n_inserts, true);
3321  ndb_binlog_index->field[4]->store(row->n_updates, true);
3322  ndb_binlog_index->field[5]->store(row->n_deletes, true);
3323  ndb_binlog_index->field[6]->store(row->n_schemaops, true);
3324  ndb_binlog_index->field[7]->store(orig_server_id= row->orig_server_id, true);
3325  ndb_binlog_index->field[8]->store(orig_epoch= row->orig_epoch, true);
3326  ndb_binlog_index->field[9]->store(first->gci, true);
3327  row= row->next;
3328  }
3329  else
3330  {
3331  while ((row= row->next))
3332  {
3333  first->n_inserts+= row->n_inserts;
3334  first->n_updates+= row->n_updates;
3335  first->n_deletes+= row->n_deletes;
3336  first->n_schemaops+= row->n_schemaops;
3337  }
3338  ndb_binlog_index->field[3]->store((ulonglong)first->n_inserts, true);
3339  ndb_binlog_index->field[4]->store((ulonglong)first->n_updates, true);
3340  ndb_binlog_index->field[5]->store((ulonglong)first->n_deletes, true);
3341  ndb_binlog_index->field[6]->store((ulonglong)first->n_schemaops, true);
3342  }
3343 
3344  if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
3345  {
3346  char tmp[128];
3347  if (ndb_binlog_index->s->fields > 7)
3348  my_snprintf(tmp, sizeof(tmp), "%u/%u,%u,%u/%u",
3349  uint(epoch >> 32), uint(epoch),
3350  orig_server_id,
3351  uint(orig_epoch >> 32), uint(orig_epoch));
3352 
3353  else
3354  my_snprintf(tmp, sizeof(tmp), "%u/%u", uint(epoch >> 32), uint(epoch));
3355  sql_print_error("NDB Binlog: Writing row (%s) to ndb_binlog_index: %d",
3356  tmp, error);
3357  error= -1;
3358  goto add_ndb_binlog_index_err;
3359  }
3360  } while (row);
3361 
3362 add_ndb_binlog_index_err:
3363  /*
3364  Explicitly commit or rollback the writes(although we normally
3365  use a non transactional engine for the ndb_binlog_index table)
3366  */
3367  thd->get_stmt_da()->set_overwrite_status(true);
3368  thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
3369  thd->get_stmt_da()->set_overwrite_status(false);
3370 
3371  // Close the tables this thread has opened
3372  close_thread_tables(thd);
3373 
3374  /*
3375  There should be no need for rolling back transaction due to deadlock
3376  (since ndb_binlog_index is non transactional).
3377  */
3378  DBUG_ASSERT(! thd->transaction_rollback_request);
3379 
3380  // Release MDL locks on the opened table
3381  thd->mdl_context.release_transactional_locks();
3382 
3383  reenable_binlog(thd);
3384  return error;
3385 }
3386 
3387 /*********************************************************************
3388  Functions for start, stop, wait for ndbcluster binlog thread
3389 *********************************************************************/
3390 
3391 pthread_handler_t ndb_binlog_thread_func(void *arg);
3392 
3393 int ndbcluster_binlog_start()
3394 {
3395  DBUG_ENTER("ndbcluster_binlog_start");
3396 
3397  if (::server_id == 0)
3398  {
3399  sql_print_warning("NDB: server id set to zero - changes logged to "
3400  "bin log with server id zero will be logged with "
3401  "another server id by slave mysqlds");
3402  }
3403 
3404  /*
3405  Check that ServerId is not using the reserved bit or bits reserved
3406  for application use
3407  */
3408  if ((::server_id & 0x1 << 31) || // Reserved bit
3409  !ndbcluster_anyvalue_is_serverid_in_range(::server_id)) // server_id_bits
3410  {
3411  sql_print_error("NDB: server id provided is too large to be represented in "
3412  "opt_server_id_bits or is reserved");
3413  DBUG_RETURN(-1);
3414  }
3415 
3416  pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
3417  pthread_cond_init(&injector_cond, NULL);
3418  pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
3419 
3420  /* Create injector thread */
3421  if (pthread_create(&ndb_binlog_thread, &connection_attrib,
3422  ndb_binlog_thread_func, 0))
3423  {
3424  DBUG_PRINT("error", ("Could not create ndb injector thread"));
3425  pthread_cond_destroy(&injector_cond);
3426  pthread_mutex_destroy(&injector_mutex);
3427  DBUG_RETURN(-1);
3428  }
3429 
3430  ndbcluster_binlog_inited= 1;
3431 
3432  /* Wait for the injector thread to start */
3433  pthread_mutex_lock(&injector_mutex);
3434  while (!ndb_binlog_thread_running)
3435  pthread_cond_wait(&injector_cond, &injector_mutex);
3436  pthread_mutex_unlock(&injector_mutex);
3437 
3438  if (ndb_binlog_thread_running < 0)
3439  DBUG_RETURN(-1);
3440 
3441  DBUG_RETURN(0);
3442 }
3443 
3444 
3445 /**************************************************************
3446  Internal helper functions for creating/dropping ndb events
3447  used by the client sql threads
3448 **************************************************************/
3449 void
3450 ndb_rep_event_name(String *event_name,const char *db, const char *tbl,
3451  my_bool full)
3452 {
3453  if (full)
3454  event_name->set_ascii("REPLF$", 6);
3455  else
3456  event_name->set_ascii("REPL$", 5);
3457  event_name->append(db);
3458 #ifdef NDB_WIN32
3459  /*
3460  * Some bright spark decided that we should sometimes have backslashes.
3461  * This causes us pain as the event is db/table and not db\table so trying
3462  * to drop db\table when we meant db/table ends in the event lying around
3463  * after drop table, leading to all sorts of pain.
3464  */
3465  String backslash_sep(1);
3466  backslash_sep.set_ascii("\\",1);
3467 
3468  int bsloc;
3469  if((bsloc= event_name->strstr(backslash_sep,0))!=-1)
3470  event_name->replace(bsloc, 1, "/", 1);
3471 #endif
3472  if (tbl)
3473  {
3474  event_name->append('/');
3475  event_name->append(tbl);
3476  }
3477  DBUG_PRINT("info", ("ndb_rep_event_name: %s", event_name->c_ptr()));
3478 }
3479 
3480 #ifdef HAVE_NDB_BINLOG
3481 static void
3482 set_binlog_flags(NDB_SHARE *share,
3483  Ndb_binlog_type ndb_binlog_type)
3484 {
3485  DBUG_ENTER("set_binlog_flags");
3486  switch (ndb_binlog_type)
3487  {
3488  case NBT_NO_LOGGING:
3489  DBUG_PRINT("info", ("NBT_NO_LOGGING"));
3490  set_binlog_nologging(share);
3491  DBUG_VOID_RETURN;
3492  case NBT_DEFAULT:
3493  DBUG_PRINT("info", ("NBT_DEFAULT"));
3494  if (opt_ndb_log_updated_only)
3495  {
3496  set_binlog_updated_only(share);
3497  }
3498  else
3499  {
3500  set_binlog_full(share);
3501  }
3502  if (opt_ndb_log_update_as_write)
3503  {
3504  set_binlog_use_write(share);
3505  }
3506  else
3507  {
3508  set_binlog_use_update(share);
3509  }
3510  break;
3511  case NBT_UPDATED_ONLY:
3512  DBUG_PRINT("info", ("NBT_UPDATED_ONLY"));
3513  set_binlog_updated_only(share);
3514  set_binlog_use_write(share);
3515  break;
3516  case NBT_USE_UPDATE:
3517  DBUG_PRINT("info", ("NBT_USE_UPDATE"));
3518  case NBT_UPDATED_ONLY_USE_UPDATE:
3519  DBUG_PRINT("info", ("NBT_UPDATED_ONLY_USE_UPDATE"));
3520  set_binlog_updated_only(share);
3521  set_binlog_use_update(share);
3522  break;
3523  case NBT_FULL:
3524  DBUG_PRINT("info", ("NBT_FULL"));
3525  set_binlog_full(share);
3526  set_binlog_use_write(share);
3527  break;
3528  case NBT_FULL_USE_UPDATE:
3529  DBUG_PRINT("info", ("NBT_FULL_USE_UPDATE"));
3530  set_binlog_full(share);
3531  set_binlog_use_update(share);
3532  break;
3533  }
3534  set_binlog_logging(share);
3535  DBUG_VOID_RETURN;
3536 }
3537 
3538 
3539 inline void slave_reset_conflict_fn(NDB_SHARE *share)
3540 {
3541  NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
3542  if (cfn_share)
3543  {
3544  memset(cfn_share, 0, sizeof(*cfn_share));
3545  }
3546 }
3547 
3548 static uint
3549 slave_check_resolve_col_type(const NDBTAB *ndbtab,
3550  uint field_index)
3551 {
3552  DBUG_ENTER("slave_check_resolve_col_type");
3553  const NDBCOL *c= ndbtab->getColumn(field_index);
3554  uint sz= 0;
3555  switch (c->getType())
3556  {
3557  case NDBCOL::Unsigned:
3558  sz= sizeof(Uint32);
3559  DBUG_PRINT("info", ("resolve column Uint32 %u",
3560  field_index));
3561  break;
3562  case NDBCOL::Bigunsigned:
3563  sz= sizeof(Uint64);
3564  DBUG_PRINT("info", ("resolve column Uint64 %u",
3565  field_index));
3566  break;
3567  default:
3568  DBUG_PRINT("info", ("resolve column %u has wrong type",
3569  field_index));
3570  break;
3571  }
3572  DBUG_RETURN(sz);
3573 }
3574 
3575 static int
3576 slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
3577  const NDBTAB *ndbtab, uint field_index,
3578  uint resolve_col_sz,
3579  const st_conflict_fn_def* conflict_fn,
3580  TABLE *table,
3581  uint8 flags)
3582 {
3583  DBUG_ENTER("slave_set_resolve_fn");
3584 
3585  Thd_ndb *thd_ndb= get_thd_ndb(thd);
3586  Ndb *ndb= thd_ndb->ndb;
3587  NDBDICT *dict= ndb->getDictionary();
3588  NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
3589  if (cfn_share == NULL)
3590  {
3591  share->m_cfn_share= cfn_share= (NDB_CONFLICT_FN_SHARE*)
3592  alloc_root(&share->mem_root, sizeof(NDB_CONFLICT_FN_SHARE));
3593  slave_reset_conflict_fn(share);
3594  }
3595  cfn_share->m_conflict_fn= conflict_fn;
3596 
3597  /* Calculate resolve col stuff (if relevant) */
3598  cfn_share->m_resolve_size= resolve_col_sz;
3599  cfn_share->m_resolve_column= field_index;
3600  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
3601  table->record[0]);
3602  cfn_share->m_flags = flags;
3603 
3604  {
3605  /* get exceptions table */
3606  char ex_tab_name[FN_REFLEN];
3607  strxnmov(ex_tab_name, sizeof(ex_tab_name), share->table_name,
3608  lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
3609  NDB_EXCEPTIONS_TABLE_SUFFIX, NullS);
3610  ndb->setDatabaseName(share->db);
3611  Ndb_table_guard ndbtab_g(dict, ex_tab_name);
3612  const NDBTAB *ex_tab= ndbtab_g.get_table();
3613  if (ex_tab)
3614  {
3615  const int fixed_cols= 4;
3616  bool ok=
3617  ex_tab->getNoOfColumns() >= fixed_cols &&
3618  ex_tab->getNoOfPrimaryKeys() == 4 &&
3619  /* server id */
3620  ex_tab->getColumn(0)->getType() == NDBCOL::Unsigned &&
3621  ex_tab->getColumn(0)->getPrimaryKey() &&
3622  /* master_server_id */
3623  ex_tab->getColumn(1)->getType() == NDBCOL::Unsigned &&
3624  ex_tab->getColumn(1)->getPrimaryKey() &&
3625  /* master_epoch */
3626  ex_tab->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
3627  ex_tab->getColumn(2)->getPrimaryKey() &&
3628  /* count */
3629  ex_tab->getColumn(3)->getType() == NDBCOL::Unsigned &&
3630  ex_tab->getColumn(3)->getPrimaryKey();
3631  if (ok)
3632  {
3633  int ncol= ndbtab->getNoOfColumns();
3634  int nkey= ndbtab->getNoOfPrimaryKeys();
3635  int i, k;
3636  for (i= k= 0; i < ncol && k < nkey; i++)
3637  {
3638  const NdbDictionary::Column* col= ndbtab->getColumn(i);
3639  if (col->getPrimaryKey())
3640  {
3641  const NdbDictionary::Column* ex_col=
3642  ex_tab->getColumn(fixed_cols + k);
3643  ok=
3644  ex_col != NULL &&
3645  col->getType() == ex_col->getType() &&
3646  col->getLength() == ex_col->getLength() &&
3647  col->getNullable() == ex_col->getNullable();
3648  if (!ok)
3649  break;
3650  cfn_share->m_offset[k]=
3651  (uint16)(table->field[i]->ptr - table->record[0]);
3652  k++;
3653  }
3654  }
3655  if (ok)
3656  {
3657  cfn_share->m_ex_tab= ex_tab;
3658  cfn_share->m_pk_cols= nkey;
3659  ndbtab_g.release();
3660  if (opt_ndb_extra_logging)
3661  sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
3662  table->s->db.str,
3663  table->s->table_name.str,
3664  table->s->db.str,
3665  ex_tab_name);
3666  }
3667  else
3668  sql_print_warning("NDB Slave: exceptions table %s has wrong "
3669  "definition (column %d)",
3670  ex_tab_name, fixed_cols + k);
3671  }
3672  else
3673  sql_print_warning("NDB Slave: exceptions table %s has wrong "
3674  "definition (initial %d columns)",
3675  ex_tab_name, fixed_cols);
3676  }
3677  }
3678  DBUG_RETURN(0);
3679 }
3680 
3694 int
3695 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
3696  enum_conflicting_op_type op_type,
3697  const uchar* old_data,
3698  const uchar* new_data,
3699  const MY_BITMAP* write_set,
3701 {
3702  DBUG_ENTER("row_conflict_fn_old");
3703  uint32 resolve_column= cfn_share->m_resolve_column;
3704  uint32 resolve_size= cfn_share->m_resolve_size;
3705  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
3706 
3707  assert((resolve_size == 4) || (resolve_size == 8));
3708 
3709  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3710  {
3711  sql_print_information("NDB Slave: missing data for %s",
3712  cfn_share->m_conflict_fn->name);
3713  DBUG_RETURN(1);
3714  }
3715 
3716  const uint label_0= 0;
3717  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
3718  int r;
3719 
3720  DBUG_PRINT("info",
3721  ("Adding interpreted filter, existing value must eq event old value"));
3722  /*
3723  * read old value from record
3724  */
3725  union {
3726  uint32 old_value_32;
3727  uint64 old_value_64;
3728  };
3729  {
3730  if (resolve_size == 4)
3731  {
3732  memcpy(&old_value_32, field_ptr, resolve_size);
3733  DBUG_PRINT("info", (" old_value_32: %u", old_value_32));
3734  }
3735  else
3736  {
3737  memcpy(&old_value_64, field_ptr, resolve_size);
3738  DBUG_PRINT("info", (" old_value_64: %llu",
3739  (unsigned long long) old_value_64));
3740  }
3741  }
3742 
3743  /*
3744  * Load registers RegOldValue and RegCurrentValue
3745  */
3746  if (resolve_size == 4)
3747  r= code->load_const_u32(RegOldValue, old_value_32);
3748  else
3749  r= code->load_const_u64(RegOldValue, old_value_64);
3750  DBUG_ASSERT(r == 0);
3751  r= code->read_attr(RegCurrentValue, resolve_column);
3752  DBUG_ASSERT(r == 0);
3753  /*
3754  * if RegOldValue == RegCurrentValue goto label_0
3755  * else raise error for this row
3756  */
3757  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
3758  DBUG_ASSERT(r == 0);
3759  r= code->interpret_exit_nok(error_conflict_fn_violation);
3760  DBUG_ASSERT(r == 0);
3761  r= code->def_label(label_0);
3762  DBUG_ASSERT(r == 0);
3763  r= code->interpret_exit_ok();
3764  DBUG_ASSERT(r == 0);
3765  r= code->finalise();
3766  DBUG_ASSERT(r == 0);
3767  DBUG_RETURN(r);
3768 }
3769 
3770 int
3771 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
3772  enum_conflicting_op_type op_type,
3773  const uchar* old_data,
3774  const uchar* new_data,
3775  const MY_BITMAP* write_set,
3776  NdbInterpretedCode* code)
3777 {
3778  DBUG_ENTER("row_conflict_fn_max_update_only");
3779  uint32 resolve_column= cfn_share->m_resolve_column;
3780  uint32 resolve_size= cfn_share->m_resolve_size;
3781  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
3782 
3783  assert((resolve_size == 4) || (resolve_size == 8));
3784 
3785  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3786  {
3787  sql_print_information("NDB Slave: missing data for %s",
3788  cfn_share->m_conflict_fn->name);
3789  DBUG_RETURN(1);
3790  }
3791 
3792  const uint label_0= 0;
3793  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
3794  int r;
3795 
3796  DBUG_PRINT("info",
3797  ("Adding interpreted filter, existing value must be lt event new"));
3798  /*
3799  * read new value from record
3800  */
3801  union {
3802  uint32 new_value_32;
3803  uint64 new_value_64;
3804  };
3805  {
3806  if (resolve_size == 4)
3807  {
3808  memcpy(&new_value_32, field_ptr, resolve_size);
3809  DBUG_PRINT("info", (" new_value_32: %u", new_value_32));
3810  }
3811  else
3812  {
3813  memcpy(&new_value_64, field_ptr, resolve_size);
3814  DBUG_PRINT("info", (" new_value_64: %llu",
3815  (unsigned long long) new_value_64));
3816  }
3817  }
3818  /*
3819  * Load registers RegNewValue and RegCurrentValue
3820  */
3821  if (resolve_size == 4)
3822  r= code->load_const_u32(RegNewValue, new_value_32);
3823  else
3824  r= code->load_const_u64(RegNewValue, new_value_64);
3825  DBUG_ASSERT(r == 0);
3826  r= code->read_attr(RegCurrentValue, resolve_column);
3827  DBUG_ASSERT(r == 0);
3828  /*
3829  * if RegNewValue > RegCurrentValue goto label_0
3830  * else raise error for this row
3831  */
3832  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
3833  DBUG_ASSERT(r == 0);
3834  r= code->interpret_exit_nok(error_conflict_fn_violation);
3835  DBUG_ASSERT(r == 0);
3836  r= code->def_label(label_0);
3837  DBUG_ASSERT(r == 0);
3838  r= code->interpret_exit_ok();
3839  DBUG_ASSERT(r == 0);
3840  r= code->finalise();
3841  DBUG_ASSERT(r == 0);
3842  DBUG_RETURN(r);
3843 }
3844 
3857 int
3858 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
3859  enum_conflicting_op_type op_type,
3860  const uchar* old_data,
3861  const uchar* new_data,
3862  const MY_BITMAP* write_set,
3863  NdbInterpretedCode* code)
3864 {
3865  switch(op_type)
3866  {
3867  case WRITE_ROW:
3868  abort();
3869  return 1;
3870  case UPDATE_ROW:
3871  return row_conflict_fn_max_update_only(cfn_share,
3872  op_type,
3873  old_data,
3874  new_data,
3875  write_set,
3876  code);
3877  case DELETE_ROW:
3878  /* Can't use max of new image, as there's no new image
3879  * for DELETE
3880  * Use OLD instead
3881  */
3882  return row_conflict_fn_old(cfn_share,
3883  op_type,
3884  old_data,
3885  new_data,
3886  write_set,
3887  code);
3888  default:
3889  abort();
3890  return 1;
3891  }
3892 }
3893 
3894 
3909 int
3910 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
3911  enum_conflicting_op_type op_type,
3912  const uchar* old_data,
3913  const uchar* new_data,
3914  const MY_BITMAP* write_set,
3915  NdbInterpretedCode* code)
3916 {
3917  switch(op_type)
3918  {
3919  case WRITE_ROW:
3920  abort();
3921  return 1;
3922  case UPDATE_ROW:
3923  return row_conflict_fn_max_update_only(cfn_share,
3924  op_type,
3925  old_data,
3926  new_data,
3927  write_set,
3928  code);
3929  case DELETE_ROW:
3930  /* This variant always lets a received DELETE_ROW
3931  * succeed.
3932  */
3933  return 1;
3934  default:
3935  abort();
3936  return 1;
3937  }
3938 };
3939 
3940 
3946 int
3947 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
3948  enum_conflicting_op_type op_type,
3949  const uchar* old_data,
3950  const uchar* new_data,
3951  const MY_BITMAP* write_set,
3952  NdbInterpretedCode* code)
3953 {
3954  DBUG_ENTER("row_conflict_fn_epoch");
3955  switch(op_type)
3956  {
3957  case WRITE_ROW:
3958  abort();
3959  DBUG_RETURN(1);
3960  case UPDATE_ROW:
3961  case DELETE_ROW:
3962  {
3963  const uint label_0= 0;
3964  const Uint32
3965  RegAuthor= 1, RegZero= 2,
3966  RegMaxRepEpoch= 1, RegRowEpoch= 2;
3967  int r;
3968 
3969  r= code->load_const_u32(RegZero, 0);
3970  assert(r == 0);
3971  r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
3972  assert(r == 0);
3973  /* If last author was not local, assume no conflict */
3974  r= code->branch_ne(RegZero, RegAuthor, label_0);
3975  assert(r == 0);
3976 
3977  /*
3978  * Load registers RegMaxRepEpoch and RegRowEpoch
3979  */
3980  r= code->load_const_u64(RegMaxRepEpoch, g_ndb_slave_state.max_rep_epoch);
3981  assert(r == 0);
3982  r= code->read_attr(RegRowEpoch, NdbDictionary::Column::ROW_GCI64);
3983  assert(r == 0);
3984 
3985  /*
3986  * if RegRowEpoch <= RegMaxRepEpoch goto label_0
3987  * else raise error for this row
3988  */
3989  r= code->branch_le(RegRowEpoch, RegMaxRepEpoch, label_0);
3990  assert(r == 0);
3991  r= code->interpret_exit_nok(error_conflict_fn_violation);
3992  assert(r == 0);
3993  r= code->def_label(label_0);
3994  assert(r == 0);
3995  r= code->interpret_exit_ok();
3996  assert(r == 0);
3997  r= code->finalise();
3998  assert(r == 0);
3999  DBUG_RETURN(r);
4000  }
4001  default:
4002  abort();
4003  DBUG_RETURN(1);
4004  }
4005 };
4006 
4007 static const st_conflict_fn_arg_def resolve_col_args[]=
4008 {
4009  /* Arg type Optional */
4010  { CFAT_COLUMN_NAME, false },
4011  { CFAT_END, false }
4012 };
4013 
4014 static const st_conflict_fn_arg_def epoch_fn_args[]=
4015 {
4016  /* Arg type Optional */
4017  { CFAT_EXTRA_GCI_BITS, true },
4018  { CFAT_END, false }
4019 };
4020 
4021 static const st_conflict_fn_def conflict_fns[]=
4022 {
4023  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
4024  &resolve_col_args[0], row_conflict_fn_max_del_win },
4025  { "NDB$MAX", CFT_NDB_MAX,
4026  &resolve_col_args[0], row_conflict_fn_max },
4027  { "NDB$OLD", CFT_NDB_OLD,
4028  &resolve_col_args[0], row_conflict_fn_old },
4029  { "NDB$EPOCH", CFT_NDB_EPOCH,
4030  &epoch_fn_args[0], row_conflict_fn_epoch }
4031 };
4032 
4033 static unsigned n_conflict_fns=
4034  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
4035 
4036 
4037 int
4038 parse_conflict_fn_spec(const char* conflict_fn_spec,
4039  const st_conflict_fn_def** conflict_fn,
4040  st_conflict_fn_arg* args,
4041  Uint32* max_args,
4042  const TABLE* table,
4043  char *msg, uint msg_len)
4044 {
4045  DBUG_ENTER("parse_conflict_fn_spec");
4046 
4047  Uint32 no_args = 0;
4048  const char *ptr= conflict_fn_spec;
4049  const char *error_str= "unknown conflict resolution function";
4050  /* remove whitespace */
4051  while (*ptr == ' ' && *ptr != '\0') ptr++;
4052 
4053  DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
4054 
4055  for (unsigned i= 0; i < n_conflict_fns; i++)
4056  {
4057  const st_conflict_fn_def &fn= conflict_fns[i];
4058 
4059  uint len= strlen(fn.name);
4060  if (strncmp(ptr, fn.name, len))
4061  continue;
4062 
4063  DBUG_PRINT("info", ("found function %s", fn.name));
4064 
4065  /* skip function name */
4066  ptr+= len;
4067 
4068  /* remove whitespace */
4069  while (*ptr == ' ' && *ptr != '\0') ptr++;
4070 
4071  /* next '(' */
4072  if (*ptr != '(')
4073  {
4074  error_str= "missing '('";
4075  DBUG_PRINT("info", ("parse error %s", error_str));
4076  break;
4077  }
4078  ptr++;
4079 
4080  /* find all arguments */
4081  for (;;)
4082  {
4083  if (no_args >= *max_args)
4084  {
4085  error_str= "too many arguments";
4086  DBUG_PRINT("info", ("parse error %s", error_str));
4087  break;
4088  }
4089 
4090  /* expected type */
4091  enum enum_conflict_fn_arg_type type=
4092  conflict_fns[i].arg_defs[no_args].arg_type;
4093 
4094  /* remove whitespace */
4095  while (*ptr == ' ' && *ptr != '\0') ptr++;
4096 
4097  if (type == CFAT_END)
4098  {
4099  args[no_args].type= type;
4100  error_str= NULL;
4101  break;
4102  }
4103 
4104  /* arg */
4105  /* Todo : Should support comma as an arg separator? */
4106  const char *start_arg= ptr;
4107  while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
4108  const char *end_arg= ptr;
4109 
4110  bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
4111  /* any arg given? */
4112  if (start_arg == end_arg)
4113  {
4114  if (!optional_arg)
4115  {
4116  error_str= "missing function argument";
4117  DBUG_PRINT("info", ("parse error %s", error_str));
4118  break;
4119  }
4120  else
4121  {
4122  /* Arg was optional, and not present
4123  * Must be at end of args, finish parsing
4124  */
4125  args[no_args].type= CFAT_END;
4126  error_str= NULL;
4127  break;
4128  }
4129  }
4130 
4131  uint len= end_arg - start_arg;
4132  args[no_args].type= type;
4133  args[no_args].ptr= start_arg;
4134  args[no_args].len= len;
4135  args[no_args].fieldno= (uint32)-1;
4136 
4137  DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
4138 
4139  bool arg_processing_error = false;
4140  switch (type)
4141  {
4142  case CFAT_COLUMN_NAME:
4143  {
4144  /* find column in table */
4145  DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
4146  TABLE_SHARE *table_s= table->s;
4147  for (uint j= 0; j < table_s->fields; j++)
4148  {
4149  Field *field= table_s->field[j];
4150  if (strncmp(start_arg, field->field_name, len) == 0 &&
4151  field->field_name[len] == '\0')
4152  {
4153  DBUG_PRINT("info", ("found %s", field->field_name));
4154  args[no_args].fieldno= j;
4155  break;
4156  }
4157  }
4158  break;
4159  }
4160  case CFAT_EXTRA_GCI_BITS:
4161  {
4162  /* Map string to number and check it's in range etc */
4163  char* end_of_arg = (char*) end_arg;
4164  Uint32 bits = strtoul(start_arg, &end_of_arg, 0);
4165  DBUG_PRINT("info", ("Using %u as the number of extra bits", bits));
4166 
4167  if (bits > 31)
4168  {
4169  arg_processing_error= true;
4170  error_str= "Too many extra Gci bits";
4171  DBUG_PRINT("info", ("%s", error_str));
4172  break;
4173  }
4174  /* Num bits seems ok */
4175  args[no_args].extraGciBits = bits;
4176  break;
4177  }
4178  case CFAT_END:
4179  abort();
4180  }
4181 
4182  if (arg_processing_error)
4183  break;
4184  no_args++;
4185  }
4186 
4187  if (error_str)
4188  break;
4189 
4190  /* remove whitespace */
4191  while (*ptr == ' ' && *ptr != '\0') ptr++;
4192 
4193  /* next ')' */
4194  if (*ptr != ')')
4195  {
4196  error_str= "missing ')'";
4197  break;
4198  }
4199  ptr++;
4200 
4201  /* remove whitespace */
4202  while (*ptr == ' ' && *ptr != '\0') ptr++;
4203 
4204  /* garbage in the end? */
4205  if (*ptr != '\0')
4206  {
4207  error_str= "garbage in the end";
4208  break;
4209  }
4210 
4211  /* Update ptrs to conflict fn + # of args */
4212  *conflict_fn = &conflict_fns[i];
4213  *max_args = no_args;
4214 
4215  DBUG_RETURN(0);
4216  }
4217  /* parse error */
4218  my_snprintf(msg, msg_len, "%s, %s at '%s'",
4219  conflict_fn_spec, error_str, ptr);
4220  DBUG_PRINT("info", ("%s", msg));
4221  DBUG_RETURN(-1);
4222 }
4223 
4224 static int
4225 setup_conflict_fn(THD *thd, NDB_SHARE *share,
4226  const NDBTAB *ndbtab,
4227  char *msg, uint msg_len,
4228  TABLE *table,
4229  const st_conflict_fn_def* conflict_fn,
4230  const st_conflict_fn_arg* args,
4231  const Uint32 num_args)
4232 {
4233  DBUG_ENTER("setup_conflict_fn");
4234 
4235  /* setup the function */
4236  switch (conflict_fn->type)
4237  {
4238  case CFT_NDB_MAX:
4239  case CFT_NDB_OLD:
4240  case CFT_NDB_MAX_DEL_WIN:
4241  {
4242  if (num_args != 1)
4243  {
4244  my_snprintf(msg, msg_len,
4245  "Incorrect arguments to conflict function");
4246  DBUG_PRINT("info", ("%s", msg));
4247  DBUG_RETURN(-1);
4248  }
4249 
4250  uint resolve_col_sz= 0;
4251 
4252  if (0 == (resolve_col_sz =
4253  slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
4254  {
4255  /* wrong data type */
4256  slave_reset_conflict_fn(share);
4257  my_snprintf(msg, msg_len,
4258  "column '%s' has wrong datatype",
4259  table->s->field[args[0].fieldno]->field_name);
4260  DBUG_PRINT("info", ("%s", msg));
4261  DBUG_RETURN(-1);
4262  }
4263 
4264  if (slave_set_resolve_fn(thd, share, ndbtab,
4265  args[0].fieldno, resolve_col_sz,
4266  conflict_fn, table, CFF_NONE))
4267  {
4268  my_snprintf(msg, msg_len,
4269  "unable to setup conflict resolution using column '%s'",
4270  table->s->field[args[0].fieldno]->field_name);
4271  DBUG_PRINT("info", ("%s", msg));
4272  DBUG_RETURN(-1);
4273  }
4274  if (opt_ndb_extra_logging)
4275  {
4276  sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
4277  table->s->db.str,
4278  table->s->table_name.str,
4279  conflict_fn->name,
4280  table->s->field[args[0].fieldno]->field_name);
4281  }
4282  break;
4283  }
4284  case CFT_NDB_EPOCH:
4285  {
4286  if (num_args > 1)
4287  {
4288  my_snprintf(msg, msg_len,
4289  "Too many arguments to conflict function");
4290  DBUG_PRINT("info", ("%s", msg));
4291  DBUG_RETURN(-1);
4292  }
4293 
4294  /* Check that table doesn't have Blobs as we don't support that */
4295  if (share->flags & NSF_BLOB_FLAG)
4296  {
4297  my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH.");
4298  DBUG_PRINT("info", ("%s", msg));
4299  DBUG_RETURN(-1);
4300  }
4301 
4302  /* Check that table has required extra meta-columns */
4303  /* Todo : Could warn if extra gcibits is insufficient to
4304  * represent SavePeriod/EpochPeriod
4305  */
4306  if (ndbtab->getExtraRowGciBits() == 0)
4307  sql_print_information("Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
4308 
4309  if (ndbtab->getExtraRowAuthorBits() == 0)
4310  {
4311  my_snprintf(msg, msg_len, "No extra row author bits in table.");
4312  DBUG_PRINT("info", ("%s", msg));
4313  DBUG_RETURN(-1);
4314  }
4315 
4316  if (slave_set_resolve_fn(thd, share, ndbtab,
4317  0, // field_no
4318  0, // resolve_col_sz
4319  conflict_fn, table, CFF_REFRESH_ROWS))
4320  {
4321  my_snprintf(msg, msg_len,
4322  "unable to setup conflict resolution");
4323  DBUG_PRINT("info", ("%s", msg));
4324  DBUG_RETURN(-1);
4325  }
4326  if (opt_ndb_extra_logging)
4327  {
4328  sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
4329  table->s->db.str,
4330  table->s->table_name.str,
4331  conflict_fn->name);
4332  }
4333  break;
4334  }
4335  case CFT_NUMBER_OF_CFTS:
4336  case CFT_NDB_UNDEF:
4337  abort();
4338  }
4339  DBUG_RETURN(0);
4340 }
4341 
4342 static const char *ndb_rep_db= NDB_REP_DB;
4343 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
4344 static const char *nrt_db= "db";
4345 static const char *nrt_table_name= "table_name";
4346 static const char *nrt_server_id= "server_id";
4347 static const char *nrt_binlog_type= "binlog_type";
4348 static const char *nrt_conflict_fn= "conflict_fn";
4349 
4350 /*
4351  ndbcluster_read_replication_table
4352 
4353  This function reads the information for the supplied table from
4354  the mysql.ndb_replication table.
4355  Where there is no information (or no table), defaults are
4356  returned.

  Two rows are read in one transaction: the generic row (server_id 0)
  and the row for server_id. Per column, a non-NULL value in the
  server-specific row overrides the generic one.

  Parameters:
    thd                - used to push warnings on the error path
    ndb                - used for the dictionary lookup and the read
                         transaction; its database is set to ndb_rep_db
    db, table_name     - key of the rows to read
    server_id          - key of the server-specific row
    binlog_flags       - [out] binlog type; NBT_DEFAULT when not found
    conflict_fn_spec   - [out] NULL, or conflict_fn_buffer holding the
                         NUL-terminated conflict function specification
    conflict_fn_buffer - caller storage for the specification
    conflict_fn_buffer_len - size of conflict_fn_buffer in bytes

  Returns 0 on success, including when the ndb_replication table or the
  rows are missing. On the err: path both outputs are reset to their
  defaults, a warning is pushed, and ndberror.code is returned.
  NOTE(review): for the table-layout errors (error -1 / -2) ndberror is
  never assigned, so the return value there is presumably 0 - confirm the
  NdbError default code and whether callers expect a failure.
4357 */
4358 int
4359 ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
4360  const char* db,
4361  const char* table_name,
4362  uint server_id,
4363  Uint32* binlog_flags,
4364  char** conflict_fn_spec,
4365  char* conflict_fn_buffer,
4366  Uint32 conflict_fn_buffer_len)
4367 {
4368  DBUG_ENTER("ndbcluster_read_replication_table");
4369  NdbError ndberror;
4370  int error= 0;
4371  const char *error_str= "<none>";
4372 
4373  ndb->setDatabaseName(ndb_rep_db);
4374  NDBDICT *dict= ndb->getDictionary();
4375  Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
4376  const NDBTAB *reptab= ndbtab_g.get_table();
/*
  A missing ndb_replication table is not an error: return the defaults.
  NOTE(review): the parentheses of this condition do not balance as shown,
  and the embedded listing numbering skips 4378. A line of the condition
  appears to be missing from this copy; restore it from upstream.
*/
4377  if (reptab == NULL &&
4379  dict->getNdbError().code == 4009))
4380  {
4381  DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
4382  *binlog_flags= NBT_DEFAULT;
4383  *conflict_fn_spec= NULL;
4384  DBUG_RETURN(0);
4385  }
4386  const NDBCOL
4387  *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
4388  char tmp_buf[FN_REFLEN];
4389  uint retries= 100;
4390  int retry_sleep= 30; /* 30 milliseconds, transaction */
4391  if (reptab == NULL)
4392  {
4393  ndberror= dict->getNdbError();
4394  goto err;
4395  }
/*
  Validate the table layout:
    - a 3-part primary key made of db and table_name (Varbinary) and
      server_id (Unsigned);
    - a non-key binlog_type column (Unsigned);
    - an optional non-key conflict_fn column (Varbinary).
  error_str is set to each column name as it is looked up, so the err:
  path can report which column is missing or has the wrong type.
*/
4396  if (reptab->getNoOfPrimaryKeys() != 3)
4397  {
4398  error= -2;
4399  error_str= "Wrong number of primary key parts, expected 3";
4400  goto err;
4401  }
4402  error= -1;
4403  col_db= reptab->getColumn(error_str= nrt_db);
4404  if (col_db == NULL ||
4405  !col_db->getPrimaryKey() ||
4406  col_db->getType() != NDBCOL::Varbinary)
4407  goto err;
4408  col_table_name= reptab->getColumn(error_str= nrt_table_name);
4409  if (col_table_name == NULL ||
4410  !col_table_name->getPrimaryKey() ||
4411  col_table_name->getType() != NDBCOL::Varbinary)
4412  goto err;
4413  col_server_id= reptab->getColumn(error_str= nrt_server_id);
4414  if (col_server_id == NULL ||
4415  !col_server_id->getPrimaryKey() ||
4416  col_server_id->getType() != NDBCOL::Unsigned)
4417  goto err;
4418  col_binlog_type= reptab->getColumn(error_str= nrt_binlog_type);
4419  if (col_binlog_type == NULL ||
4420  col_binlog_type->getPrimaryKey() ||
4421  col_binlog_type->getType() != NDBCOL::Unsigned)
4422  goto err;
/* conflict_fn is optional: a missing column is tolerated. */
4423  col_conflict_fn= reptab->getColumn(error_str= nrt_conflict_fn);
4424  if (col_conflict_fn == NULL)
4425  {
4426  col_conflict_fn= NULL;
4427  }
4428  else if (col_conflict_fn->getPrimaryKey() ||
4429  col_conflict_fn->getType() != NDBCOL::Varbinary)
4430  goto err;
4431 
/*
  Read loop. An iteration either returns, breaks to err: (with ndberror
  set), or continues after closing the transaction and calling
  do_retry_sleep(retry_sleep), at most `retries` times.
*/
4432  error= 0;
4433  for (;;)
4434  {
4435  NdbTransaction *trans= ndb->startTransaction();
4436  if (trans == NULL)
4437  {
4438  ndberror= ndb->getNdbError();
4439  break;
4440  }
4441  NdbRecAttr *col_binlog_type_rec_attr[2];
4442  NdbRecAttr *col_conflict_fn_rec_attr[2]= {NULL, NULL};
4443  uint32 ndb_binlog_type[2];
/*
  Each row's conflict_fn value (with its length prefix) is read into its
  own sz-byte slot of ndb_conflict_fn_buf.
  NOTE(review): this assumes the column's maximum size fits in sz bytes;
  confirm against the ndb_replication table definition.
*/
4444  const uint sz= 256;
4445  char ndb_conflict_fn_buf[2*sz];
4446  char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
4447  NdbOperation *op[2];
4448  uint32 i, id= 0;
4449  /* Read generic row (server_id==0) and specific row (server_id == our id)
4450  * from ndb_replication.
4451  * Specific overrides generic, if present
4452  */
4453  for (i= 0; i < 2; i++)
4454  {
4455  NdbOperation *_op;
4456  DBUG_PRINT("info", ("reading[%u]: %s,%s,%u", i, db, table_name, id));
4457  if ((_op= trans->getNdbOperation(reptab)) == NULL) abort();
4458  if (_op->readTuple(NdbOperation::LM_CommittedRead)) abort();
4459  ndb_pack_varchar(col_db, tmp_buf, db, strlen(db));
4460  if (_op->equal(col_db->getColumnNo(), tmp_buf)) abort();
4461  ndb_pack_varchar(col_table_name, tmp_buf, table_name, strlen(table_name));
4462  if (_op->equal(col_table_name->getColumnNo(), tmp_buf)) abort();
4463  if (_op->equal(col_server_id->getColumnNo(), id)) abort();
4464  if ((col_binlog_type_rec_attr[i]=
4465  _op->getValue(col_binlog_type, (char *)&(ndb_binlog_type[i]))) == 0) abort();
4466  /* optional columns */
4467  if (col_conflict_fn)
4468  {
4469  if ((col_conflict_fn_rec_attr[i]=
4470  _op->getValue(col_conflict_fn, ndb_conflict_fn[i])) == 0) abort();
4471  }
4472  id= server_id;
4473  op[i]= _op;
4474  }
4475 
/*
  Execute both reads with commit; on failure close the transaction and
  either retry or leave with ndberror set.
  NOTE(review): the embedded numbering skips 4477 and 4479 here. The
  second execute() argument and the condition guarding the retry block
  appear to be missing from this copy; restore them from upstream.
*/
4476  if (trans->execute(NdbTransaction::Commit,
4478  {
4480  {
4481  if (retries--)
4482  {
4483  if (trans)
4484  ndb->closeTransaction(trans);
4485  do_retry_sleep(retry_sleep);
4486  continue;
4487  }
4488  }
4489  ndberror= trans->getNdbError();
4490  ndb->closeTransaction(trans);
4491  break;
4492  }
/*
  A NoDataFound error on an operation just means that row is absent: its
  results are cleared so the generic/specific merge below ignores it.
  NOTE(review): for any other error, the `break` below only leaves this
  for-loop. ndberror is recorded but never checked afterwards, so the row
  values are still used and the function can return 0. Consider closing
  the transaction and leaving the outer loop (to err:) instead.
*/
4493  for (i= 0; i < 2; i++)
4494  {
4495  if (op[i]->getNdbError().code)
4496  {
4497  if (op[i]->getNdbError().classification == NdbError::NoDataFound)
4498  {
4499  col_binlog_type_rec_attr[i]= NULL;
4500  col_conflict_fn_rec_attr[i]= NULL;
4501  DBUG_PRINT("info", ("not found row[%u]", i));
4502  continue;
4503  }
4504  ndberror= op[i]->getNdbError();
4505  break;
4506  }
4507  DBUG_PRINT("info", ("found row[%u]", i));
4508  }
/* Per column, fall back from the server-specific row [1] to the generic row [0]. */
4509  if (col_binlog_type_rec_attr[1] == NULL ||
4510  col_binlog_type_rec_attr[1]->isNULL())
4511  {
4512  /* No specific value, use generic */
4513  col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
4514  ndb_binlog_type[1]= ndb_binlog_type[0];
4515  }
4516  if (col_conflict_fn_rec_attr[1] == NULL ||
4517  col_conflict_fn_rec_attr[1]->isNULL())
4518  {
4519  /* No specific value, use generic */
4520  col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
4521  ndb_conflict_fn[1]= ndb_conflict_fn[0];
4522  }
4523 
4524  if (col_binlog_type_rec_attr[1] == NULL ||
4525  col_binlog_type_rec_attr[1]->isNULL())
4526  {
4527  DBUG_PRINT("info", ("No binlog flag value, using default"));
4528  /* No value */
4529  *binlog_flags= NBT_DEFAULT;
4530  }
4531  else
4532  {
4533  DBUG_PRINT("info", ("Taking binlog flag value from the table"));
4534  *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
4535  }
4536 
4537  if (col_conflict_fn_rec_attr[1] == NULL ||
4538  col_conflict_fn_rec_attr[1]->isNULL())
4539  {
4540  /* No conflict function */
4541  *conflict_fn_spec = NULL;
4542  }
4543  else
4544  {
4545  const char* conflict_fn = ndb_conflict_fn[1];
4546  uint len= 0;
/*
  Strip the Varbinary length prefix: 1 byte for ArrayTypeShortVar,
  2 bytes (read with uint2korr) for ArrayTypeMediumVar.
*/
4547  switch (col_conflict_fn->getArrayType())
4548  {
4549  case NDBCOL::ArrayTypeShortVar:
4550  len= *(uchar*)conflict_fn;
4551  conflict_fn++;
4552  break;
4553  case NDBCOL::ArrayTypeMediumVar:
4554  len= uint2korr(conflict_fn);
4555  conflict_fn+= 2;
4556  break;
4557  default:
4558  abort();
4559  }
/* The specification plus its NUL terminator must fit in the caller's buffer. */
4560  if ((len + 1) > conflict_fn_buffer_len)
4561  {
4562  ndb->closeTransaction(trans);
4563  error= -2;
4564  error_str= "Conflict function specification too long.";
4565  goto err;
4566  }
4567  memcpy(conflict_fn_buffer, conflict_fn, len);
4568  conflict_fn_buffer[len] = '\0';
4569  *conflict_fn_spec = conflict_fn_buffer;
4570  }
4571 
4572  DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
4573  *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
4574  "NULL")));
4575 
4576  ndb->closeTransaction(trans);
4577 
4578  DBUG_RETURN(0);
4579  }
4580 
/*
  Error exit:
    error == -1 : missing or wrongly typed column, named by error_str
    error == -2 : message text in error_str
    otherwise   : an NDB API error recorded in ndberror
  Outputs are reset to their defaults in every case.
*/
4581 err:
4582  DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
4583  error, error_str, ndberror.code));
4584  if (error < 0)
4585  {
4586  char msg[FN_REFLEN];
4587  switch (error)
4588  {
4589  case -1:
4590  my_snprintf(msg, sizeof(msg),
4591  "Missing or wrong type for column '%s'", error_str);
4592  break;
4593  case -2:
4594  my_snprintf(msg, sizeof(msg), "%s", error_str);
4595  break;
4596  default:
4597  abort();
4598  }
4599  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4600  ER_NDB_REPLICATION_SCHEMA_ERROR,
4601  ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
4602  msg);
4603  }
4604  else
4605  {
4606  char msg[FN_REFLEN];
4607  my_snprintf(tmp_buf, sizeof(tmp_buf), "ndberror %u", ndberror.code);
4608  my_snprintf(msg, sizeof(msg), "Unable to retrieve %s.%s, logging and "
4609  "conflict resolution may not function as intended (%s)",
4610  ndb_rep_db, ndb_replication_table, tmp_buf);
4611  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4612  ER_ILLEGAL_HA_CREATE_OPTION,
4613  ER(ER_ILLEGAL_HA_CREATE_OPTION),
4614  ndbcluster_hton_name, msg);
4615  }
4616  *binlog_flags= NBT_DEFAULT;
4617  *conflict_fn_spec= NULL;
4618 
4619  if (ndberror.code && opt_ndb_extra_logging)
4620  print_warning_list("NDB", thd);
4621  DBUG_RETURN(ndberror.code);
4622 }
4623 
4624 /*
4625  ndbcluster_get_binlog_replication_info
4626 
4627  This function retrieves the data for the given table
4628  from the ndb_replication table.
4629 
4630  If the table is not found, or the table does not exist,
4631  then defaults are returned.
4632 */
4633 int
4634 ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
4635  const char* db,
4636  const char* table_name,
4637  uint server_id,
4638  const TABLE *table,
4639  Uint32* binlog_flags,
4640  const st_conflict_fn_def** conflict_fn,
4641  st_conflict_fn_arg* args,
4642  Uint32* num_args)
4643 {
4644  DBUG_ENTER("ndbcluster_get_binlog_replication_info");
4645 
4646  /* Override for ndb_apply_status when logging */
4647  if (opt_ndb_log_apply_status)
4648  {
4649  if (strcmp(db, NDB_REP_DB) == 0 &&
4650  strcmp(table_name, NDB_APPLY_TABLE) == 0)
4651  {
4652  /*
4653  Ensure that we get all columns from ndb_apply_status updates
4654  by forcing FULL event type
4655  Also, ensure that ndb_apply_status events are always logged as
4656  WRITES.
4657  */
4658  DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
4659  sql_print_information("NDB : ndb-log-apply-status forcing "
4660  "%s.%s to FULL USE_WRITE",
4661  NDB_REP_DB, NDB_APPLY_TABLE);
4662  *binlog_flags = NBT_FULL;
4663  *conflict_fn = NULL;
4664  *num_args = 0;
4665  DBUG_RETURN(0);
4666  }
4667  }
4668 
4669  const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
4670  char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
4671  char* conflict_fn_spec;
4672 
4673  if (ndbcluster_read_replication_table(thd,
4674  ndb,
4675  db,
4676  table_name,
4677  server_id,
4678  binlog_flags,
4679  &conflict_fn_spec,
4680  conflict_fn_buffer,
4681  MAX_CONFLICT_FN_SPEC_LEN) != 0)
4682  {
4683  DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
4684  }
4685 
4686  if (table != NULL)
4687  {
4688  if (conflict_fn_spec != NULL)
4689  {
4690  char tmp_buf[FN_REFLEN];
4691 
4692  if (parse_conflict_fn_spec(conflict_fn_spec,
4693  conflict_fn,
4694  args,
4695  num_args,
4696  table,
4697  tmp_buf,
4698  sizeof(tmp_buf)) != 0)
4699  {
4700  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4701  ER_CONFLICT_FN_PARSE_ERROR,
4702  ER(ER_CONFLICT_FN_PARSE_ERROR),
4703  tmp_buf);
4704  DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
4705  }
4706  }
4707  else
4708  {
4709  /* No conflict function specified */
4710  conflict_fn= NULL;
4711  num_args= 0;
4712  }
4713  }
4714 
4715  DBUG_RETURN(0);
4716 }
4717 
4718 int
4719 ndbcluster_apply_binlog_replication_info(THD *thd,
4720  NDB_SHARE *share,
4721  const NDBTAB* ndbtab,
4722  TABLE* table,
4723  const st_conflict_fn_def* conflict_fn,
4724  const st_conflict_fn_arg* args,
4725  Uint32 num_args,
4726  bool do_set_binlog_flags,
4727  Uint32 binlog_flags)
4728 {
4729  DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
4730  char tmp_buf[FN_REFLEN];
4731 
4732  if (do_set_binlog_flags)
4733  {
4734  DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
4735  set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
4736  }
4737 
4738  if (conflict_fn != NULL)
4739  {
4740  if (setup_conflict_fn(thd, share,
4741  ndbtab,
4742  tmp_buf, sizeof(tmp_buf),
4743  table,
4744  conflict_fn,
4745  args,
4746  num_args) != 0)
4747  {
4748  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4749  ER_CONFLICT_FN_PARSE_ERROR,
4750  ER(ER_CONFLICT_FN_PARSE_ERROR),
4751  tmp_buf);
4752  DBUG_RETURN(-1);
4753  }
4754  }
4755  else
4756  {
4757  /* No conflict function specified */
4758  slave_reset_conflict_fn(share);
4759  }
4760 
4761  DBUG_RETURN(0);
4762 }
4763 
4764 int
4765 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
4766  NDB_SHARE *share,
4767  const NDBTAB *ndbtab,
4768  uint server_id,
4769  TABLE *table,
4770  bool do_set_binlog_flags)
4771 {
4772  DBUG_ENTER("ndbcluster_read_binlog_replication");
4773  Uint32 binlog_flags;
4774  const st_conflict_fn_def* conflict_fn= NULL;
4775  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
4776  Uint32 num_args = MAX_CONFLICT_ARGS;
4777 
4778  if ((ndbcluster_get_binlog_replication_info(thd, ndb,
4779  share->db,
4780  share->table_name,
4781  server_id,
4782  table,
4783  &binlog_flags,
4784  &conflict_fn,
4785  args,
4786  &num_args) != 0) ||
4787  (ndbcluster_apply_binlog_replication_info(thd,
4788  share,
4789  ndbtab,
4790  table,
4791  conflict_fn,
4792  args,
4793  num_args,
4794  do_set_binlog_flags,
4795  binlog_flags) != 0))
4796  {
4797  DBUG_RETURN(-1);
4798  }
4799 
4800  DBUG_RETURN(0);
4801 }
4802 #endif /* HAVE_NDB_BINLOG */
4803 
4804 bool
4805 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
4806 {
4807  char key[FN_REFLEN + 1];
4808  char ndb_file[FN_REFLEN + 1];
4809 
4810  DBUG_ENTER("ndbcluster_check_if_local_table");
4811  build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
4812  build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
4813  /* Check that any defined table is an ndb table */
4814  DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
4815  if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
4816  {
4817  DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
4818 
4819 
4820  DBUG_RETURN(true);
4821  }
4822 
4823  DBUG_RETURN(false);
4824 }
4825 
4826 bool
4827 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
4828 {
4829  DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
4830  DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
4831  LEX_STRING *tabname;
4832  List<LEX_STRING> files;
4833  char path[FN_REFLEN + 1];
4834 
4835  build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
4836  if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
4837  {
4838  thd->clear_error();
4839  DBUG_PRINT("info", ("Failed to find files"));
4840  DBUG_RETURN(true);
4841  }
4842  DBUG_PRINT("info",("found: %d files", files.elements));
4843  while ((tabname= files.pop()))
4844  {
4845  DBUG_PRINT("info", ("Found table %s", tabname->str));
4846  if (ndbcluster_check_if_local_table(dbname, tabname->str))
4847  DBUG_RETURN(true);
4848  }
4849 
4850  DBUG_RETURN(false);
4851 }
4852 
4853 /*
4854  Common function for setting up everything for logging a table at
4855  create/discover.

  Gets (creating it if needed) the NDB_SHARE for key. Unless binlog setup
  is already done or not wanted, it then makes sure the NDB event for the
  table exists and creates the event operations for it.

  Returns 0 when setup is done or not needed. Returns -1 when no share is
  obtained, the table cannot be fetched from NDB, event or event-operation
  creation fails, or table_name has a temporary-table prefix.

  NOTE(review): only the "already set up / not logged" early exit and the
  final error exit call free_share(). The other return-0 paths keep the
  reference taken by get_share(); confirm this is intended.
4856 */
4857 int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
4858  uint key_len,
4859  const char *db,
4860  const char *table_name,
4861  TABLE * table)
4862 {
4863  int do_event_op= ndb_binlog_running;
4864  DBUG_ENTER("ndbcluster_create_binlog_setup");
4865  DBUG_PRINT("enter",("key: %s key_len: %d %s.%s",
4866  key, key_len, db, table_name));
4867  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
4868  DBUG_ASSERT(strlen(key) == key_len);
4869 
4870  pthread_mutex_lock(&ndbcluster_mutex);
4871  NDB_SHARE * share = get_share(key, table, TRUE, TRUE);
4872  if (share == 0)
4873  {
/* get_share() returned no share (embedded numbering skips 4874-4876 here). */
4877  pthread_mutex_unlock(&ndbcluster_mutex);
4878  DBUG_RETURN(-1);
4879  }
4880  pthread_mutex_unlock(&ndbcluster_mutex);
4881 
/* From here on share->mutex is held; every exit path below unlocks it. */
4882  pthread_mutex_lock(&share->mutex);
4883  if (get_binlog_nologging(share) || share->op != 0 || share->new_op != 0)
4884  {
4885  pthread_mutex_unlock(&share->mutex);
4886  free_share(&share);
4887  DBUG_RETURN(0); // replication already setup, or should not
4888  }
4889 
4890  if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
4891  {
4892  // The distributed privilege tables are distributed by writing
4893  // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
4894  // to listen to events from this table
4895  DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name));
4896  do_event_op= 0;
4897  }
4898 
/*
  ndb_schema and ndb_apply_status always get event operations while their
  global shares are not yet set up, whatever ndb_binlog_running says.
*/
4899  if (!ndb_schema_share &&
4900  strcmp(share->db, NDB_REP_DB) == 0 &&
4901  strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4902  do_event_op= 1;
4903  else if (!ndb_apply_status_share &&
4904  strcmp(share->db, NDB_REP_DB) == 0 &&
4905  strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
4906  do_event_op= 1;
4907 
4908  if (!do_event_op)
4909  {
4910  set_binlog_nologging(share);
4911  pthread_mutex_unlock(&share->mutex);
4912  DBUG_RETURN(0);
4913  }
4914 
/*
  This loop body runs at most once: every path leaves through `break`
  (error, handled after the loop) or DBUG_RETURN. share is non-NULL here,
  so only the temporary-prefix test can skip the body, which also leads to
  the -1 exit below.
*/
4915  while (share && !IS_TMP_PREFIX(table_name))
4916  {
4917  /*
4918  ToDo make sanity check of share so that the table is actually the same
4919  I.e. we need to do open file from frm in this case
4920  Currently awaiting this to be fixed in the 4.1 tree in the general
4921  case
4922  */
4923 
4924  /* Create the event in NDB */
4925  ndb->setDatabaseName(db);
4926 
4927  NDBDICT *dict= ndb->getDictionary();
4928  Ndb_table_guard ndbtab_g(dict, table_name);
4929  const NDBTAB *ndbtab= ndbtab_g.get_table();
4930  if (ndbtab == 0)
4931  {
4932  if (opt_ndb_extra_logging)
4933  sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
4934  "%s, %d", key, dict->getNdbError().message,
4935  dict->getNdbError().code);
4936  break; // error
4937  }
4938 #ifdef HAVE_NDB_BINLOG
4939  /*
  Apply the binlog flags from mysql.ndb_replication to the share. table is
  passed as NULL, so no conflict function is parsed here. The return value
  is not checked.
4940  */
4941  ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
4942  ::server_id, NULL, TRUE);
4943 #endif
4944  /*
4945  check if logging turned off for this table
4946  */
4947  if (get_binlog_nologging(share))
4948  {
4949  if (opt_ndb_extra_logging)
4950  sql_print_information("NDB Binlog: NOT logging %s", share->key);
4951  pthread_mutex_unlock(&share->mutex);
4952  DBUG_RETURN(0);
4953  }
4954 
4955  String event_name(INJECTOR_EVENT_LEN);
4956  ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
4957  /*
4958  event should have been created by someone else,
4959  but let's make sure, and create if it doesn't exist
4960  */
4961  const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
4962  if (!ev)
4963  {
4964  if (ndbcluster_create_event(thd, ndb, ndbtab, event_name.c_ptr(), share))
4965  {
4966  sql_print_error("NDB Binlog: "
4967  "FAILED CREATE (DISCOVER) TABLE Event: %s",
4968  event_name.c_ptr());
4969  break; // error
4970  }
4971  if (opt_ndb_extra_logging)
4972  sql_print_information("NDB Binlog: "
4973  "CREATE (DISCOVER) TABLE Event: %s",
4974  event_name.c_ptr());
4975  }
4976  else
4977  {
4978  delete ev;
4979  if (opt_ndb_extra_logging)
4980  sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
4981  event_name.c_ptr());
4982  }
4983 
4984  /*
4985  create the event operations for receiving logging events
4986  */
4987  if (ndbcluster_create_event_ops(thd, share,
4988  ndbtab, event_name.c_ptr()))
4989  {
4990  sql_print_error("NDB Binlog:"
4991  "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
4992  event_name.c_ptr());
4993  /* a warning has been issued to the client */
4994  break;
4995  }
4996  pthread_mutex_unlock(&share->mutex);
4997  DBUG_RETURN(0);
4998  }
4999 
/* Error exit: release the lock and the share reference. */
5000  pthread_mutex_unlock(&share->mutex);
5001  free_share(&share);
5002  DBUG_RETURN(-1);
5003 }
5004 
/*
  ndbcluster_create_event

  Create the NDB event event_name on table ndbtab, with every column of
  the table added to the event.

  Returns 0 when the event was created, or when nothing is done because
  share is NULL or binlogging is disabled for the share. Returns -1 on
  failure. A table with a hidden primary key and BLOB columns cannot be
  logged: NSF_NO_BINLOG is set on the share and -1 is returned.

  push_warning: any non-zero value pushes a client warning for the
  hidden-PK/BLOB case. Only values > 1 push client warnings for the
  createEvent()/dropEvent() failures.
*/
5005 int
5006 ndbcluster_create_event(THD *thd, Ndb *ndb, const NDBTAB *ndbtab,
5007  const char *event_name, NDB_SHARE *share,
5008  int push_warning)
5009 {
5010  DBUG_ENTER("ndbcluster_create_event");
5011  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
5012  ndbtab->getName(), ndbtab->getObjectVersion(),
5013  event_name, share ? share->key : "(nil)"));
5014  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
5015  if (!share)
5016  {
5017  DBUG_PRINT("info", ("share == NULL"));
5018  DBUG_RETURN(0);
5019  }
5020  if (get_binlog_nologging(share))
5021  {
5022  if (opt_ndb_extra_logging && ndb_binlog_running)
5023  sql_print_information("NDB Binlog: NOT logging %s", share->key);
5024  DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
5025  share->flags, share->flags & NSF_NO_BINLOG));
5026  DBUG_RETURN(0);
5027  }
5028 
5029  ndb->setDatabaseName(share->db);
5030  NDBDICT *dict= ndb->getDictionary();
5031  NDBEVENT my_event(event_name);
5032  my_event.setTable(*ndbtab);
5033  my_event.addTableEvent(NDBEVENT::TE_ALL);
/*
  Choose the event report mode:
    hidden PK            -> ER_ALL | ER_DDL (BLOBs not supported)
    mysql.ndb_schema     -> ER_ALL | ER_SUBSCRIBE | ER_DDL
    binlog FULL          -> ER_ALL | ER_DDL
    otherwise            -> ER_UPDATED | ER_DDL
*/
5034  if (share->flags & NSF_HIDDEN_PK)
5035  {
5036  if (share->flags & NSF_BLOB_FLAG)
5037  {
5038  sql_print_error("NDB Binlog: logging of table %s "
5039  "with BLOB attribute and no PK is not supported",
5040  share->key);
5041  if (push_warning)
5042  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5043  ER_ILLEGAL_HA_CREATE_OPTION,
5044  ER(ER_ILLEGAL_HA_CREATE_OPTION),
5045  ndbcluster_hton_name,
5046  "Binlog of table with BLOB attribute and no PK");
5047 
5048  share->flags|= NSF_NO_BINLOG;
5049  DBUG_RETURN(-1);
5050  }
5051  /* No primary key, subscribe for all attributes */
5052  my_event.setReport((NDBEVENT::EventReport)
5053  (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
5054  DBUG_PRINT("info", ("subscription all"));
5055  }
5056  else
5057  {
5058  if (strcmp(share->db, NDB_REP_DB) == 0 &&
5059  strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
5060  {
/* ER_SUBSCRIBE is requested only for the ndb_schema table (numbering skips 5061-5063 here). */
5064  my_event.setReport((NDBEVENT::EventReport)
5065  (NDBEVENT::ER_ALL |
5066  NDBEVENT::ER_SUBSCRIBE |
5067  NDBEVENT::ER_DDL));
5068  DBUG_PRINT("info", ("subscription all and subscribe"));
5069  }
5070  else
5071  {
5072  if (get_binlog_full(share))
5073  {
5074  my_event.setReport((NDBEVENT::EventReport)
5075  (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
5076  DBUG_PRINT("info", ("subscription all"));
5077  }
5078  else
5079  {
5080  my_event.setReport((NDBEVENT::EventReport)
5081  (NDBEVENT::ER_UPDATED | NDBEVENT::ER_DDL));
5082  DBUG_PRINT("info", ("subscription only updated"));
5083  }
5084  }
5085  }
5086  if (share->flags & NSF_BLOB_FLAG)
5087  my_event.mergeEvents(TRUE);
5088 
5089  /* add all columns to the event */
5090  int n_cols= ndbtab->getNoOfColumns();
5091  for(int a= 0; a < n_cols; a++)
5092  my_event.addEventColumn(a);
5093 
/*
  NOTE(review): as shown, the block opened by the second `{` below is
  unconditional. Every createEvent() failure therefore returns -1, and the
  trailing-event recovery that follows it is unreachable. The embedded
  numbering skips 5096, so a guarding condition (presumably on the
  dictionary error) appears to be missing from this copy; restore it from
  upstream.
*/
5094  if (dict->createEvent(my_event)) // Add event to database
5095  {
5097  {
5098  /*
5099  failed, print a warning
5100  */
5101  if (push_warning > 1)
5102  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5103  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5104  dict->getNdbError().code,
5105  dict->getNdbError().message, "NDB");
5106  sql_print_error("NDB Binlog: Unable to create event in database. "
5107  "Event: %s Error Code: %d Message: %s", event_name,
5108  dict->getNdbError().code, dict->getNdbError().message);
5109  DBUG_RETURN(-1);
5110  }
5111 
5112  /*
5113  try retrieving the event, if table version/id matches, we will get
5114  a valid event. Otherwise we have a trailing event from before
5115  */
5116  const NDBEVENT *ev;
5117  if ((ev= dict->getEvent(event_name)))
5118  {
5119  delete ev;
5120  DBUG_RETURN(0);
5121  }
5122 
5123  /*
5124  trailing event from before; an error, but try to correct it
5125  */
/* Only an NDB_INVALID_SCHEMA_OBJECT lookup error triggers the drop-and-recreate attempt. */
5126  if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
5127  dict->dropEvent(my_event.getName(), 1))
5128  {
5129  if (push_warning > 1)
5130  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5131  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5132  dict->getNdbError().code,
5133  dict->getNdbError().message, "NDB");
5134  sql_print_error("NDB Binlog: Unable to create event in database. "
5135  " Attempt to correct with drop failed. "
5136  "Event: %s Error Code: %d Message: %s",
5137  event_name,
5138  dict->getNdbError().code,
5139  dict->getNdbError().message);
5140  DBUG_RETURN(-1);
5141  }
5142 
5143  /*
5144  try to add the event again
5145  */
5146  if (dict->createEvent(my_event))
5147  {
5148  if (push_warning > 1)
5149  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5150  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5151  dict->getNdbError().code,
5152  dict->getNdbError().message, "NDB");
5153  sql_print_error("NDB Binlog: Unable to create event in database. "
5154  " Attempt to correct with drop ok, but create failed. "
5155  "Event: %s Error Code: %d Message: %s",
5156  event_name,
5157  dict->getNdbError().code,
5158  dict->getNdbError().message);
5159  DBUG_RETURN(-1);
5160  }
5161 #ifdef NDB_BINLOG_EXTRA_WARNINGS
5162  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5163  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5164  0, "NDB Binlog: Removed trailing event",
5165  "NDB");
5166 #endif
5167  }
5168 
5169  DBUG_RETURN(0);
5170 }
5171 
5172 inline int is_ndb_compatible_type(Field *field)
5173 {
5174  return
5175  !(field->flags & BLOB_FLAG) &&
5176  field->type() != MYSQL_TYPE_BIT &&
5177  field->pack_length() != 0;
5178 }
5179 
5180 /*
5181  - create eventOperations for receiving log events
5182  - setup ndb recattrs for reception of log event data
5183  - "start" the event operation
5184 
5185  used at create/discover of tables
5186 */
5187 int
5188 ndbcluster_create_event_ops(THD *thd, NDB_SHARE *share,
5189  const NDBTAB *ndbtab, const char *event_name)
5190 {
5191  /*
5192  we are in either create table or rename table so table should be
5193  locked, hence we can work with the share without locks
5194  */
5195 
5196  DBUG_ENTER("ndbcluster_create_event_ops");
5197  DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
5198  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
5199 
5200  DBUG_ASSERT(share != 0);
5201 
5202  if (get_binlog_nologging(share))
5203  {
5204  DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
5205  share->flags));
5206  DBUG_RETURN(0);
5207  }
5208 
5209  // Don't allow event ops to be created on distributed priv tables
5210  // they are distributed via ndb_schema
5211  assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
5212  share->table_name));
5213 
5214  Ndb_event_data *event_data= share->event_data;
5215  int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
5216 #ifdef HAVE_NDB_BINLOG
5217  uint len= strlen(share->table_name);
5218 #endif
5219  if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5220  strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
5221  do_ndb_schema_share= 1;
5222  else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5223  strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
5224  do_ndb_apply_status_share= 1;
5225  else
5226 #ifdef HAVE_NDB_BINLOG
5227  if (!binlog_filter->db_ok(share->db) ||
5228  !ndb_binlog_running ||
5229  (len >= sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX) &&
5230  strcmp(share->table_name+len-sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX)+1,
5231  lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
5232  NDB_EXCEPTIONS_TABLE_SUFFIX) == 0))
5233 #endif
5234  {
5235  share->flags|= NSF_NO_BINLOG;
5236  DBUG_RETURN(0);
5237  }
5238 
5239  if (share->op)
5240  {
5241  event_data= (Ndb_event_data *) share->op->getCustomData();
5242  assert(event_data->share == share);
5243  assert(share->event_data == 0);
5244 
5245  DBUG_ASSERT(share->use_count > 1);
5246  sql_print_error("NDB Binlog: discover reusing old ev op");
5247  /* ndb_share reference ToDo free */
5248  DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
5249  share->key, share->use_count));
5250  free_share(&share); // old event op already has reference
5251  DBUG_RETURN(0);
5252  }
5253 
5254  DBUG_ASSERT(event_data != 0);
5255  TABLE *table= event_data->shadow_table;
5256 
5257  int retries= 100;
5258  /*
5259  100 milliseconds, temporary error on schema operation can
5260  take some time to be resolved
5261  */
5262  int retry_sleep= 100;
5263  while (1)
5264  {
5265  Mutex_guard injector_mutex_g(injector_mutex);
5266  Ndb *ndb= injector_ndb;
5267  if (do_ndb_schema_share)
5268  ndb= schema_ndb;
5269 
5270  if (ndb == 0)
5271  DBUG_RETURN(-1);
5272 
5273  NdbEventOperation* op;
5274  if (do_ndb_schema_share)
5275  op= ndb->createEventOperation(event_name);
5276  else
5277  {
5278  // set injector_ndb database/schema from table internal name
5279  int ret= ndb->setDatabaseAndSchemaName(ndbtab);
5280  assert(ret == 0); NDB_IGNORE_VALUE(ret);
5281  op= ndb->createEventOperation(event_name);
5282  // reset to catch errors
5283  ndb->setDatabaseName("");
5284  }
5285  if (!op)
5286  {
5287  sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
5288  " %s",event_name);
5289  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5290  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5291  ndb->getNdbError().code,
5292  ndb->getNdbError().message,
5293  "NDB");
5294  DBUG_RETURN(-1);
5295  }
5296 
5297  if (share->flags & NSF_BLOB_FLAG)
5298  op->mergeEvents(TRUE); // currently not inherited from event
5299 
5300  uint n_columns= ndbtab->getNoOfColumns();
5301  uint n_fields= table->s->fields;
5302  uint val_length= sizeof(NdbValue) * n_columns;
5303 
5304  /*
5305  Allocate memory globally so it can be reused after online alter table
5306  */
5307  if (my_multi_malloc(MYF(MY_WME),
5308  &event_data->ndb_value[0],
5309  val_length,
5310  &event_data->ndb_value[1],
5311  val_length,
5312  NULL) == 0)
5313  {
5314  DBUG_PRINT("info", ("Failed to allocate records for event operation"));
5315  DBUG_RETURN(-1);
5316  }
5317 
5318  for (uint j= 0; j < n_columns; j++)
5319  {
5320  const char *col_name= ndbtab->getColumn(j)->getName();
5321  NdbValue attr0, attr1;
5322  if (j < n_fields)
5323  {
5324  Field *f= table->field[j];
5325  if (is_ndb_compatible_type(f))
5326  {
5327  DBUG_PRINT("info", ("%s compatible", col_name));
5328  attr0.rec= op->getValue(col_name, (char*) f->ptr);
5329  attr1.rec= op->getPreValue(col_name,
5330  (f->ptr - table->record[0]) +
5331  (char*) table->record[1]);
5332  }
5333  else if (! (f->flags & BLOB_FLAG))
5334  {
5335  DBUG_PRINT("info", ("%s non compatible", col_name));
5336  attr0.rec= op->getValue(col_name);
5337  attr1.rec= op->getPreValue(col_name);
5338  }
5339  else
5340  {
5341  DBUG_PRINT("info", ("%s blob", col_name));
5342  DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
5343  attr0.blob= op->getBlobHandle(col_name);
5344  attr1.blob= op->getPreBlobHandle(col_name);
5345  if (attr0.blob == NULL || attr1.blob == NULL)
5346  {
5347  sql_print_error("NDB Binlog: Creating NdbEventOperation"
5348  " blob field %u handles failed (code=%d) for %s",
5349  j, op->getNdbError().code, event_name);
5350  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5351  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5352  op->getNdbError().code,
5353  op->getNdbError().message,
5354  "NDB");
5355  ndb->dropEventOperation(op);
5356  DBUG_RETURN(-1);
5357  }
5358  }
5359  }
5360  else
5361  {
5362  DBUG_PRINT("info", ("%s hidden key", col_name));
5363  attr0.rec= op->getValue(col_name);
5364  attr1.rec= op->getPreValue(col_name);
5365  }
5366  event_data->ndb_value[0][j].ptr= attr0.ptr;
5367  event_data->ndb_value[1][j].ptr= attr1.ptr;
5368  DBUG_PRINT("info", ("&event_data->ndb_value[0][%d]: 0x%lx "
5369  "event_data->ndb_value[0][%d]: 0x%lx",
5370  j, (long) &event_data->ndb_value[0][j],
5371  j, (long) attr0.ptr));
5372  DBUG_PRINT("info", ("&event_data->ndb_value[1][%d]: 0x%lx "
5373  "event_data->ndb_value[1][%d]: 0x%lx",
5374  j, (long) &event_data->ndb_value[0][j],
5375  j, (long) attr1.ptr));
5376  }
5377  op->setCustomData((void *) event_data); // set before execute
5378  share->event_data= 0; // take over event data
5379  share->op= op; // assign op in NDB_SHARE
5380 
5381  if (op->execute())
5382  {
5383  share->op= NULL;
5384  retries--;
5386  op->getNdbError().code != 1407)
5387  retries= 0;
5388  if (retries == 0)
5389  {
5390  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5391  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5392  op->getNdbError().code, op->getNdbError().message,
5393  "NDB");
5394  sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
5395  event_name,
5396  op->getNdbError().code, op->getNdbError().message);
5397  }
5398  share->event_data= event_data;
5399  op->setCustomData(NULL);
5400  ndb->dropEventOperation(op);
5401  if (retries && !thd->killed)
5402  {
5403  do_retry_sleep(retry_sleep);
5404  continue;
5405  }
5406  DBUG_RETURN(-1);
5407  }
5408  break;
5409  }
5410 
5411  /* ndb_share reference binlog */
5412  get_share(share);
5413  DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
5414  share->key, share->use_count));
5415  if (do_ndb_apply_status_share)
5416  {
5417  /* ndb_share reference binlog extra */
5418  ndb_apply_status_share= get_share(share);
5419  DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5420  share->key, share->use_count));
5421  (void) pthread_cond_signal(&injector_cond);
5422  }
5423  else if (do_ndb_schema_share)
5424  {
5425  /* ndb_share reference binlog extra */
5426  ndb_schema_share= get_share(share);
5427  DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5428  share->key, share->use_count));
5429  (void) pthread_cond_signal(&injector_cond);
5430  }
5431 
5432  DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
5433  share->key, (long) share->op, share->use_count));
5434 
5435  if (opt_ndb_extra_logging)
5436  sql_print_information("NDB Binlog: logging %s (%s,%s)", share->key,
5437  get_binlog_full(share) ? "FULL" : "UPDATED",
5438  get_binlog_use_update(share) ? "USE_UPDATE" : "USE_WRITE");
5439  DBUG_RETURN(0);
5440 }
5441 
5442 int
5443 ndbcluster_drop_event(THD *thd, Ndb *ndb, NDB_SHARE *share,
5444  const char *type_str,
5445  const char *dbname,
5446  const char *tabname)
5447 {
5448  DBUG_ENTER("ndbcluster_drop_event");
5449  /*
5450  There might be 2 types of events setup for the table, we cannot know
5451  which ones are supposed to be there as they may have been created
5452  differently for different mysqld's. So we drop both
5453  */
5454  for (uint i= 0; i < 2; i++)
5455  {
5456  NDBDICT *dict= ndb->getDictionary();
5457  String event_name(INJECTOR_EVENT_LEN);
5458  ndb_rep_event_name(&event_name, dbname, tabname, i);
5459 
5460  if (!dict->dropEvent(event_name.c_ptr()))
5461  continue;
5462 
5463  if (dict->getNdbError().code != 4710 &&
5464  dict->getNdbError().code != 1419)
5465  {
5466  /* drop event failed for some reason, issue a warning */
5467  push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5468  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5469  dict->getNdbError().code,
5470  dict->getNdbError().message, "NDB");
5471  /* error is not that the event did not exist */
5472  sql_print_error("NDB Binlog: Unable to drop event in database. "
5473  "Event: %s Error Code: %d Message: %s",
5474  event_name.c_ptr(),
5475  dict->getNdbError().code,
5476  dict->getNdbError().message);
5477  /* ToDo; handle error? */
5478  if (share && share->op &&
5479  share->op->getState() == NdbEventOperation::EO_EXECUTING &&
5480  dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
5481  {
5482  DBUG_ASSERT(FALSE);
5483  DBUG_RETURN(-1);
5484  }
5485  }
5486  }
5487  DBUG_RETURN(0);
5488 }
5489 
5490 /*
5491  when entering the calling thread should have a share lock id share != 0
5492  then the injector thread will have one as well, i.e. share->use_count == 0
5493  (unless it has already dropped... then share->op == 0)
5494 */
5495 
5496 int
5497 ndbcluster_handle_drop_table(THD *thd, Ndb *ndb, NDB_SHARE *share,
5498  const char *type_str,
5499  const char * dbname, const char * tabname)
5500 {
5501  DBUG_ENTER("ndbcluster_handle_drop_table");
5502 
5503  if (dbname && tabname)
5504  {
5505  if (ndbcluster_drop_event(thd, ndb, share, type_str, dbname, tabname))
5506  DBUG_RETURN(-1);
5507  }
5508 
5509  if (share == 0 || share->op == 0)
5510  {
5511  DBUG_RETURN(0);
5512  }
5513 
5514 /*
5515  Syncronized drop between client thread and injector thread is
5516  neccessary in order to maintain ordering in the binlog,
5517  such that the drop occurs _after_ any inserts/updates/deletes.
5518 
5519  The penalty for this is that the drop table becomes slow.
5520 
5521  This wait is however not strictly neccessary to produce a binlog
5522  that is usable. However the slave does not currently handle
5523  these out of order, thus we are keeping the SYNC_DROP_ defined
5524  for now.
5525 */
5526  const char *save_proc_info= thd->proc_info;
5527 #define SYNC_DROP_
5528 #ifdef SYNC_DROP_
5529  thd->proc_info= "Syncing ndb table schema operation and binlog";
5530  pthread_mutex_lock(&share->mutex);
5531  int max_timeout= DEFAULT_SYNC_TIMEOUT;
5532  while (share->op)
5533  {
5534  struct timespec abstime;
5535  set_timespec(abstime, 1);
5536  int ret= pthread_cond_timedwait(&injector_cond,
5537  &share->mutex,
5538  &abstime);
5539  if (thd->killed ||
5540  share->op == 0)
5541  break;
5542  if (ret)
5543  {
5544  max_timeout--;
5545  if (max_timeout == 0)
5546  {
5547  sql_print_error("NDB %s: %s timed out. Ignoring...",
5548  type_str, share->key);
5549  DBUG_ASSERT(false);
5550  break;
5551  }
5552  if (opt_ndb_extra_logging)
5553  ndb_report_waiting(type_str, max_timeout,
5554  type_str, share->key, 0);
5555  }
5556  }
5557  pthread_mutex_unlock(&share->mutex);
5558 #else
5559  pthread_mutex_lock(&share->mutex);
5560  share->op= 0;
5561  pthread_mutex_unlock(&share->mutex);
5562 #endif
5563  thd->proc_info= save_proc_info;
5564 
5565  DBUG_RETURN(0);
5566 }
5567 
5568 
5569 /********************************************************************
  Internal helper functions used by the ndb injector thread to handle
  the different kinds of events received from the storage nodes
5572 ********************************************************************/
5573 
5574 /*
5575  Unpack a record read from NDB
5576 
5577  SYNOPSIS
5578  ndb_unpack_record()
5579  buf Buffer to store read row
5580 
5581  NOTE
5582  The data for each row is read directly into the
5583  destination buffer. This function is primarily
5584  called in order to check if any fields should be
5585  set to null.
5586 */
5587 
5588 static void ndb_unpack_record(TABLE *table, NdbValue *value,
5589  MY_BITMAP *defined, uchar *buf)
5590 {
5591  Field **p_field= table->field, *field= *p_field;
5592  my_ptrdiff_t row_offset= (my_ptrdiff_t) (buf - table->record[0]);
5593  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
5594  DBUG_ENTER("ndb_unpack_record");
5595 
5596  /*
5597  Set the filler bits of the null byte, since they are
5598  not touched in the code below.
5599 
5600  The filler bits are the MSBs in the last null byte
5601  */
5602  if (table->s->null_bytes > 0)
5603  buf[table->s->null_bytes - 1]|= 256U - (1U <<
5604  table->s->last_null_bit_pos);
5605  /*
5606  Set null flag(s)
5607  */
5608  for ( ; field;
5609  p_field++, value++, field= *p_field)
5610  {
5611  field->set_notnull(row_offset);
5612  if ((*value).ptr)
5613  {
5614  if (!(field->flags & BLOB_FLAG))
5615  {
5616  int is_null= (*value).rec->isNULL();
5617  if (is_null)
5618  {
5619  if (is_null > 0)
5620  {
5621  DBUG_PRINT("info",("[%u] NULL", field->field_index));
5622  field->set_null(row_offset);
5623  }
5624  else
5625  {
5626  DBUG_PRINT("info",("[%u] UNDEFINED", field->field_index));
5627  bitmap_clear_bit(defined, field->field_index);
5628  }
5629  }
5630  else if (field->type() == MYSQL_TYPE_BIT)
5631  {
5632  Field_bit *field_bit= static_cast<Field_bit*>(field);
5633 
5634  /*
5635  Move internal field pointer to point to 'buf'. Calling
5636  the correct member function directly since we know the
5637  type of the object.
5638  */
5639  field_bit->Field_bit::move_field_offset(row_offset);
5640  if (field->pack_length() < 5)
5641  {
5642  DBUG_PRINT("info", ("bit field H'%.8X",
5643  (*value).rec->u_32_value()));
5644  field_bit->Field_bit::store((longlong) (*value).rec->u_32_value(),
5645  TRUE);
5646  }
5647  else
5648  {
5649  DBUG_PRINT("info", ("bit field H'%.8X%.8X",
5650  *(Uint32 *)(*value).rec->aRef(),
5651  *((Uint32 *)(*value).rec->aRef()+1)));
5652 #ifdef WORDS_BIGENDIAN
5653  /* lsw is stored first */
5654  Uint32 *buf= (Uint32 *)(*value).rec->aRef();
5655  field_bit->Field_bit::store((((longlong)*buf)
5656  & 0x00000000FFFFFFFFLL)
5657  |
5658  ((((longlong)*(buf+1)) << 32)
5659  & 0xFFFFFFFF00000000LL),
5660  TRUE);
5661 #else
5662  field_bit->Field_bit::store((longlong)
5663  (*value).rec->u_64_value(), TRUE);
5664 #endif
5665  }
5666  /*
5667  Move back internal field pointer to point to original
5668  value (usually record[0]).
5669  */
5670  field_bit->Field_bit::move_field_offset(-row_offset);
5671  DBUG_PRINT("info",("[%u] SET",
5672  (*value).rec->getColumn()->getColumnNo()));
5673  DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5674  }
5675  else
5676  {
5677  DBUG_PRINT("info",("[%u] SET",
5678  (*value).rec->getColumn()->getColumnNo()));
5679  DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5680  }
5681  }
5682  else
5683  {
5684  NdbBlob *ndb_blob= (*value).blob;
5685  uint col_no= field->field_index;
5686  int isNull;
5687  ndb_blob->getDefined(isNull);
5688  if (isNull == 1)
5689  {
5690  DBUG_PRINT("info",("[%u] NULL", col_no));
5691  field->set_null(row_offset);
5692  }
5693  else if (isNull == -1)
5694  {
5695  DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
5696  bitmap_clear_bit(defined, col_no);
5697  }
5698  else
5699  {
5700 #ifndef DBUG_OFF
5701  // pointer vas set in get_ndb_blobs_value
5702  Field_blob *field_blob= (Field_blob*)field;
5703  uchar* ptr;
5704  field_blob->get_ptr(&ptr, row_offset);
5705  uint32 len= field_blob->get_length(row_offset);
5706  DBUG_PRINT("info",("[%u] SET ptr: 0x%lx len: %u",
5707  col_no, (long) ptr, len));
5708 #endif
5709  }
5710  }
5711  }
5712  }
5713  dbug_tmp_restore_column_map(table->write_set, old_map);
5714  DBUG_VOID_RETURN;
5715 }
5716 
5717 /*
5718  Handle error states on events from the storage nodes
5719 */
5720 static int
5721 ndb_binlog_thread_handle_error(Ndb *ndb,
5722  NdbEventOperation *pOp)
5723 {
5724  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5725  NDB_SHARE *share= event_data->share;
5726  DBUG_ENTER("ndb_binlog_thread_handle_error");
5727 
5728  int overrun= pOp->isOverrun();
5729  if (overrun)
5730  {
5731  /*
5732  ToDo: this error should rather clear the ndb_binlog_index...
5733  and continue
5734  */
5735  sql_print_error("NDB Binlog: Overrun in event buffer, "
5736  "this means we have dropped events. Cannot "
5737  "continue binlog for %s", share->key);
5738  pOp->clearError();
5739  DBUG_RETURN(-1);
5740  }
5741 
5742  if (!pOp->isConsistent())
5743  {
5744  /*
5745  ToDo: this error should rather clear the ndb_binlog_index...
5746  and continue
5747  */
5748  sql_print_error("NDB Binlog: Not Consistent. Cannot "
5749  "continue binlog for %s. Error code: %d"
5750  " Message: %s", share->key,
5751  pOp->getNdbError().code,
5752  pOp->getNdbError().message);
5753  pOp->clearError();
5754  DBUG_RETURN(-1);
5755  }
5756  sql_print_error("NDB Binlog: unhandled error %d for table %s",
5757  pOp->hasError(), share->key);
5758  pOp->clearError();
5759  DBUG_RETURN(0);
5760 }
5761 
/*
  Handle a non-data event received for the event operation 'pOp'.

  - For the ndb_apply_status table, a drop and the branch that logs
    "cluster failure" release the extra reference held in
    ndb_apply_status_share and reset ndb_binlog_tables_inited.
  - Drop and alter events increment row.n_schemaops.
  - Event types that reach the end of the switch are passed on to
    ndb_handle_schema_change(). Ignored and unknown event types return
    early without doing so.

  Always returns 0.
*/
static int
ndb_binlog_thread_handle_non_data_event(THD *thd,
                                        NdbEventOperation *pOp,
                                        ndb_binlog_index_row &row)
{
  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
  NDB_SHARE *share= event_data->share;
  NDBEVENT::TableEvent type= pOp->getEventType();

  switch (type)
  {
    /*
      NOTE(review): the case label of this branch is missing from this
      listing. Judging by its messages it handles a cluster failure
      event (presumably NDBEVENT::TE_CLUSTER_FAILURE); confirm upstream.
      The GCI is printed as its high and low 32-bit halves.
    */
    if (opt_ndb_extra_logging)
      sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
                            share->key,
                            (uint)(pOp->getGCI() >> 32),
                            (uint)(pOp->getGCI()));
    if (ndb_apply_status_share == share)
    {
      if (opt_ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
      ndb_binlog_tables_inited= FALSE;
    }
    DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
                         "%s  received share: 0x%lx  op: 0x%lx  share op: 0x%lx  "
                         "new_op: 0x%lx",
                         share->key, (long) share, (long) pOp,
                         (long) share->op, (long) share->new_op));
    break;
  case NDBEVENT::TE_DROP:
    /* Same release of the ndb_apply_status reference as above */
    if (ndb_apply_status_share == share)
    {
      if (opt_ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
      ndb_binlog_tables_inited= FALSE;
    }
    /* ToDo: remove printout */
    if (opt_ndb_extra_logging)
      sql_print_information("NDB Binlog: drop table %s.", share->key);
    // fall through
  case NDBEVENT::TE_ALTER:
    /* Drops and alters are counted as schema operations */
    row.n_schemaops++;
    DBUG_PRINT("info", ("TABLE %s  EVENT: %s  received share: 0x%lx  op: 0x%lx  "
                        "share op: 0x%lx  new_op: 0x%lx",
                        type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
                        share->key, (long) share, (long) pOp,
                        (long) share->op, (long) share->new_op));
    break;
    /*
      NOTE(review): the case labels of the event types that are ignored
      here are missing from this listing; confirm against upstream.
    */
    /* fall through */
    /* fall through */
    /* ignore */
    return 0;
  default:
    sql_print_error("NDB Binlog: unknown non data event %d for %s. "
                    "Ignoring...", (unsigned) type, share->key);
    return 0;
  }

  ndb_handle_schema_change(thd, injector_ndb, pOp, event_data);
  return 0;
}
5840 
5841 /*
5842  Handle data events from the storage nodes
5843 */
5844 inline ndb_binlog_index_row *
5845 ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
5846  uint orig_server_id, int flag)
5847 {
5848  ndb_binlog_index_row *row= *rows;
5849  if (opt_ndb_log_orig)
5850  {
5851  ndb_binlog_index_row *first= row, *found_id= 0;
5852  for (;;)
5853  {
5854  if (row->orig_server_id == orig_server_id)
5855  {
5856  /* */
5857  if (!flag || !row->orig_epoch)
5858  return row;
5859  if (!found_id)
5860  found_id= row;
5861  }
5862  if (row->orig_server_id == 0)
5863  break;
5864  row= row->next;
5865  if (row == NULL)
5866  {
5867  row= (ndb_binlog_index_row*)sql_alloc(sizeof(ndb_binlog_index_row));
5868  memset(row, 0, sizeof(ndb_binlog_index_row));
5869  row->next= first;
5870  *rows= row;
5871  if (found_id)
5872  {
5873  /*
5874  If we found index_row with same server id already
5875  that row will contain the current stats.
5876  Copy stats over to new and reset old.
5877  */
5878  row->n_inserts= found_id->n_inserts;
5879  row->n_updates= found_id->n_updates;
5880  row->n_deletes= found_id->n_deletes;
5881  found_id->n_inserts= 0;
5882  found_id->n_updates= 0;
5883  found_id->n_deletes= 0;
5884  }
5885  /* keep track of schema ops only on "first" index_row */
5886  row->n_schemaops= first->n_schemaops;
5887  first->n_schemaops= 0;
5888  break;
5889  }
5890  }
5891  row->orig_server_id= orig_server_id;
5892  }
5893  return row;
5894 }
5895 
5896 static int
5897 ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
5898  ndb_binlog_index_row **rows,
5899  injector::transaction &trans,
5900  unsigned &trans_row_count,
5901  unsigned &trans_slave_row_count)
5902 {
5903  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5904  TABLE *table= event_data->shadow_table;
5905  NDB_SHARE *share= event_data->share;
5906  if (pOp != share->op)
5907  {
5908  return 0;
5909  }
5910 
5911  uint32 anyValue= pOp->getAnyValue();
5912  if (ndbcluster_anyvalue_is_reserved(anyValue))
5913  {
5914  if (!ndbcluster_anyvalue_is_nologging(anyValue))
5915  sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
5916  "event not logged",
5917  anyValue);
5918  return 0;
5919  }
5920  uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
5921  bool log_this_slave_update = g_ndb_log_slave_updates;
5922  bool count_this_event = true;
5923 
5924  if (share == ndb_apply_status_share)
5925  {
5926  /*
5927  Note that option values are read without synchronisation w.r.t.
5928  thread setting option variable or epoch boundaries.
5929  */
5930  if (opt_ndb_log_apply_status ||
5931  opt_ndb_log_orig)
5932  {
5933  Uint32 ndb_apply_status_logging_server_id= originating_server_id;
5934  Uint32 ndb_apply_status_server_id= 0;
5935  Uint64 ndb_apply_status_epoch= 0;
5936  bool event_has_data = false;
5937 
5938  switch(pOp->getEventType())
5939  {
5940  case NDBEVENT::TE_INSERT:
5941  // fall through
5942  case NDBEVENT::TE_UPDATE:
5943  event_has_data = true;
5944  break;
5945  case NDBEVENT::TE_DELETE:
5946  break;
5947  default:
5948  /* We should REALLY never get here */
5949  abort();
5950  }
5951 
5952  if (likely( event_has_data ))
5953  {
5954  /* unpack data to fetch orig_server_id and orig_epoch */
5955  uint n_fields= table->s->fields;
5956  MY_BITMAP b;
5957  uint32 bitbuf[128 / (sizeof(uint32) * 8)];
5958  bitmap_init(&b, bitbuf, n_fields, FALSE);
5959  bitmap_set_all(&b);
5960  ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5961  ndb_apply_status_server_id= (uint)((Field_long *)table->field[0])->val_int();
5962  ndb_apply_status_epoch= ((Field_longlong *)table->field[1])->val_int();
5963 
5964  if (opt_ndb_log_apply_status)
5965  {
5966  /*
5967  Determine if event came from our immediate Master server
5968  Ignore locally manually sourced and reserved events
5969  */
5970  if ((ndb_apply_status_logging_server_id != 0) &&
5971  (! ndbcluster_anyvalue_is_reserved(ndb_apply_status_logging_server_id)))
5972  {
5973  bool isFromImmediateMaster = (ndb_apply_status_server_id ==
5974  ndb_apply_status_logging_server_id);
5975 
5976  if (isFromImmediateMaster)
5977  {
5978  /*
5979  We log this event with our server-id so that it
5980  propagates back to the originating Master (our
5981  immediate Master)
5982  */
5983  assert(ndb_apply_status_logging_server_id != ::server_id);
5984 
5985  originating_server_id= 0; /* Will be set to our ::serverid below */
5986  }
5987  }
5988  }
5989 
5990  if (opt_ndb_log_orig)
5991  {
5992  /* store */
5993  ndb_binlog_index_row *row= ndb_find_binlog_index_row
5994  (rows, ndb_apply_status_server_id, 1);
5995  row->orig_epoch= ndb_apply_status_epoch;
5996  }
5997  }
5998  } // opt_ndb_log_apply_status || opt_ndb_log_orig)
5999 
6000  if (opt_ndb_log_apply_status)
6001  {
6002  /* We are logging ndb_apply_status changes
6003  * Don't count this event as making an epoch non-empty
6004  * Log this event in the Binlog
6005  */
6006  count_this_event = false;
6007  log_this_slave_update = true;
6008  }
6009  else
6010  {
6011  /* Not logging ndb_apply_status updates, discard this event now */
6012  return 0;
6013  }
6014  }
6015 
6016  if (originating_server_id == 0)
6017  originating_server_id= ::server_id;
6018  else
6019  {
6020  /* Track that we received a replicated row event */
6021  if (likely( count_this_event ))
6022  trans_slave_row_count++;
6023 
6024  if (!log_this_slave_update)
6025  {
6026  /*
6027  This event comes from a slave applier since it has an originating
6028  server id set. Since option to log slave updates is not set, skip it.
6029  */
6030  return 0;
6031  }
6032  }
6033 
6034  /*
6035  Start with logged_server_id as AnyValue in case it's a composite
6036  (server_id_bits < 31). This way any user-values are passed-through
6037  to the Binlog in the high bits of the event's Server Id.
6038  In future it may be useful to support *not* mapping composite
6039  AnyValues to/from Binlogged server-ids.
6040  */
6041  uint32 logged_server_id= anyValue;
6042  ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
6043 
6044  DBUG_ASSERT(trans.good());
6045  DBUG_ASSERT(table != 0);
6046 
6047  dbug_print_table("table", table);
6048 
6049  uint n_fields= table->s->fields;
6050  DBUG_PRINT("info", ("Assuming %u columns for table %s",
6051  n_fields, table->s->table_name.str));
6052  MY_BITMAP b;
6053  /* Potential buffer for the bitmap */
6054  uint32 bitbuf[128 / (sizeof(uint32) * 8)];
6055  const bool own_buffer = n_fields <= sizeof(bitbuf) * 8;
6056  bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE);
6057  bitmap_set_all(&b);
6058 
6059  /*
6060  row data is already in table->record[0]
6061  As we told the NdbEventOperation to do this
6062  (saves moving data about many times)
6063  */
6064 
6065  /*
6066  for now malloc/free blobs buffer each time
6067  TODO if possible share single permanent buffer with handlers
6068  */
6069  uchar* blobs_buffer[2] = { 0, 0 };
6070  uint blobs_buffer_size[2] = { 0, 0 };
6071 
6072  ndb_binlog_index_row *row=
6073  ndb_find_binlog_index_row(rows, originating_server_id, 0);
6074 
6075  switch(pOp->getEventType())
6076  {
6077  case NDBEVENT::TE_INSERT:
6078  if (likely( count_this_event ))
6079  {
6080  row->n_inserts++;
6081  trans_row_count++;
6082  }
6083  DBUG_PRINT("info", ("INSERT INTO %s.%s",
6084  table->s->db.str, table->s->table_name.str));
6085  {
6086  int ret;
6087  if (share->flags & NSF_BLOB_FLAG)
6088  {
6089  my_ptrdiff_t ptrdiff= 0;
6090  ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
6091  blobs_buffer[0],
6092  blobs_buffer_size[0],
6093  ptrdiff);
6094  assert(ret == 0);
6095  }
6096  ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
6097  ret = trans.write_row(logged_server_id,
6098  injector::transaction::table(table, true),
6099  &b, n_fields, table->record[0]);
6100  assert(ret == 0);
6101  }
6102  break;
6103  case NDBEVENT::TE_DELETE:
6104  if (likely( count_this_event ))
6105  {
6106  row->n_deletes++;
6107  trans_row_count++;
6108  }
6109  DBUG_PRINT("info",("DELETE FROM %s.%s",
6110  table->s->db.str, table->s->table_name.str));
6111  {
6112  /*
6113  table->record[0] contains only the primary key in this case
6114  since we do not have an after image
6115  */
6116  int n;
6117  if (!get_binlog_full(share) && table->s->primary_key != MAX_KEY)
6118  n= 0; /*
6119  use the primary key only as it save time and space and
6120  it is the only thing needed to log the delete
6121  */
6122  else
6123  n= 1; /*
6124  we use the before values since we don't have a primary key
6125  since the mysql server does not handle the hidden primary
6126  key
6127  */
6128 
6129  int ret;
6130  if (share->flags & NSF_BLOB_FLAG)
6131  {
6132  my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
6133  ret = get_ndb_blobs_value(table, event_data->ndb_value[n],
6134  blobs_buffer[n],
6135  blobs_buffer_size[n],
6136  ptrdiff);
6137  assert(ret == 0);
6138  }
6139  ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
6140  DBUG_EXECUTE("info", print_records(table, table->record[n]););
6141  ret = trans.delete_row(logged_server_id,
6142  injector::transaction::table(table, true),
6143  &b, n_fields, table->record[n]);
6144  assert(ret == 0);
6145  }
6146  break;
6147  case NDBEVENT::TE_UPDATE:
6148  if (likely( count_this_event ))
6149  {
6150  row->n_updates++;
6151  trans_row_count++;
6152  }
6153  DBUG_PRINT("info", ("UPDATE %s.%s",
6154  table->s->db.str, table->s->table_name.str));
6155  {
6156  int ret;
6157  if (share->flags & NSF_BLOB_FLAG)
6158  {
6159  my_ptrdiff_t ptrdiff= 0;
6160  ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
6161  blobs_buffer[0],
6162  blobs_buffer_size[0],
6163  ptrdiff);
6164  assert(ret == 0);
6165  }
6166  ndb_unpack_record(table, event_data->ndb_value[0],
6167  &b, table->record[0]);
6168  DBUG_EXECUTE("info", print_records(table, table->record[0]););
6169  if (table->s->primary_key != MAX_KEY &&
6170  !get_binlog_use_update(share))
6171  {
6172  /*
6173  since table has a primary key, we can do a write
6174  using only after values
6175  */
6176  ret = trans.write_row(logged_server_id,
6177  injector::transaction::table(table, true),
6178  &b, n_fields, table->record[0]);// after values
6179  assert(ret == 0);
6180  }
6181  else
6182  {
6183  /*
6184  mysql server cannot handle the ndb hidden key and
6185  therefore needs the before image as well
6186  */
6187  if (share->flags & NSF_BLOB_FLAG)
6188  {
6189  my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
6190  ret = get_ndb_blobs_value(table, event_data->ndb_value[1],
6191  blobs_buffer[1],
6192  blobs_buffer_size[1],
6193  ptrdiff);
6194  assert(ret == 0);
6195  }
6196  ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]);
6197  DBUG_EXECUTE("info", print_records(table, table->record[1]););
6198  ret = trans.update_row(logged_server_id,
6199  injector::transaction::table(table, true),
6200  &b, n_fields,
6201  table->record[1], // before values
6202  table->record[0]);// after values
6203  assert(ret == 0);
6204  }
6205  }
6206  break;
6207  default:
6208  /* We should REALLY never get here. */
6209  DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
6210  break;
6211  }
6212 
6213  if (share->flags & NSF_BLOB_FLAG)
6214  {
6215  my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
6216  my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
6217  }
6218 
6219  if (!own_buffer)
6220  {
6221  bitmap_free(&b);
6222  }
6223 
6224  return 0;
6225 }
6226 
//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
/*
  Wall-clock stopwatch based on gettimeofday(). It is started on
  construction and only compiled when RUN_NDB_BINLOG_TIMER is defined.
*/
class Timer
{
public:
  Timer() { start(); }
  void start() { gettimeofday(&m_start, 0); }
  void stop() { gettimeofday(&m_stop, 0); }
  /*
    Milliseconds between start() and stop(), rounded up.

    The difference is formed in microseconds first, so the rounding is
    applied once to the (non-negative) total. Rounding the microsecond
    part on its own could over-report by 1 ms whenever
    m_stop.tv_usec < m_start.tv_usec.
  */
  ulong elapsed_ms()
  {
    const longlong elapsed_us=
      ((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000000 +
      ((longlong) m_stop.tv_usec - (longlong) m_start.tv_usec);
    return (ulong) ((elapsed_us + 999) / 1000);
  }
private:
  struct timeval m_start,m_stop;
};
#endif
6246 
6247 /****************************************************************
6248  Injector thread main loop
6249 ****************************************************************/
6250 
6251 static uchar *
6252 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
6253  size_t *length,
6254  my_bool not_used __attribute__((unused)))
6255 {
6256  *length= schema_object->key_length;
6257  return (uchar*) schema_object->key;
6258 }
6259 
6260 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
6261  my_bool create_if_not_exists,
6262  my_bool have_lock)
6263 {
6264  NDB_SCHEMA_OBJECT *ndb_schema_object;
6265  uint length= (uint) strlen(key);
6266  DBUG_ENTER("ndb_get_schema_object");
6267  DBUG_PRINT("enter", ("key: '%s'", key));
6268 
6269  if (!have_lock)
6270  pthread_mutex_lock(&ndbcluster_mutex);
6271  while (!(ndb_schema_object=
6272  (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
6273  (const uchar*) key,
6274  length)))
6275  {
6276  if (!create_if_not_exists)
6277  {
6278  DBUG_PRINT("info", ("does not exist"));
6279  break;
6280  }
6281  if (!(ndb_schema_object=
6282  (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
6283  MYF(MY_WME | MY_ZEROFILL))))
6284  {
6285  DBUG_PRINT("info", ("malloc error"));
6286  break;
6287  }
6288  ndb_schema_object->key= (char *)(ndb_schema_object+1);
6289  memcpy(ndb_schema_object->key, key, length + 1);
6290  ndb_schema_object->key_length= length;
6291  if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
6292  {
6293  my_free((uchar*) ndb_schema_object, 0);
6294  break;
6295  }
6296  pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
6297  bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
6298  sizeof(ndb_schema_object->slock)*8, FALSE);
6299  bitmap_clear_all(&ndb_schema_object->slock_bitmap);
6300  break;
6301  }
6302  if (ndb_schema_object)
6303  {
6304  ndb_schema_object->use_count++;
6305  DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
6306  }
6307  if (!have_lock)
6308  pthread_mutex_unlock(&ndbcluster_mutex);
6309  DBUG_RETURN(ndb_schema_object);
6310 }
6311 
6312 
6313 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
6314  bool have_lock)
6315 {
6316  DBUG_ENTER("ndb_free_schema_object");
6317  DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
6318  if (!have_lock)
6319  pthread_mutex_lock(&ndbcluster_mutex);
6320  if (!--(*ndb_schema_object)->use_count)
6321  {
6322  DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
6323  my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
6324  pthread_mutex_destroy(&(*ndb_schema_object)->mutex);
6325  my_free((uchar*) *ndb_schema_object, MYF(0));
6326  *ndb_schema_object= 0;
6327  }
6328  else
6329  {
6330  DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
6331  }
6332  if (!have_lock)
6333  pthread_mutex_unlock(&ndbcluster_mutex);
6334  DBUG_VOID_RETURN;
6335 }
6336 
6337 
6338 static void
6339 remove_event_operations(Ndb* ndb)
6340 {
6341  DBUG_ENTER("remove_event_operations");
6342  NdbEventOperation *op;
6343  while ((op= ndb->getEventOperation()))
6344  {
6345  DBUG_ASSERT(!IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
6346  DBUG_PRINT("info", ("removing event operation on %s",
6347  op->getEvent()->getName()));
6348 
6349  Ndb_event_data *event_data= (Ndb_event_data *) op->getCustomData();
6350  DBUG_ASSERT(event_data);
6351 
6352  NDB_SHARE *share= event_data->share;
6353  DBUG_ASSERT(share != NULL);
6354  DBUG_ASSERT(share->op == op || share->new_op == op);
6355 
6356  delete event_data;
6357  op->setCustomData(NULL);
6358 
6359  pthread_mutex_lock(&share->mutex);
6360  share->op= 0;
6361  share->new_op= 0;
6362  pthread_mutex_unlock(&share->mutex);
6363 
6364  DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
6365  share->key, share->use_count));
6366  free_share(&share);
6367 
6368  ndb->dropEventOperation(op);
6369  }
6370  DBUG_VOID_RETURN;
6371 }
6372 
6373 extern long long g_event_data_count;
6374 extern long long g_event_nondata_count;
6375 extern long long g_event_bytes_count;
6376 
6377 void updateInjectorStats(Ndb* schemaNdb, Ndb* dataNdb)
6378 {
6379  /* Update globals to sum of totals from each listening
6380  * Ndb object
6381  */
6382  g_event_data_count =
6383  schemaNdb->getClientStat(Ndb::DataEventsRecvdCount) +
6384  dataNdb->getClientStat(Ndb::DataEventsRecvdCount);
6385  g_event_nondata_count =
6386  schemaNdb->getClientStat(Ndb::NonDataEventsRecvdCount) +
6387  dataNdb->getClientStat(Ndb::NonDataEventsRecvdCount);
6388  g_event_bytes_count =
6389  schemaNdb->getClientStat(Ndb::EventBytesRecvdCount) +
6390  dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
6391 }
6392 
/*
  Values for the binlog thread's do_ndbcluster_binlog_close_connection
  variable. NOTE(review): the names suggest "keep running", "exit the
  thread" and "restart the thread"; confirm in ndb_binlog_thread_func.
*/
enum Binlog_thread_state
{
  BCCC_running= 0,
  BCCC_exit= 1,
  BCCC_restart= 2
};

/*
  Options defined elsewhere. NOTE(review): presumably the reporting
  thresholds for binlog epoch slip and event buffer memory usage,
  judging by their names; confirm at the definition.
*/
extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
extern ulong opt_ndb_report_thresh_binlog_mem_usage;
6402 
6403 pthread_handler_t
6404 ndb_binlog_thread_func(void *arg)
6405 {
6406  THD *thd; /* needs to be first for thread_stack */
6407  Ndb *i_ndb= 0;
6408  Ndb *s_ndb= 0;
6409  Thd_ndb *thd_ndb=0;
6410  injector *inj= injector::instance();
6411  uint incident_id= 0;
6412  Binlog_thread_state do_ndbcluster_binlog_close_connection;
6413 
6419  bool do_incident = true;
6420 
6421 #ifdef RUN_NDB_BINLOG_TIMER
6422  Timer main_timer;
6423 #endif
6424 
6425  pthread_mutex_lock(&injector_mutex);
6426  /*
6427  Set up the Thread
6428  */
6429  my_thread_init();
6430  DBUG_ENTER("ndb_binlog_thread");
6431 
6432  thd= new THD; /* note that contructor of THD uses DBUG_ */
6433  THD_CHECK_SENTRY(thd);
6434 
6435  /* We need to set thd->thread_id before thd->store_globals, or it will
6436  set an invalid value for thd->variables.pseudo_thread_id.
6437  */
6438  mysql_mutex_lock(&LOCK_thread_count);
6439  thd->thread_id= thread_id++;
6440  mysql_mutex_unlock(&LOCK_thread_count);
6441 
6442  thd->thread_stack= (char*) &thd; /* remember where our stack is */
6443  if (thd->store_globals())
6444  {
6445  thd->cleanup();
6446  delete thd;
6447  ndb_binlog_thread_running= -1;
6448  pthread_mutex_unlock(&injector_mutex);
6449  pthread_cond_signal(&injector_cond);
6450 
6451  DBUG_LEAVE; // Must match DBUG_ENTER()
6452  my_thread_end();
6453  pthread_exit(0);
6454  return NULL; // Avoid compiler warnings
6455  }
6456  lex_start(thd);
6457 
6458  thd->init_for_queries();
6459  thd_set_command(thd, COM_DAEMON);
6460  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
6461 #ifndef NDB_THD_HAS_NO_VERSION
6462  thd->version= refresh_version;
6463 #endif
6464  thd->client_capabilities= 0;
6465  thd->security_ctx->skip_grants();
6466  my_net_init(&thd->net, 0);
6467 
6468  // Ndb binlog thread always use row format
6469  thd->set_current_stmt_binlog_format_row();
6470 
6471  /*
6472  Set up ndb binlog
6473  */
6474  sql_print_information("Starting Cluster Binlog Thread");
6475 
6476  pthread_detach_this_thread();
6477  thd->real_id= pthread_self();
6478  mysql_mutex_lock(&LOCK_thread_count);
6479  add_global_thread(thd);
6480  mysql_mutex_unlock(&LOCK_thread_count);
6481  thd->lex->start_transaction_opt= 0;
6482 
6483 
6484 restart_cluster_failure:
6485  int have_injector_mutex_lock= 0;
6486  do_ndbcluster_binlog_close_connection= BCCC_exit;
6487 
6488  if (!(thd_ndb= Thd_ndb::seize(thd)))
6489  {
6490  sql_print_error("Could not allocate Thd_ndb object");
6491  ndb_binlog_thread_running= -1;
6492  pthread_mutex_unlock(&injector_mutex);
6493  pthread_cond_signal(&injector_cond);
6494  goto err;
6495  }
6496 
6497  if (!(s_ndb= new Ndb(g_ndb_cluster_connection, NDB_REP_DB)) ||
6498  s_ndb->init())
6499  {
6500  sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
6501  ndb_binlog_thread_running= -1;
6502  pthread_mutex_unlock(&injector_mutex);
6503  pthread_cond_signal(&injector_cond);
6504  goto err;
6505  }
6506 
6507  // empty database
6508  if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
6509  i_ndb->init())
6510  {
6511  sql_print_error("NDB Binlog: Getting Ndb object failed");
6512  ndb_binlog_thread_running= -1;
6513  pthread_mutex_unlock(&injector_mutex);
6514  pthread_cond_signal(&injector_cond);
6515  goto err;
6516  }
6517 
6518  /* init hash for schema object distribution */
6519  (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
6520  (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
6521 
6522  /*
6523  Expose global reference to our ndb object.
6524 
6525  Used by both sql client thread and binlog thread to interact
6526  with the storage
6527  pthread_mutex_lock(&injector_mutex);
6528  */
6529  injector_thd= thd;
6530  injector_ndb= i_ndb;
6531  schema_ndb= s_ndb;
6532 
6533  if (opt_bin_log && opt_ndb_log_bin)
6534  {
6535  ndb_binlog_running= TRUE;
6536  }
6537 
6538  /* Thread start up completed */
6539  ndb_binlog_thread_running= 1;
6540  pthread_mutex_unlock(&injector_mutex);
6541  pthread_cond_signal(&injector_cond);
6542 
6543  /*
6544  wait for mysql server to start (so that the binlog is started
6545  and thus can receive the first GAP event)
6546  */
6547  mysql_mutex_lock(&LOCK_server_started);
6548  while (!mysqld_server_started)
6549  {
6550  struct timespec abstime;
6551  set_timespec(abstime, 1);
6552  mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
6553  &abstime);
6554  if (ndbcluster_terminating)
6555  {
6556  mysql_mutex_unlock(&LOCK_server_started);
6557  goto err;
6558  }
6559  }
6560  mysql_mutex_unlock(&LOCK_server_started);
6561  /*
6562  Main NDB Injector loop
6563  */
6564  while (do_incident && ndb_binlog_running)
6565  {
6566  /*
6567  check if it is the first log, if so we do not insert a GAP event
6568  as there is really no log to have a GAP in
6569  */
6570  if (incident_id == 0)
6571  {
6572  LOG_INFO log_info;
6573  mysql_bin_log.get_current_log(&log_info);
6574  int len= strlen(log_info.log_file_name);
6575  uint no= 0;
6576  if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
6577  no == 1)
6578  {
6579  /* this is the fist log, so skip GAP event */
6580  break;
6581  }
6582  }
6583 
6584  /*
6585  Always insert a GAP event as we cannot know what has happened
6586  in the cluster while not being connected.
6587  */
6588  LEX_STRING const msg[2]=
6589  {
6590  { C_STRING_WITH_LEN("mysqld startup") },
6591  { C_STRING_WITH_LEN("cluster disconnect")}
6592  };
6593  int ret = inj->record_incident(thd, INCIDENT_LOST_EVENTS,
6594  msg[incident_id]);
6595  assert(ret == 0); NDB_IGNORE_VALUE(ret);
6596  do_incident = false; // Don't report incident again, unless we get started
6597  break;
6598  }
6599  incident_id= 1;
6600  {
6601  thd->proc_info= "Waiting for ndbcluster to start";
6602 
6603  pthread_mutex_lock(&injector_mutex);
6604  while (!ndb_schema_share ||
6605  (ndb_binlog_running && !ndb_apply_status_share) ||
6606  !ndb_binlog_tables_inited)
6607  {
6608  if (!thd_ndb->valid_ndb())
6609  {
6610  /*
6611  Cluster has gone away before setup was completed.
6612  Keep lock on injector_mutex to prevent further
6613  usage of the injector_ndb, and restart binlog
6614  thread to get rid of any garbage on the ndb objects
6615  */
6616  have_injector_mutex_lock= 1;
6617  do_ndbcluster_binlog_close_connection= BCCC_restart;
6618  goto err;
6619  }
6620  /* ndb not connected yet */
6621  struct timespec abstime;
6622  set_timespec(abstime, 1);
6623  pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
6624  if (ndbcluster_binlog_terminating)
6625  {
6626  pthread_mutex_unlock(&injector_mutex);
6627  goto err;
6628  }
6629  }
6630  pthread_mutex_unlock(&injector_mutex);
6631 
6632  DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
6633  thd_set_thd_ndb(thd, thd_ndb);
6634  thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
6635  thd->query_id= 0; // to keep valgrind quiet
6636  }
6637 
6638  {
6639  // wait for the first event
6640  thd->proc_info= "Waiting for first event from ndbcluster";
6641  int schema_res, res;
6642  Uint64 schema_gci;
6643  do
6644  {
6645  DBUG_PRINT("info", ("Waiting for the first event"));
6646 
6647  if (ndbcluster_binlog_terminating)
6648  goto err;
6649 
6650  schema_res= s_ndb->pollEvents(100, &schema_gci);
6651  } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
6652  if (ndb_binlog_running)
6653  {
6654  Uint64 gci= i_ndb->getLatestGCI();
6655  while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
6656  {
6657  if (ndbcluster_binlog_terminating)
6658  goto err;
6659  res= i_ndb->pollEvents(10, &gci);
6660  }
6661  if (gci > schema_gci)
6662  {
6663  schema_gci= gci;
6664  }
6665  }
6666  // now check that we have epochs consistant with what we had before the restart
6667  DBUG_PRINT("info", ("schema_res: %d schema_gci: %u/%u", schema_res,
6668  (uint)(schema_gci >> 32),
6669  (uint)(schema_gci)));
6670  {
6671  i_ndb->flushIncompleteEvents(schema_gci);
6672  s_ndb->flushIncompleteEvents(schema_gci);
6673  if (schema_gci < ndb_latest_handled_binlog_epoch)
6674  {
6675  sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
6676  "ndb_latest_handled_binlog_epoch: %u/%u, while current epoch: %u/%u. "
6677  "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
6678  (uint)(ndb_latest_handled_binlog_epoch >> 32),
6679  (uint)(ndb_latest_handled_binlog_epoch),
6680  (uint)(schema_gci >> 32),
6681  (uint)(schema_gci));
6682  ndb_set_latest_trans_gci(0);
6683  ndb_latest_handled_binlog_epoch= 0;
6684  ndb_latest_applied_binlog_epoch= 0;
6685  ndb_latest_received_binlog_epoch= 0;
6686  }
6687  else if (ndb_latest_applied_binlog_epoch > 0)
6688  {
6689  sql_print_warning("NDB Binlog: cluster has reconnected. "
6690  "Changes to the database that occured while "
6691  "disconnected will not be in the binlog");
6692  }
6693  if (opt_ndb_extra_logging)
6694  {
6695  sql_print_information("NDB Binlog: starting log at epoch %u/%u",
6696  (uint)(schema_gci >> 32),
6697  (uint)(schema_gci));
6698  }
6699  }
6700  }
6701  /*
6702  binlog thread is ready to receive events
6703  - client threads may now start updating data, i.e. tables are
6704  no longer read only
6705  */
6706  ndb_binlog_is_ready= TRUE;
6707 
6708  if (opt_ndb_extra_logging)
6709  sql_print_information("NDB Binlog: ndb tables writable");
6710  close_cached_tables((THD*) 0, (TABLE_LIST*) 0, FALSE, FALSE, FALSE);
6711 
6712  /*
6713  Signal any waiting thread that ndb table setup is
6714  now complete
6715  */
6716  ndb_notify_tables_writable();
6717 
6718  {
6719  static char db[]= "";
6720  thd->db= db;
6721  }
6722  do_incident = true; // If we get disconnected again...do incident report
6723  do_ndbcluster_binlog_close_connection= BCCC_running;
6724  for ( ; !((ndbcluster_binlog_terminating ||
6725  do_ndbcluster_binlog_close_connection) &&
6726  ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
6727  do_ndbcluster_binlog_close_connection != BCCC_restart; )
6728  {
6729 #ifndef DBUG_OFF
6730  if (do_ndbcluster_binlog_close_connection)
6731  {
6732  DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
6733  "ndb_latest_handled_binlog_epoch: %u/%u, "
6734  "*get_latest_trans_gci(): %u/%u",
6735  do_ndbcluster_binlog_close_connection,
6736  (uint)(ndb_latest_handled_binlog_epoch >> 32),
6737  (uint)(ndb_latest_handled_binlog_epoch),
6738  (uint)(ndb_get_latest_trans_gci() >> 32),
6739  (uint)(ndb_get_latest_trans_gci())));
6740  }
6741 #endif
6742 #ifdef RUN_NDB_BINLOG_TIMER
6743  main_timer.stop();
6744  sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
6745  main_timer.start();
6746 #endif
6747 
6748  /*
6749  now we don't want any events before next gci is complete
6750  */
6751  thd->proc_info= "Waiting for event from ndbcluster";
6752  thd->set_time();
6753 
6754  /* wait for event or 1000 ms */
6755  Uint64 gci= 0, schema_gci;
6756  int res= 0, tot_poll_wait= 1000;
6757  if (ndb_binlog_running)
6758  {
6759  res= i_ndb->pollEvents(tot_poll_wait, &gci);
6760  tot_poll_wait= 0;
6761  }
6762  int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
6763  ndb_latest_received_binlog_epoch= gci;
6764 
6765  while (gci > schema_gci && schema_res >= 0)
6766  {
6767  static char buf[64];
6768  thd->proc_info= "Waiting for schema epoch";
6769  my_snprintf(buf, sizeof(buf), "%s %u/%u(%u/%u)", thd->proc_info,
6770  (uint)(schema_gci >> 32),
6771  (uint)(schema_gci),
6772  (uint)(gci >> 32),
6773  (uint)(gci));
6774  thd->proc_info= buf;
6775  schema_res= s_ndb->pollEvents(10, &schema_gci);
6776  }
6777 
6778  if ((ndbcluster_binlog_terminating ||
6779  do_ndbcluster_binlog_close_connection) &&
6780  (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
6781  !ndb_binlog_running))
6782  break; /* Shutting down server */
6783 
6784  MEM_ROOT **root_ptr=
6785  my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
6786  MEM_ROOT *old_root= *root_ptr;
6787  MEM_ROOT mem_root;
6788  init_sql_alloc(&mem_root, 4096, 0);
6789  List<Cluster_schema> post_epoch_log_list;
6790  List<Cluster_schema> post_epoch_unlock_list;
6791  *root_ptr= &mem_root;
6792 
6793  if (unlikely(schema_res > 0))
6794  {
6795  thd->proc_info= "Processing events from schema table";
6796  g_ndb_log_slave_updates= opt_log_slave_updates;
6797  s_ndb->
6798  setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6799  s_ndb->
6800  setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6801  NdbEventOperation *pOp= s_ndb->nextEvent();
6802  while (pOp != NULL)
6803  {
6804  if (!pOp->hasError())
6805  {
6806  ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
6807  &post_epoch_log_list,
6808  &post_epoch_unlock_list,
6809  &mem_root);
6810  DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6811  s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6812  "<empty>"));
6813  DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6814  i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6815  "<empty>"));
6816  if (i_ndb->getEventOperation() == NULL &&
6817  s_ndb->getEventOperation() == NULL &&
6818  do_ndbcluster_binlog_close_connection == BCCC_running)
6819  {
6820  DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
6821  do_ndbcluster_binlog_close_connection= BCCC_restart;
6822  if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6823  {
6824  sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
6825  "as latest received epoch is %u/%u",
6826  (uint)(ndb_get_latest_trans_gci() >> 32),
6827  (uint)(ndb_get_latest_trans_gci()),
6828  (uint)(ndb_latest_received_binlog_epoch >> 32),
6829  (uint)(ndb_latest_received_binlog_epoch));
6830  }
6831  }
6832  }
6833  else
6834  sql_print_error("NDB: error %lu (%s) on handling "
6835  "binlog schema event",
6836  (ulong) pOp->getNdbError().code,
6837  pOp->getNdbError().message);
6838  pOp= s_ndb->nextEvent();
6839  }
6840  updateInjectorStats(s_ndb, i_ndb);
6841  }
6842 
6843  if (!ndb_binlog_running)
6844  {
6845  /*
6846  Just consume any events, not used if no binlogging
6847  e.g. node failure events
6848  */
6849  Uint64 tmp_gci;
6850  if (i_ndb->pollEvents(0, &tmp_gci))
6851  {
6852  NdbEventOperation *pOp;
6853  while ((pOp= i_ndb->nextEvent()))
6854  {
6855  if ((unsigned) pOp->getEventType() >=
6856  (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6857  {
6858  ndb_binlog_index_row row;
6859  ndb_binlog_thread_handle_non_data_event(thd, pOp, row);
6860  }
6861  }
6862  if (i_ndb->getEventOperation() == NULL &&
6863  s_ndb->getEventOperation() == NULL &&
6864  do_ndbcluster_binlog_close_connection == BCCC_running)
6865  {
6866  DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
6867  do_ndbcluster_binlog_close_connection= BCCC_restart;
6868  }
6869  }
6870  updateInjectorStats(s_ndb, i_ndb);
6871  }
6872  else if (res > 0 ||
6873  (ndb_log_empty_epochs() &&
6874  gci > ndb_latest_handled_binlog_epoch))
6875  {
6876  DBUG_PRINT("info", ("pollEvents res: %d", res));
6877  thd->proc_info= "Processing events";
6878  uchar apply_status_buf[512];
6879  TABLE *apply_status_table= NULL;
6880  if (ndb_apply_status_share)
6881  {
6882  /*
6883  We construct the buffer to write the apply status binlog
6884  event here, as the table->record[0] buffer is referenced
6885  by the apply status event operation, and will be filled
6886  with data at the nextEvent call if the first event should
6887  happen to be from the apply status table
6888  */
6889  Ndb_event_data *event_data= ndb_apply_status_share->event_data;
6890  if (!event_data)
6891  {
6892  DBUG_ASSERT(ndb_apply_status_share->op);
6893  event_data=
6894  (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
6895  DBUG_ASSERT(event_data);
6896  }
6897  apply_status_table= event_data->shadow_table;
6898 
6899  /*
6900  Intialize apply_status_table->record[0]
6901  */
6902  empty_record(apply_status_table);
6903 
6904  apply_status_table->field[0]->store((longlong)::server_id, true);
6905  /*
6906  gci is added later, just before writing to binlog as gci
6907  is unknown here
6908  */
6909  apply_status_table->field[2]->store("", 0, &my_charset_bin);
6910  apply_status_table->field[3]->store((longlong)0, true);
6911  apply_status_table->field[4]->store((longlong)0, true);
6912  DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
6913  memcpy(apply_status_buf, apply_status_table->record[0],
6914  apply_status_table->s->reclength);
6915  }
6916  NdbEventOperation *pOp= i_ndb->nextEvent();
6917  ndb_binlog_index_row _row;
6918  ndb_binlog_index_row *rows= &_row;
6919  injector::transaction trans;
6920  unsigned trans_row_count= 0;
6921  unsigned trans_slave_row_count= 0;
6922  if (!pOp)
6923  {
6924  /*
6925  Must be an empty epoch since the condition
6926  (ndb_log_empty_epochs() &&
6927  gci > ndb_latest_handled_binlog_epoch)
6928  must be true we write empty epoch into
6929  ndb_binlog_index
6930  */
6931  DBUG_PRINT("info", ("Writing empty epoch for gci %llu", gci));
6932  DBUG_PRINT("info", ("Initializing transaction"));
6933  inj->new_trans(thd, &trans);
6934  rows= &_row;
6935  memset(&_row, 0, sizeof(_row));
6936  thd->variables.character_set_client= &my_charset_latin1;
6937  goto commit_to_binlog;
6938  }
6939  while (pOp != NULL)
6940  {
6941  rows= &_row;
6942 #ifdef RUN_NDB_BINLOG_TIMER
6943  Timer gci_timer, write_timer;
6944  int event_count= 0;
6945  gci_timer.start();
6946 #endif
6947  gci= pOp->getGCI();
6948  DBUG_PRINT("info", ("Handling gci: %u/%u",
6949  (uint)(gci >> 32),
6950  (uint)(gci)));
6951  // sometimes get TE_ALTER with invalid table
6952  DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
6953  ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
6954  DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
6955 
6956  /* initialize some variables for this epoch */
6957  g_ndb_log_slave_updates= opt_log_slave_updates;
6958  i_ndb->
6959  setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6960  i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6961 
6962  memset(&_row, 0, sizeof(_row));
6963  thd->variables.character_set_client= &my_charset_latin1;
6964  DBUG_PRINT("info", ("Initializing transaction"));
6965  inj->new_trans(thd, &trans);
6966  trans_row_count= 0;
6967  trans_slave_row_count= 0;
6968  // pass table map before epoch
6969  {
6970  Uint32 iter= 0;
6971  const NdbEventOperation *gci_op;
6972  Uint32 event_types;
6973 
6974  if (!i_ndb->isConsistentGCI(gci))
6975  {
6976  char errmsg[64];
6977  uint end= sprintf(&errmsg[0],
6978  "Detected missing data in GCI %llu, "
6979  "inserting GAP event", gci);
6980  errmsg[end]= '\0';
6981  DBUG_PRINT("info",
6982  ("Detected missing data in GCI %llu, "
6983  "inserting GAP event", gci));
6984  LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
6985  inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
6986  }
6987  while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6988  != NULL)
6989  {
6990  Ndb_event_data *event_data=
6991  (Ndb_event_data *) gci_op->getCustomData();
6992  NDB_SHARE *share= (event_data)?event_data->share:NULL;
6993  DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
6994  (long) gci_op, (long) share, event_types));
6995  // workaround for interface returning TE_STOP events
6996  // which are normally filtered out below in the nextEvent loop
6997  if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
6998  {
6999  DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
7000  gci_op->getEvent()->getTable()->getName()));
7001  continue;
7002  }
7003  // this should not happen
7004  if (share == NULL || event_data->shadow_table == NULL)
7005  {
7006  DBUG_PRINT("info", ("no share or table %s!",
7007  gci_op->getEvent()->getTable()->getName()));
7008  continue;
7009  }
7010  if (share == ndb_apply_status_share)
7011  {
7012  // skip this table, it is handled specially
7013  continue;
7014  }
7015  TABLE *table= event_data->shadow_table;
7016 #ifndef DBUG_OFF
7017  const LEX_STRING &name= table->s->table_name;
7018 #endif
7019  if ((event_types & (NdbDictionary::Event::TE_INSERT |
7022  {
7023  DBUG_PRINT("info", ("skipping non data event table: %.*s",
7024  (int) name.length, name.str));
7025  continue;
7026  }
7027  if (!trans.good())
7028  {
7029  DBUG_PRINT("info",
7030  ("Found new data event, initializing transaction"));
7031  inj->new_trans(thd, &trans);
7032  }
7033  DBUG_PRINT("info", ("use_table: %.*s, cols %u",
7034  (int) name.length, name.str,
7035  table->s->fields));
7036  injector::transaction::table tbl(table, true);
7037  int ret = trans.use_table(::server_id, tbl);
7038  assert(ret == 0); NDB_IGNORE_VALUE(ret);
7039  }
7040  }
7041  if (trans.good())
7042  {
7043  if (apply_status_table)
7044  {
7045 #ifndef DBUG_OFF
7046  const LEX_STRING& name= apply_status_table->s->table_name;
7047  DBUG_PRINT("info", ("use_table: %.*s",
7048  (int) name.length, name.str));
7049 #endif
7050  injector::transaction::table tbl(apply_status_table, true);
7051  int ret = trans.use_table(::server_id, tbl);
7052  assert(ret == 0); NDB_IGNORE_VALUE(ret);
7053 
7054  /* add the gci to the record */
7055  Field *field= apply_status_table->field[1];
7056  my_ptrdiff_t row_offset=
7057  (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
7058  field->move_field_offset(row_offset);
7059  field->store((longlong)gci, true);
7060  field->move_field_offset(-row_offset);
7061 
7062  trans.write_row(::server_id,
7063  injector::transaction::table(apply_status_table,
7064  true),
7065  &apply_status_table->s->all_set,
7066  apply_status_table->s->fields,
7067  apply_status_buf);
7068  }
7069  else
7070  {
7071  sql_print_error("NDB: Could not get apply status share");
7072  }
7073  }
7074 #ifdef RUN_NDB_BINLOG_TIMER
7075  write_timer.start();
7076 #endif
7077  do
7078  {
7079 #ifdef RUN_NDB_BINLOG_TIMER
7080  event_count++;
7081 #endif
7082  if (pOp->hasError() &&
7083  ndb_binlog_thread_handle_error(i_ndb, pOp) < 0)
7084  goto err;
7085 
7086 #ifndef DBUG_OFF
7087  {
7088  Ndb_event_data *event_data=
7089  (Ndb_event_data *) pOp->getCustomData();
7090  NDB_SHARE *share= (event_data)?event_data->share:NULL;
7091  DBUG_PRINT("info",
7092  ("EVENT TYPE: %d GCI: %u/%u last applied: %u/%u "
7093  "share: 0x%lx (%s.%s)", pOp->getEventType(),
7094  (uint)(gci >> 32),
7095  (uint)(gci),
7096  (uint)(ndb_latest_applied_binlog_epoch >> 32),
7097  (uint)(ndb_latest_applied_binlog_epoch),
7098  (long) share,
7099  share ? share->db : "'NULL'",
7100  share ? share->table_name : "'NULL'"));
7101  DBUG_ASSERT(share != 0);
7102  }
7103  // assert that there is consistancy between gci op list
7104  // and event list
7105  {
7106  Uint32 iter= 0;
7107  const NdbEventOperation *gci_op;
7108  Uint32 event_types;
7109  while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
7110  != NULL)
7111  {
7112  if (gci_op == pOp)
7113  break;
7114  }
7115  DBUG_ASSERT(gci_op == pOp);
7116  DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
7117  }
7118 #endif
7119  if ((unsigned) pOp->getEventType() <
7120  (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
7121  ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
7122  else
7123  {
7124  ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
7125  DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
7126  s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
7127  "<empty>"));
7128  DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
7129  i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
7130  "<empty>"));
7131  if (i_ndb->getEventOperation() == NULL &&
7132  s_ndb->getEventOperation() == NULL &&
7133  do_ndbcluster_binlog_close_connection == BCCC_running)
7134  {
7135  DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
7136  do_ndbcluster_binlog_close_connection= BCCC_restart;
7137  if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
7138  {
7139  sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
7140  "as latest received epoch is %lu",
7141  (ulong) ndb_get_latest_trans_gci(),
7142  (ulong) ndb_latest_received_binlog_epoch);
7143  }
7144  }
7145  }
7146 
7147  pOp= i_ndb->nextEvent();
7148  } while (pOp && pOp->getGCI() == gci);
7149 
7150  updateInjectorStats(s_ndb, i_ndb);
7151 
7152  /*
7153  note! pOp is not referring to an event in the next epoch
7154  or is == 0
7155  */
7156 #ifdef RUN_NDB_BINLOG_TIMER
7157  write_timer.stop();
7158 #endif
7159 
7160  while (trans.good())
7161  {
7162  if (!ndb_log_empty_epochs())
7163  {
7164  /*
7165  If
7166  - We did not add any 'real' rows to the Binlog AND
7167  - We did not apply any slave row updates, only
7168  ndb_apply_status updates
7169  THEN
7170  Don't write the Binlog transaction which just
7171  contains ndb_apply_status updates.
7172  (For cicular rep with log_apply_status, ndb_apply_status
7173  updates will propagate while some related, real update
7174  is propagating)
7175  */
7176  if ((trans_row_count == 0) &&
7177  (! (opt_ndb_log_apply_status &&
7178  trans_slave_row_count) ))
7179  {
7180  /* nothing to commit, rollback instead */
7181  if (int r= trans.rollback())
7182  {
7183  sql_print_error("NDB Binlog: "
7184  "Error during ROLLBACK of GCI %u/%u. Error: %d",
7185  uint(gci >> 32), uint(gci), r);
7186  /* TODO: Further handling? */
7187  }
7188  break;
7189  }
7190  }
7191  commit_to_binlog:
7192  thd->proc_info= "Committing events to binlog";
7193  injector::transaction::binlog_pos start= trans.start_pos();
7194  if (int r= trans.commit())
7195  {
7196  sql_print_error("NDB Binlog: "
7197  "Error during COMMIT of GCI. Error: %d",
7198  r);
7199  /* TODO: Further handling? */
7200  }
7201  rows->gci= (Uint32)(gci >> 32); // Expose gci hi/lo
7202  rows->epoch= gci;
7203  rows->master_log_file= start.file_name();
7204  rows->master_log_pos= start.file_pos();
7205 
7206  DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
7207  if (opt_ndb_log_binlog_index)
7208  {
7209  if (ndb_binlog_index_table__write_rows(thd, rows))
7210  {
7211  /*
7212  Writing to ndb_binlog_index failed, check if we are
7213  being killed and retry
7214  */
7215  if (thd->killed)
7216  {
7217  (void) mysql_mutex_lock(&LOCK_thread_count);
7218  volatile THD::killed_state killed= thd->killed;
7219  /* We are cleaning up, allow for flushing last epoch */
7220  thd->killed= THD::NOT_KILLED;
7221  ndb_binlog_index_table__write_rows(thd, rows);
7222  /* Restore kill flag */
7223  thd->killed= killed;
7224  (void) mysql_mutex_unlock(&LOCK_thread_count);
7225  }
7226  }
7227  }
7228  ndb_latest_applied_binlog_epoch= gci;
7229  break;
7230  }
7231  ndb_latest_handled_binlog_epoch= gci;
7232 
7233 #ifdef RUN_NDB_BINLOG_TIMER
7234  gci_timer.stop();
7235  sql_print_information("gci %ld event_count %d write time "
7236  "%ld(%d e/s), total time %ld(%d e/s)",
7237  (ulong)gci, event_count,
7238  write_timer.elapsed_ms(),
7239  (1000*event_count) / write_timer.elapsed_ms(),
7240  gci_timer.elapsed_ms(),
7241  (1000*event_count) / gci_timer.elapsed_ms());
7242 #endif
7243  }
7244  if(!i_ndb->isConsistent(gci))
7245  {
7246  char errmsg[64];
7247  uint end= sprintf(&errmsg[0],
7248  "Detected missing data in GCI %llu, "
7249  "inserting GAP event", gci);
7250  errmsg[end]= '\0';
7251  DBUG_PRINT("info",
7252  ("Detected missing data in GCI %llu, "
7253  "inserting GAP event", gci));
7254  LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
7255  inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
7256  }
7257  }
7258 
7259  ndb_binlog_thread_handle_schema_event_post_epoch(thd,
7260  &post_epoch_log_list,
7261  &post_epoch_unlock_list);
7262  free_root(&mem_root, MYF(0));
7263  *root_ptr= old_root;
7264  ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
7265  }
7266  err:
7267  if (do_ndbcluster_binlog_close_connection != BCCC_restart)
7268  {
7269  sql_print_information("Stopping Cluster Binlog");
7270  DBUG_PRINT("info",("Shutting down cluster binlog thread"));
7271  thd->proc_info= "Shutting down";
7272  }
7273  else
7274  {
7275  sql_print_information("Restarting Cluster Binlog");
7276  DBUG_PRINT("info",("Restarting cluster binlog thread"));
7277  thd->proc_info= "Restarting";
7278  }
7279  if (!have_injector_mutex_lock)
7280  pthread_mutex_lock(&injector_mutex);
7281  /* don't mess with the injector_ndb anymore from other threads */
7282  injector_thd= 0;
7283  injector_ndb= 0;
7284  schema_ndb= 0;
7285  pthread_mutex_unlock(&injector_mutex);
7286  thd->db= 0; // as not to try to free memory
7287 
7288  /*
7289  This will cause the util thread to start to try to initialize again
7290  via ndbcluster_setup_binlog_table_shares. But since injector_ndb is
7291  set to NULL it will not succeed until injector_ndb is reinitialized.
7292  */
7293  ndb_binlog_tables_inited= FALSE;
7294 
7295  if (ndb_apply_status_share)
7296  {
7297  /* ndb_share reference binlog extra free */
7298  DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7299  ndb_apply_status_share->key,
7300  ndb_apply_status_share->use_count));
7301  free_share(&ndb_apply_status_share);
7302  ndb_apply_status_share= 0;
7303  }
7304  if (ndb_schema_share)
7305  {
7306  /* begin protect ndb_schema_share */
7307  pthread_mutex_lock(&ndb_schema_share_mutex);
7308  /* ndb_share reference binlog extra free */
7309  DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7310  ndb_schema_share->key,
7311  ndb_schema_share->use_count));
7312  free_share(&ndb_schema_share);
7313  ndb_schema_share= 0;
7314  pthread_mutex_unlock(&ndb_schema_share_mutex);
7315  /* end protect ndb_schema_share */
7316  }
7317 
7318  /* remove all event operations */
7319  if (s_ndb)
7320  {
7321  remove_event_operations(s_ndb);
7322  delete s_ndb;
7323  s_ndb= 0;
7324  }
7325  if (i_ndb)
7326  {
7327  remove_event_operations(i_ndb);
7328  delete i_ndb;
7329  i_ndb= 0;
7330  }
7331 
7332  my_hash_free(&ndb_schema_objects);
7333 
7334  if (thd_ndb)
7335  {
7336  Thd_ndb::release(thd_ndb);
7337  thd_set_thd_ndb(thd, NULL);
7338  thd_ndb= NULL;
7339  }
7340 
7344  {
7345  if (opt_ndb_extra_logging > 9)
7346  sql_print_information("NDB Binlog: Release extra share references");
7347 
7348  pthread_mutex_lock(&ndbcluster_mutex);
7349  for (uint i= 0; i < ndbcluster_open_tables.records;)
7350  {
7351  NDB_SHARE * share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,
7352  i);
7353  if (share->state != NSS_DROPPED)
7354  {
7355  /*
7356  The share kept by the server has not been freed, free it
7357  */
7358  share->state= NSS_DROPPED;
7359  /* ndb_share reference create free */
7360  DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
7361  share->key, share->use_count));
7362  free_share(&share, TRUE);
7363 
7368  i = 0;
7369  }
7370  else
7371  {
7372  i++;
7373  }
7374  }
7375  pthread_mutex_unlock(&ndbcluster_mutex);
7376  }
7377 
7378  close_cached_tables((THD*) 0, (TABLE_LIST*) 0, FALSE, FALSE, FALSE);
7379  if (opt_ndb_extra_logging > 15)
7380  {
7381  sql_print_information("NDB Binlog: remaining open tables: ");
7382  for (uint i= 0; i < ndbcluster_open_tables.records; i++)
7383  {
7384  NDB_SHARE* share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,i);
7385  sql_print_information(" %s.%s state: %u use_count: %u",
7386  share->db,
7387  share->table_name,
7388  (uint)share->state,
7389  share->use_count);
7390  }
7391  }
7392 
7393  if (do_ndbcluster_binlog_close_connection == BCCC_restart)
7394  {
7395  pthread_mutex_lock(&injector_mutex);
7396  goto restart_cluster_failure;
7397  }
7398 
7399  net_end(&thd->net);
7400  thd->cleanup();
7401  delete thd;
7402 
7403  ndb_binlog_thread_running= -1;
7404  ndb_binlog_running= FALSE;
7405  (void) pthread_cond_signal(&injector_cond);
7406 
7407  DBUG_PRINT("exit", ("ndb_binlog_thread"));
7408 
7409  DBUG_LEAVE; // Must match DBUG_ENTER()
7410  my_thread_end();
7411  pthread_exit(0);
7412  return NULL; // Avoid compiler warnings
7413 }
7414 
7415 bool
7416 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
7417  enum ha_stat_type stat_type)
7418 {
7419  char buf[IO_SIZE];
7420  uint buflen;
7421  ulonglong ndb_latest_epoch= 0;
7422  DBUG_ENTER("ndbcluster_show_status_binlog");
7423 
7424  pthread_mutex_lock(&injector_mutex);
7425  if (injector_ndb)
7426  {
7427  char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
7428  ndb_latest_epoch= injector_ndb->getLatestGCI();
7429  pthread_mutex_unlock(&injector_mutex);
7430 
7431  buflen=
7432  my_snprintf(buf, sizeof(buf),
7433  "latest_epoch=%s, "
7434  "latest_trans_epoch=%s, "
7435  "latest_received_binlog_epoch=%s, "
7436  "latest_handled_binlog_epoch=%s, "
7437  "latest_applied_binlog_epoch=%s",
7438  llstr(ndb_latest_epoch, buff1),
7439  llstr(ndb_get_latest_trans_gci(), buff2),
7440  llstr(ndb_latest_received_binlog_epoch, buff3),
7441  llstr(ndb_latest_handled_binlog_epoch, buff4),
7442  llstr(ndb_latest_applied_binlog_epoch, buff5));
7443  if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
7444  "binlog", strlen("binlog"),
7445  buf, buflen))
7446  DBUG_RETURN(TRUE);
7447  }
7448  else
7449  pthread_mutex_unlock(&injector_mutex);
7450  DBUG_RETURN(FALSE);
7451 }
7452 
/*
  AnyValue carries ServerId or Reserved codes.
  Bits from opt_server_id_bits to 30 may carry other data,
  so we ignore them when reading/setting AnyValue.

  332        21        10        0
  10987654321098765432109876543210
  roooooooooooooooooooooooosssssss

  r = Reserved bit; indicates whether
      the server id bits (bits 0-6, plus
      any further bits enabled by
      server-id-bits) hold a ServerId (0)
      or some special reserved code (1).
  o = Optional bits; depending on the value
      of server-id-bits they will be
      serverid bits or user-specific
      data.
  s = Serverid bits or reserved codes.
      At least 7 bits will be available
      for serverid or reserved codes.

*/
7474 
/* Bit 31: set when the low bits hold a reserved code, not a server id. */
#define NDB_ANYVALUE_RESERVED_BIT   0x80000000
/* Reserved bit plus the 7 low bits always available for reserved codes. */
#define NDB_ANYVALUE_RESERVED_MASK  (NDB_ANYVALUE_RESERVED_BIT | 0x7f)

/* Reserved code written/tested by the *_nologging helpers below. */
#define NDB_ANYVALUE_NOLOGGING_CODE (NDB_ANYVALUE_RESERVED_BIT | 0x7f)
7479 
7480 #ifndef DBUG_OFF
7481 void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
7482 {
7483  /*
7484  Set userData part of AnyValue (if there is one) to
7485  all 1s to test that it is ignored
7486  */
7487  const Uint32 userDataMask = ~(opt_server_id_mask |
7488  NDB_ANYVALUE_RESERVED_BIT);
7489 
7490  anyValue |= userDataMask;
7491 }
7492 #endif
7493 
7494 bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
7495 {
7496  return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
7497 }
7498 
7499 bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
7500 {
7501  return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
7502  NDB_ANYVALUE_NOLOGGING_CODE);
7503 }
7504 
7505 void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
7506 {
7507  anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
7508 }
7509 
7510 void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
7511 {
7512  /* Clear reserved bit and serverid bits */
7513  anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
7514  anyValue &= ~(opt_server_id_mask);
7515 }
7516 
7517 bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
7518 {
7519  return ((serverId & ~opt_server_id_mask) == 0);
7520 }
7521 
7522 Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
7523 {
7524  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
7525 
7526  return (anyValue & opt_server_id_mask);
7527 }
7528 
7529 void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
7530 {
7531  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
7532  anyValue &= ~(opt_server_id_mask);
7533  anyValue |= (serverId & opt_server_id_mask);
7534 }
7535 
#ifdef NDB_WITHOUT_SERVER_ID_BITS

/*
  Server built without --server-id-bits=<bits>: provide a constant
  opt_server_id_mask with every bit set.
*/
ulong opt_server_id_mask= ~0UL;

#endif
7542 
7543 #endif