/*
  rpl_rli_pdb.cc  (MySQL 5.6.14)

  Multi-threaded slave (MTS) support: the Slave_worker info repository
  (read/write/flush of per-worker execution state), the database-to-worker
  partition hash, and the circular buffer queue used for worker job queues
  and the Coordinator's group-assigned queue (GAQ).
*/
1 #include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
2 #include "sql_priv.h"
3 #include "unireg.h"
4 #include "rpl_rli_pdb.h"
5 #include "rpl_slave.h"
6 #include "sql_string.h"
7 #include <hash.h>
8 
9 #ifndef DBUG_OFF
10  ulong w_rr= 0;
11  uint mts_debug_concurrent_access= 0;
12 #endif
13 
14 #define HASH_DYNAMIC_INIT 4
15 #define HASH_DYNAMIC_INCR 1
16 
17 using std::min;
18 
/*
  Please every time you add a new field to the worker slave info, update
  what follows. For now, this is just used to get the number of fields.
*/
const char *info_slave_worker_fields []=
{
  "id",
  /*
    These positions identify what has been executed. Notice that they are
    redundant and only the group_master_log_name and group_master_log_pos
    are really necessary. However, the additional information is kept to
    ease debugging.
  */
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",

  /*
    These positions identify what a worker knew about the coordinator at
    the time a job was assigned. Notice that they are redundant and are
    kept to ease debugging.
  */
  "checkpoint_relay_log_name",
  "checkpoint_relay_log_pos",
  "checkpoint_master_log_name",
  "checkpoint_master_log_pos",

  /*
    Identify the greatest job, i.e. group, processed by a worker.
  */
  "checkpoint_seqno",
  /*
    Maximum number of jobs that can be assigned to a worker. This
    information is necessary to read the next entry.
  */
  "checkpoint_group_size",
  /*
    Bitmap used to identify what jobs were processed by a worker.
  */
  "checkpoint_group_bitmap"
};
61 
/*
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated so they can be quickly
  recycled.
*/
ulong mts_partition_hash_soft_max= 16;
68 
/*
  Constructs a Slave_worker.

  @param rli       pointer to the Coordinator's Relay_log_info
  @param param_id  the worker's id; the base Relay_log_info is constructed
                   with internal_id == param_id + 1 (asserted below)

  The PSI key parameters are forwarded verbatim to the Relay_log_info
  base constructor when performance-schema instrumentation is compiled in.
*/
Slave_worker::Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
                           ,PSI_mutex_key *param_key_info_run_lock,
                           PSI_mutex_key *param_key_info_data_lock,
                           PSI_mutex_key *param_key_info_sleep_lock,
                           PSI_mutex_key *param_key_info_data_cond,
                           PSI_mutex_key *param_key_info_start_cond,
                           PSI_mutex_key *param_key_info_stop_cond,
                           PSI_mutex_key *param_key_info_sleep_cond
#endif
                           , uint param_id
                          )
  : Relay_log_info(FALSE
#ifdef HAVE_PSI_INTERFACE
                   ,param_key_info_run_lock, param_key_info_data_lock,
                   param_key_info_sleep_lock,
                   param_key_info_data_cond, param_key_info_start_cond,
                   param_key_info_stop_cond, param_key_info_sleep_cond
#endif
                   , param_id + 1
                  ), c_rli(rli), id(param_id),
    checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
    checkpoint_seqno(0), running_status(NOT_RUNNING)
{
  /*
    In the future, it would be great if we use only one identifier.
    So when factoring out this code, please, consider this.
  */
  DBUG_ASSERT(internal_id == id + 1);
  checkpoint_relay_log_name[0]= 0;
  checkpoint_master_log_name[0]= 0;
  /* Per-group list of db partition entries this worker has touched. */
  my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
                        SLAVE_INIT_DBS_IN_GROUP, 1);
  /* jobs_lock/jobs_cond guard the worker's jobs queue. */
  mysql_mutex_init(key_mutex_slave_parallel_worker, &jobs_lock,
                   MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_cond_slave_parallel_worker, &jobs_cond, NULL);
}
106 
/*
  Destroys the worker: closes its info repository (end_info), releases the
  jobs queue storage if it was initialized, and destroys the queue
  synchronization primitives.
*/
Slave_worker::~Slave_worker()
{
  end_info();
  if (jobs.inited_queue)
  {
    DBUG_ASSERT(jobs.Q.elements == jobs.size);
    delete_dynamic(&jobs.Q);
  }
  delete_dynamic(&curr_group_exec_parts);
  mysql_mutex_destroy(&jobs_lock);
  mysql_cond_destroy(&jobs_cond);
  info_thd= NULL;
  set_rli_description_event(NULL);
}
121 
/*
  Initializes the worker for execution: binds it to the Coordinator's
  Relay_log_info, loads/creates its info repository (rli_init_info) and
  sets up the jobs queue plus the runtime counters and flags.

  @param rli  the Coordinator's Relay_log_info
  @param i    the worker's id

  @return 0 on success, 1 on failure
*/
int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
{
  DBUG_ENTER("Slave_worker::init_worker");
  DBUG_ASSERT(!rli->info_thd->is_error());
  uint k;
  Slave_job_item empty= {NULL};

  c_rli= rli;
  if (rli_init_info(false) ||
      DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
    DBUG_RETURN(1);

  id= i;
  curr_group_exec_parts.elements= 0;
  relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
  checkpoint_notified= FALSE;       // the same as above
  master_log_change_notified= false;// W learns master log during 1st group exec
  bitmap_shifted= 0;
  workers= c_rli->workers; // shallow copying is sufficient
  wq_size_waits_cnt= groups_done= events_done= curr_jobs= 0;
  usage_partition= 0;
  end_group_sets_max_dbs= false;
  gaq_index= last_group_done_index= c_rli->gaq->size; // out of range

  DBUG_ASSERT(!jobs.inited_queue);
  jobs.avail= 0;
  jobs.len= 0;
  jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
  jobs.waited_overfill= 0;
  jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
  jobs.inited_queue= true;
  curr_group_seen_begin= curr_group_seen_gtid= false;

  /* Pre-size the queue storage and fill every slot with an empty item. */
  my_init_dynamic_array(&jobs.Q, sizeof(Slave_job_item), jobs.size, 0);
  for (k= 0; k < jobs.size; k++)
    insert_dynamic(&jobs.Q, (uchar*) &empty);

  DBUG_ASSERT(jobs.Q.elements == jobs.size);

  wq_overrun_cnt= excess_cnt= 0;
  /* underrun level is a percentage of the full queue size */
  underrun_level= (ulong) ((rli->mts_worker_underrun_level * jobs.size) / 100.0);
  // overrun level is symmetric to underrun (as underrun to the full queue)
  overrun_level= jobs.size - underrun_level;

  DBUG_RETURN(0);
}
178 
/*
  Opens (or creates) the worker's info repository and allocates the
  group_executed/group_shifted bitmaps.

  @param is_gaps_collecting_phase  true during recovery while transaction
         gaps are collected: bitmaps are then sized to MTS_MAX_BITS_IN_GROUP
         and the previously saved state is read back from the repository.

  @return 0 on success, 1 on failure
*/
int Slave_worker::rli_init_info(bool is_gaps_collecting_phase)
{
  enum_return_check return_check= ERROR_CHECKING_REPOSITORY;

  DBUG_ENTER("Slave_worker::rli_init_info");

  if (inited)
    DBUG_RETURN(0);

  /*
    Worker bitmap size depends on recovery mode.
    If it is gaps collecting the bitmaps must be capable to accept
    up to MTS_MAX_BITS_IN_GROUP of bits.
  */
  size_t num_bits= is_gaps_collecting_phase ?
    MTS_MAX_BITS_IN_GROUP : c_rli->checkpoint_group;
  /*
    This checks if the repository was created before and thus there
    will be values to be read. Please, do not move this call after
    the handler->init_info().
  */
  return_check= check_info();
  if (return_check == ERROR_CHECKING_REPOSITORY ||
      (return_check == REPOSITORY_DOES_NOT_EXIST && is_gaps_collecting_phase))
    goto err;

  if (handler->init_info())
    goto err;

  bitmap_init(&group_executed, NULL, num_bits, FALSE);
  bitmap_init(&group_shifted, NULL, num_bits, FALSE);

  /* In the gaps phase the saved state must be readable, otherwise fail. */
  if (is_gaps_collecting_phase &&
      (DBUG_EVALUATE_IF("mts_slave_worker_init_at_gaps_fails", true, false) ||
       read_info(handler)))
  {
    bitmap_free(&group_executed);
    bitmap_free(&group_shifted);
    goto err;
  }
  inited= 1;

  DBUG_RETURN(0);

err:
  // todo: handler->end_info(uidx, nidx);
  inited= 0;
  sql_print_error("Error reading slave worker configuration");
  DBUG_RETURN(1);
}
249 
250 void Slave_worker::end_info()
251 {
252  DBUG_ENTER("Slave_worker::end_info");
253 
254  if (!inited)
255  DBUG_VOID_RETURN;
256 
257  if (handler)
258  handler->end_info();
259 
260  if (inited)
261  {
262  bitmap_free(&group_executed);
263  bitmap_free(&group_shifted);
264  }
265  inited = 0;
266 
267  DBUG_VOID_RETURN;
268 }
269 
/*
  Writes the worker's state through the repository handler and flushes it.

  @param force  passed through to handler->flush_info()
  @return 0 on success (or when not yet inited), 1 on failure
*/
int Slave_worker::flush_info(const bool force)
{
  DBUG_ENTER("Slave_worker::flush_info");

  if (!inited)
    DBUG_RETURN(0);

  /*
    We update the sync_period at this point because only here we
    know that we are handling a Slave_worker. This needs to be
    updated every time we call flush because the option may be
    dynamically set.
  */
  handler->set_sync_period(sync_relayloginfo_period);

  if (write_info(handler))
    goto err;

  if (handler->flush_info(force))
    goto err;

  DBUG_RETURN(0);

err:
  sql_print_error("Error writing slave worker configuration");
  DBUG_RETURN(1);
}
297 
/*
  Reads the worker's state back from the repository handler.

  The field order matches info_slave_worker_fields: internal id, group and
  checkpoint coordinates, checkpoint_seqno, the bitmap byte count and the
  group_executed bitmap (read straight into its pre-allocated buffer).
  Member fields are only updated after all reads have succeeded.

  @param from  repository handler to read from
  @return FALSE on success, TRUE on failure
*/
bool Slave_worker::read_info(Rpl_info_handler *from)
{
  DBUG_ENTER("Slave_worker::read_info");

  ulong temp_group_relay_log_pos= 0;
  ulong temp_group_master_log_pos= 0;
  ulong temp_checkpoint_relay_log_pos= 0;
  ulong temp_checkpoint_master_log_pos= 0;
  ulong temp_checkpoint_seqno= 0;
  ulong nbytes= 0;
  uchar *buffer= (uchar *) group_executed.bitmap;
  int temp_internal_id= 0;

  if (from->prepare_info_for_read())
    DBUG_RETURN(TRUE);

  if (from->get_info((int *) &temp_internal_id, (int) 0) ||
      from->get_info(group_relay_log_name,
                     (size_t) sizeof(group_relay_log_name),
                     (char *) "") ||
      from->get_info((ulong *) &temp_group_relay_log_pos,
                     (ulong) 0) ||
      from->get_info(group_master_log_name,
                     (size_t) sizeof(group_master_log_name),
                     (char *) "") ||
      from->get_info((ulong *) &temp_group_master_log_pos,
                     (ulong) 0) ||
      from->get_info(checkpoint_relay_log_name,
                     (size_t) sizeof(checkpoint_relay_log_name),
                     (char *) "") ||
      from->get_info((ulong *) &temp_checkpoint_relay_log_pos,
                     (ulong) 0) ||
      from->get_info(checkpoint_master_log_name,
                     (size_t) sizeof(checkpoint_master_log_name),
                     (char *) "") ||
      from->get_info((ulong *) &temp_checkpoint_master_log_pos,
                     (ulong) 0) ||
      from->get_info((ulong *) &temp_checkpoint_seqno,
                     (ulong) 0) ||
      from->get_info(&nbytes, (ulong) 0) ||
      from->get_info(buffer, (size_t) nbytes,
                     (uchar *) 0))
    DBUG_RETURN(TRUE);

  /* The stored bitmap must fit the allocated one. */
  DBUG_ASSERT(nbytes <= no_bytes_in_map(&group_executed));

  internal_id=(uint) temp_internal_id;
  group_relay_log_pos= temp_group_relay_log_pos;
  group_master_log_pos= temp_group_master_log_pos;
  checkpoint_relay_log_pos= temp_checkpoint_relay_log_pos;
  checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
  checkpoint_seqno= temp_checkpoint_seqno;

  DBUG_RETURN(FALSE);
}
353 
354 bool Slave_worker::write_info(Rpl_info_handler *to)
355 {
356  DBUG_ENTER("Master_info::write_info");
357 
358  ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
359  uchar *buffer= (uchar*) group_executed.bitmap;
360  DBUG_ASSERT(nbytes <= (c_rli->checkpoint_group + 7) / 8);
361 
362  if (to->prepare_info_for_write() ||
363  to->set_info((int) internal_id) ||
364  to->set_info(group_relay_log_name) ||
365  to->set_info((ulong) group_relay_log_pos) ||
366  to->set_info(group_master_log_name) ||
367  to->set_info((ulong) group_master_log_pos) ||
368  to->set_info(checkpoint_relay_log_name) ||
369  to->set_info((ulong) checkpoint_relay_log_pos) ||
370  to->set_info(checkpoint_master_log_name) ||
371  to->set_info((ulong) checkpoint_master_log_pos) ||
372  to->set_info((ulong) checkpoint_seqno) ||
373  to->set_info(nbytes) ||
374  to->set_info(buffer, (size_t) nbytes))
375  DBUG_RETURN(TRUE);
376 
377  DBUG_RETURN(FALSE);
378 }
379 
388 bool Slave_worker::reset_recovery_info()
389 {
390  DBUG_ENTER("Slave_worker::reset_recovery_info");
391 
392  set_group_master_log_name("");
393  set_group_master_log_pos(0);
394 
395  DBUG_RETURN(flush_info(true));
396 }
397 
398 size_t Slave_worker::get_number_worker_fields()
399 {
400  return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
401 }
402 
403 const char* Slave_worker::get_master_log_name()
404 {
405  Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
406 
407  return (ptr_g->checkpoint_log_name != NULL) ?
408  ptr_g->checkpoint_log_name : checkpoint_master_log_name;
409 }
410 
411 bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g, bool force)
412 {
413  DBUG_ENTER("Slave_worker::checkpoint_positions");
414 
415  /*
416  Initial value of checkpoint_master_log_name is learned from
417  group_master_log_name. The latter can be passed to Worker
418  at rare event of master binlog rotation.
419  This initialization is needed to provide to Worker info
420  on physical coordiates during execution of the very first group
421  after a rotation.
422  */
423  if (ptr_g->group_master_log_name != NULL)
424  {
425  strmake(group_master_log_name, ptr_g->group_master_log_name,
426  sizeof(group_master_log_name) - 1);
427  my_free(ptr_g->group_master_log_name);
428  ptr_g->group_master_log_name= NULL;
429  strmake(checkpoint_master_log_name, group_master_log_name,
430  sizeof(checkpoint_master_log_name) - 1);
431  }
432  if (ptr_g->checkpoint_log_name != NULL)
433  {
434  strmake(checkpoint_relay_log_name, ptr_g->checkpoint_relay_log_name,
435  sizeof(checkpoint_relay_log_name) - 1);
436  checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
437  strmake(checkpoint_master_log_name, ptr_g->checkpoint_log_name,
438  sizeof(checkpoint_master_log_name) - 1);
439  checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
440 
441  my_free(ptr_g->checkpoint_log_name);
442  ptr_g->checkpoint_log_name= NULL;
443  my_free(ptr_g->checkpoint_relay_log_name);
444  ptr_g->checkpoint_relay_log_name= NULL;
445 
446  bitmap_copy(&group_shifted, &group_executed);
447  bitmap_clear_all(&group_executed);
448  for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
449  {
450  if (bitmap_is_set(&group_shifted, pos))
451  bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
452  }
453  }
454  /*
455  Extracts an updated relay-log name to store in Worker's rli.
456  */
457  if (ptr_g->group_relay_log_name)
458  {
459  DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
460  <= sizeof(group_relay_log_name));
461  strmake(group_relay_log_name, ptr_g->group_relay_log_name,
462  sizeof(group_relay_log_name) - 1);
463  }
464 
465  DBUG_ASSERT(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
466 
467  bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
468  checkpoint_seqno= ptr_g->checkpoint_seqno;
469  group_relay_log_pos= ev->future_event_relay_log_pos;
470  group_master_log_pos= ev->log_pos;
471 
472  /*
473  Directly accessing c_rli->get_group_master_log_name() does not
474  represent a concurrency issue because the current code places
475  a synchronization point when master rotates.
476  */
477  strmake(group_master_log_name, c_rli->get_group_master_log_name(),
478  sizeof(group_master_log_name)-1);
479 
480  DBUG_PRINT("mts", ("Committing worker-id %lu group master log pos %llu "
481  "group master log name %s checkpoint sequence number %lu.",
482  id, group_master_log_pos, group_master_log_name, checkpoint_seqno));
483 
484  DBUG_EXECUTE_IF("mts_debug_concurrent_access",
485  {
486  mts_debug_concurrent_access++;
487  };
488  );
489 
490  DBUG_RETURN(flush_info(force));
491 }
492 
/* Hash mapping a database name to its db_worker_hash_entry. */
static HASH mapping_db_to_worker;
/* Set once init_hash_workers() succeeds; cleared by destroy_hash_workers(). */
static bool inited_hash_workers= FALSE;

#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key key_mutex_slave_worker_hash;
PSI_cond_key key_cond_slave_worker_hash;
#endif

/*
  Guards mapping_db_to_worker; the condition is signalled when a worker
  releases a partition entry (its usage count drops to zero).
*/
static mysql_mutex_t slave_worker_hash_lock;
static mysql_cond_t slave_worker_hash_cond;
503 
504 
505 extern "C" uchar *get_key(const uchar *record, size_t *length,
506  my_bool not_used __attribute__((unused)))
507 {
508  DBUG_ENTER("get_key");
509 
510  db_worker_hash_entry *entry=(db_worker_hash_entry *) record;
511  *length= strlen(entry->db);
512 
513  DBUG_PRINT("info", ("get_key %s, %d", entry->db, (int) *length));
514 
515  DBUG_RETURN((uchar*) entry->db);
516 }
517 
518 
/*
  Hash free-key callback: deallocates a db->worker hash entry.
  Any temporary tables still attached to the entry are first handed back
  to the current (slave SQL) THD.
*/
static void free_entry(db_worker_hash_entry *entry)
{
  THD *c_thd= current_thd;

  DBUG_ENTER("free_entry");

  DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));

  DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);

  /*
    Although assert is correct valgrind senses entry->worker can be freed.

    DBUG_ASSERT(entry->usage == 0 ||
                !entry->worker ||    // last entry owner could have errored out
                entry->worker->running_status != Slave_worker::RUNNING);
  */

  mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
  entry->temporary_tables= NULL;

  my_free((void *) entry->db);
  my_free(entry);

  DBUG_VOID_RETURN;
}
545 
/*
  Initializes the db->worker partition hash together with its mutex and
  condition variable.

  @param slave_parallel_workers  number of workers (not referenced here)

  @return false on success, true on failure (note: returns the negation
          of inited_hash_workers)
*/
bool init_hash_workers(ulong slave_parallel_workers)
{
  DBUG_ENTER("init_hash_workers");

  inited_hash_workers=
    (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
                 0, 0, 0, get_key,
                 (my_hash_free_key) free_entry, 0) == 0);
  if (inited_hash_workers)
  {
#ifdef HAVE_PSI_INTERFACE
    mysql_mutex_init(key_mutex_slave_worker_hash, &slave_worker_hash_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_worker_hash, &slave_worker_hash_cond, NULL);
#else
    mysql_mutex_init(NULL, &slave_worker_hash_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(NULL, &slave_worker_hash_cond, NULL);
#endif
  }

  DBUG_RETURN (!inited_hash_workers);
}
569 
570 void destroy_hash_workers(Relay_log_info *rli)
571 {
572  DBUG_ENTER("destroy_hash_workers");
573  if (inited_hash_workers)
574  {
575  my_hash_free(&mapping_db_to_worker);
576  mysql_mutex_destroy(&slave_worker_hash_lock);
577  mysql_cond_destroy(&slave_worker_hash_cond);
578  inited_hash_workers= false;
579  }
580 
581  DBUG_VOID_RETURN;
582 }
583 
596 TABLE* mts_move_temp_table_to_entry(TABLE *table, THD *thd,
597  db_worker_hash_entry *entry)
598 {
599  TABLE *ret= table->next;
600 
601  if (table->prev)
602  {
603  table->prev->next= table->next;
604  if (table->prev->next)
605  table->next->prev= table->prev;
606  }
607  else
608  {
609  /* removing the first item from the list */
610  DBUG_ASSERT(table == thd->temporary_tables);
611 
612  thd->temporary_tables= table->next;
613  if (thd->temporary_tables)
614  table->next->prev= 0;
615  }
616  table->next= entry->temporary_tables;
617  table->prev= 0;
618  if (table->next)
619  table->next->prev= table;
620  entry->temporary_tables= table;
621 
622  return ret;
623 }
624 
625 
/*
  Moves a whole list of temporary tables (e.g. detached from a db->worker
  hash entry) back to a THD: marks every element as in_use by thd and
  prepends the list to thd->temporary_tables.

  @param thd               destination THD
  @param temporary_tables  head of the source list (may be NULL)

  @return the new head of thd->temporary_tables, or NULL when the source
          list was empty
*/
TABLE* mts_move_temp_tables_to_thd(THD *thd, TABLE *temporary_tables)
{
  TABLE *table= temporary_tables;
  if (!table)
    return NULL;

  // accept only the list head
  DBUG_ASSERT(!temporary_tables->prev);

  // walk along the source list and associate the tables with thd
  do
  {
    table->in_use= thd;
  } while(table->next && (table= table->next));

  // link the former list against the tail of the source list
  if (thd->temporary_tables)
    thd->temporary_tables->prev= table;
  table->next= thd->temporary_tables;
  thd->temporary_tables= temporary_tables;

  return thd->temporary_tables;
}
661 
671 static void move_temp_tables_to_entry(THD* thd, db_worker_hash_entry* entry)
672 {
673  for (TABLE *table= thd->temporary_tables; table;)
674  {
675  if (strcmp(table->s->db.str, entry->db) == 0)
676  {
677  // table pointer is shifted inside the function
678  table= mts_move_temp_table_to_entry(table, thd, entry);
679  }
680  else
681  {
682  table= table->next;
683  }
684  }
685 }
686 
687 
743 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
744  db_worker_hash_entry **ptr_entry,
745  bool need_temp_tables, Slave_worker *last_worker)
746 {
747  uint i;
748  DYNAMIC_ARRAY *workers= &rli->workers;
749 
750  /*
751  A dynamic array to store the mapping_db_to_worker hash elements
752  that needs to be deleted, since deleting the hash entires while
753  iterating over it is wrong.
754  */
755  DYNAMIC_ARRAY hash_element;
756  THD *thd= rli->info_thd;
757 
758  DBUG_ENTER("get_slave_worker");
759 
760  DBUG_ASSERT(!rli->last_assigned_worker ||
761  rli->last_assigned_worker == last_worker);
762 
763  if (!inited_hash_workers)
764  DBUG_RETURN(NULL);
765 
766  db_worker_hash_entry *entry= NULL;
767  my_hash_value_type hash_value;
768  uchar dblength= (uint) strlen(dbname);
769 
770 
771  // Search in CGAP
772  for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
773  {
774  entry= * (db_worker_hash_entry **)
775  dynamic_array_ptr(&rli->curr_group_assigned_parts, i);
776  if ((uchar) entry->db_len != dblength)
777  continue;
778  else
779  if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
780  {
781  *ptr_entry= entry;
782  DBUG_RETURN(last_worker);
783  }
784  }
785 
786  DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
787 
788  hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
789  dblength);
790 
791  mysql_mutex_lock(&slave_worker_hash_lock);
792 
793  entry= (db_worker_hash_entry *)
794  my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
795  (uchar*) dbname, dblength);
796  if (!entry)
797  {
798  /*
799  The database name was not found which means that a worker never
800  processed events from that database. In such case, we need to
801  map the database to a worker my inserting an entry into the
802  hash map.
803  */
804  my_bool ret;
805  char *db= NULL;
806 
807  mysql_mutex_unlock(&slave_worker_hash_lock);
808 
809  DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
810  /*
811  Allocate an entry to be inserted and if the operation fails
812  an error is returned.
813  */
814  if (!(db= (char *) my_malloc((size_t) dblength + 1, MYF(0))))
815  goto err;
816  if (!(entry= (db_worker_hash_entry *)
817  my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
818  {
819  my_free(db);
820  goto err;
821  }
822  strmov(db, dbname);
823  entry->db= db;
824  entry->db_len= strlen(db);
825  entry->usage= 1;
826  entry->temporary_tables= NULL;
827  /*
828  Unless \exists the last assigned Worker, get a free worker based
829  on a policy described in the function get_least_occupied_worker().
830  */
831  mysql_mutex_lock(&slave_worker_hash_lock);
832 
833  entry->worker= (!last_worker) ?
834  get_least_occupied_worker(workers) : last_worker;
835  entry->worker->usage_partition++;
836  if (mapping_db_to_worker.records > mts_partition_hash_soft_max)
837  {
838  /*
839  remove zero-usage (todo: rare or long ago scheduled) records.
840  Store the element of the hash in a dynamic array after checking whether
841  the usage of the hash entry is 0 or not. We later free it from the HASH.
842  */
843  my_init_dynamic_array(&hash_element, sizeof(db_worker_hash_entry *),
844  HASH_DYNAMIC_INIT, HASH_DYNAMIC_INCR);
845  for (uint i= 0; i < mapping_db_to_worker.records; i++)
846  {
847  DBUG_ASSERT(!entry->temporary_tables || !entry->temporary_tables->prev);
848  DBUG_ASSERT(!thd->temporary_tables || !thd->temporary_tables->prev);
849 
850  db_worker_hash_entry *entry=
851  (db_worker_hash_entry*) my_hash_element(&mapping_db_to_worker, i);
852 
853  if (entry->usage == 0)
854  {
855  mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
856  entry->temporary_tables= NULL;
857 
858  /* Push the element in the dynamic array*/
859  push_dynamic(&hash_element, (uchar*) &entry);
860  }
861  }
862 
863  /* Delete the hash element based on the usage */
864  for (uint i=0; i < hash_element.elements; i++)
865  {
866  db_worker_hash_entry *temp_entry= *(db_worker_hash_entry **) dynamic_array_ptr(&hash_element, i);
867  my_hash_delete(&mapping_db_to_worker, (uchar*) temp_entry);
868  }
869  /* Deleting the dynamic array */
870  delete_dynamic(&hash_element);
871  }
872 
873  ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
874 
875  if (ret)
876  {
877  my_free(db);
878  my_free(entry);
879  entry= NULL;
880  goto err;
881  }
882  DBUG_PRINT("info", ("Inserted %s, %d", entry->db, (int) strlen(entry->db)));
883  }
884  else
885  {
886  /* There is a record. Either */
887  if (entry->usage == 0)
888  {
889  entry->worker= (!last_worker) ?
890  get_least_occupied_worker(workers) : last_worker;
891  entry->worker->usage_partition++;
892  entry->usage++;
893  }
894  else if (entry->worker == last_worker || !last_worker)
895  {
896 
897  DBUG_ASSERT(entry->worker);
898 
899  entry->usage++;
900  }
901  else
902  {
903  // The case APH contains a W_d != W_c != NULL assigned to
904  // D-partition represents
905  // the hashing conflict and is handled as the following:
906  PSI_stage_info old_stage;
907 
908  DBUG_ASSERT(last_worker != NULL &&
909  rli->curr_group_assigned_parts.elements > 0);
910 
911  // future assignenment and marking at the same time
912  entry->worker= last_worker;
913  // loop while a user thread is stopping Coordinator gracefully
914  do
915  {
916  thd->ENTER_COND(&slave_worker_hash_cond,
917  &slave_worker_hash_lock,
918  &stage_slave_waiting_worker_to_release_partition,
919  &old_stage);
920  mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
921  } while (entry->usage != 0 && !thd->killed);
922 
923  thd->EXIT_COND(&old_stage);
924  if (thd->killed)
925  {
926  entry= NULL;
927  goto err;
928  }
929  mysql_mutex_lock(&slave_worker_hash_lock);
930  entry->usage= 1;
931  entry->worker->usage_partition++;
932  }
933  }
934 
935  /*
936  relocation belonging to db temporary tables from C to W via entry
937  */
938  if (entry->usage == 1 && need_temp_tables)
939  {
940  if (!entry->temporary_tables)
941  {
942  if (entry->db_len != 0)
943  {
944  move_temp_tables_to_entry(thd, entry);
945  }
946  else
947  {
948  entry->temporary_tables= thd->temporary_tables;
949  thd->temporary_tables= NULL;
950  }
951  }
952 #ifndef DBUG_OFF
953  else
954  {
955  // all entries must have been emptied from temps by the caller
956 
957  for (TABLE *table= thd->temporary_tables; table; table= table->next)
958  {
959  DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
960  }
961  }
962 #endif
963  }
964  mysql_mutex_unlock(&slave_worker_hash_lock);
965 
966  DBUG_ASSERT(entry);
967 
968 err:
969  if (entry)
970  {
971  DBUG_PRINT("info",
972  ("Updating %s with worker %lu", entry->db, entry->worker->id));
973  insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) &entry);
974  *ptr_entry= entry;
975  }
976  DBUG_RETURN(entry ? entry->worker : NULL);
977 }
978 
992 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
993 {
994  long usage= LONG_MAX;
995  Slave_worker **ptr_current_worker= NULL, *worker= NULL;
996  ulong i= 0;
997 
998  DBUG_ENTER("get_least_occupied_worker");
999 
1000  DBUG_EXECUTE_IF("mts_distribute_round_robin",
1001  {
1002  worker=
1003  *((Slave_worker **)
1004  dynamic_array_ptr(ws, w_rr % ws->elements));
1005  sql_print_information("Chosing worker id %lu, the following "
1006  "is going to be %lu", worker->id,
1007  w_rr % ws->elements);
1008  DBUG_RETURN(worker);
1009  });
1010 
1011  for (i= 0; i< ws->elements; i++)
1012  {
1013  ptr_current_worker= (Slave_worker **) dynamic_array_ptr(ws, i);
1014  if ((*ptr_current_worker)->usage_partition <= usage)
1015  {
1016  worker= *ptr_current_worker;
1017  usage= (*ptr_current_worker)->usage_partition;
1018  }
1019  }
1020 
1021  DBUG_ASSERT(worker != NULL);
1022 
1023  DBUG_RETURN(worker);
1024 }
1025 
/*
  Worker's routine at the end of executing a group (or on error).

  On success it commits positions for non-XID terminal events, publishes
  the final coordinates into the GAQ group descriptor and marks the group
  done for the Coordinator. On error it tags itself ERROR_LEAVING and
  wakes the Coordinator with KILL_QUERY. In both cases it then releases
  the partition hash entries used by the group, signalling a possibly
  waiting Coordinator whenever an entry's usage drops to zero.

  @param ev     the group's terminal event
  @param error  non-zero when the group execution failed
*/
void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
{
  DBUG_ENTER("Slave_worker::slave_worker_ends_group");

  if (!error)
  {
    Slave_committed_queue *gaq= c_rli->gaq;
    Slave_job_group *ptr_g= gaq->get_job_group(gaq_index);

    DBUG_ASSERT(gaq_index == ev->mts_group_idx);

    // first ever group must have relay log name
    DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
                ptr_g->group_relay_log_name != NULL);
    DBUG_ASSERT(ptr_g->worker_id == id);

    if (ev->get_type_code() != XID_EVENT)
    {
      commit_positions(ev, ptr_g, false);
      DBUG_EXECUTE_IF("crash_after_commit_and_update_pos",
                      sql_print_information("Crashing crash_after_commit_and_update_pos.");
                      flush_info(TRUE);
                      DBUG_SUICIDE();
                     );
    }

    ptr_g->group_master_log_pos= group_master_log_pos;
    ptr_g->group_relay_log_pos= group_relay_log_pos;

    ptr_g->done= 1;    // GAQ index is available to C now

    last_group_done_index= gaq_index;
    reset_gaq_index();
    groups_done++;
  }
  else
  {
    // tagging as exiting so Coordinator won't be able synchronize with it
    mysql_mutex_lock(&jobs_lock);
    running_status= ERROR_LEAVING;
    mysql_mutex_unlock(&jobs_lock);

    // Killing Coordinator to indicate eventual consistency error
    mysql_mutex_lock(&c_rli->info_thd->LOCK_thd_data);
    c_rli->info_thd->awake(THD::KILL_QUERY);
    mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
  }

  /*
    Cleanup relating to the last executed group regardless of error.
  */
  DYNAMIC_ARRAY *ep= &curr_group_exec_parts;

  /* Release every partition entry the group used. */
  for (uint i= 0; i < ep->elements; i++)
  {
    db_worker_hash_entry *entry=
      *((db_worker_hash_entry **) dynamic_array_ptr(ep, i));

    mysql_mutex_lock(&slave_worker_hash_lock);

    DBUG_ASSERT(entry);

    entry->usage --;

    DBUG_ASSERT(entry->usage >= 0);

    if (entry->usage == 0)
    {
      usage_partition--;
      /*
        The detached entry's temp table list, possibly updated, remains
        with the entry at least until time Coordinator will deallocate it
        from the hash, that is either due to stop or extra size of the hash.
      */
      DBUG_ASSERT(usage_partition >= 0);
      DBUG_ASSERT(this->info_thd->temporary_tables == 0);
      DBUG_ASSERT(!entry->temporary_tables ||
                  !entry->temporary_tables->prev);

      if (entry->worker != this) // Coordinator is waiting
      {
#ifndef DBUG_OFF
        // TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
#endif
        DBUG_PRINT("info",
                   ("Notifying entry %p release by worker %lu", entry, this->id));

        mysql_cond_signal(&slave_worker_hash_cond);
      }
    }
    else
      DBUG_ASSERT(usage_partition != 0);

    mysql_mutex_unlock(&slave_worker_hash_lock);
  }

  if (ep->elements > ep->max_element)
  {
    // reallocate to lessen mem
    ep->elements= ep->max_element;
    ep->max_element= 0;
    freeze_size(ep); // restores max_element
  }
  ep->elements= 0;

  curr_group_seen_gtid= curr_group_seen_begin= false;

  if (error)
  {
    // Awakening Coordinator that could be waiting for entry release
    mysql_mutex_lock(&slave_worker_hash_lock);
    mysql_cond_signal(&slave_worker_hash_cond);
    mysql_mutex_unlock(&slave_worker_hash_lock);
  }

  DBUG_VOID_RETURN;
}
1154 
1155 
1167 ulong circular_buffer_queue::de_queue(uchar *val)
1168 {
1169  ulong ret;
1170  if (entry == size)
1171  {
1172  DBUG_ASSERT(len == 0);
1173  return (ulong) -1;
1174  }
1175 
1176  ret= entry;
1177  get_dynamic(&Q, val, entry);
1178  len--;
1179 
1180  // pre boundary cond
1181  if (avail == size)
1182  avail= entry;
1183  entry= (entry + 1) % size;
1184 
1185  // post boundary cond
1186  if (avail == entry)
1187  entry= size;
1188 
1189  DBUG_ASSERT(entry == size ||
1190  (len == (avail >= entry)? (avail - entry) :
1191  (size + avail - entry)));
1192  DBUG_ASSERT(avail != entry);
1193 
1194  return ret;
1195 }
1196 
1203 ulong circular_buffer_queue::de_tail(uchar *val)
1204 {
1205  if (entry == size)
1206  {
1207  DBUG_ASSERT(len == 0);
1208  return (ulong) -1;
1209  }
1210 
1211  avail= (entry + len - 1) % size;
1212  get_dynamic(&Q, val, avail);
1213  len--;
1214 
1215  // post boundary cond
1216  if (avail == entry)
1217  entry= size;
1218 
1219  DBUG_ASSERT(entry == size ||
1220  (len == (avail >= entry)? (avail - entry) :
1221  (size + avail - entry)));
1222  DBUG_ASSERT(avail != entry);
1223 
1224  return avail;
1225 }
1226 
1231 ulong circular_buffer_queue::en_queue(void *item)
1232 {
1233  ulong ret;
1234  if (avail == size)
1235  {
1236  DBUG_ASSERT(avail == Q.elements);
1237  return (ulong) -1;
1238  }
1239 
1240  // store
1241 
1242  ret= avail;
1243  set_dynamic(&Q, (uchar*) item, avail);
1244 
1245 
1246  // pre-boundary cond
1247  if (entry == size)
1248  entry= avail;
1249 
1250  avail= (avail + 1) % size;
1251  len++;
1252 
1253  // post-boundary cond
1254  if (avail == entry)
1255  avail= size;
1256 
1257  DBUG_ASSERT(avail == entry ||
1258  len == (avail >= entry) ?
1259  (avail - entry) : (size + avail - entry));
1260  DBUG_ASSERT(avail != entry);
1261 
1262  return ret;
1263 }
1264 
1265 void* circular_buffer_queue::head_queue()
1266 {
1267  uchar *ret= NULL;
1268  if (entry == size)
1269  {
1270  DBUG_ASSERT(len == 0);
1271  }
1272  else
1273  {
1274  get_dynamic(&Q, (uchar*) ret, entry);
1275  }
1276  return (void*) ret;
1277 }
1278 
1290 bool circular_buffer_queue::gt(ulong i, ulong k)
1291 {
1292  DBUG_ASSERT(i < size && k < size);
1293  DBUG_ASSERT(avail != entry);
1294 
1295  if (i >= entry)
1296  if (k >= entry)
1297  return i > k;
1298  else
1299  return FALSE;
1300  else
1301  if (k >= entry)
1302  return TRUE;
1303  else
1304  return i > k;
1305 }
1306 
1307 #ifndef DBUG_OFF
1308 bool Slave_committed_queue::count_done(Relay_log_info* rli)
1309 {
1310  ulong i, k, cnt= 0;
1311 
1312  for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1313  {
1314  Slave_job_group *ptr_g;
1315 
1316  ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
1317 
1318  if (ptr_g->worker_id != (ulong) -1 && ptr_g->done)
1319  cnt++;
1320  }
1321 
1322  DBUG_ASSERT(cnt <= size);
1323 
1324  DBUG_PRINT("mts", ("Checking if it can simulate a crash:"
1325  " mts_checkpoint_group %u counter %lu parallel slaves %lu\n",
1326  opt_mts_checkpoint_group, cnt, rli->slave_parallel_workers));
1327 
1328  return (cnt == (rli->slave_parallel_workers * opt_mts_checkpoint_group));
1329 }
1330 #endif
1331 
1332 
/**
   Advances the head of the Group Assigned Queue past every group that has
   been completed by its Worker, updating the low-water-mark (lwm) that
   describes the last committed group, until a gap (an unassigned or
   not-yet-done group) is found.

   @param ws  dynamic array of Slave_worker pointers, indexed by worker_id

   @return number of groups removed from the head of the queue.
*/
ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
{
  ulong i, cnt= 0;

  for (i= entry; i != avail && !empty(); cnt++, i= (i + 1) % size)
  {
    Slave_worker *w_i;
    Slave_job_group *ptr_g, g;
    char grl_name[FN_REFLEN];
    ulong ind;

#ifndef DBUG_OFF
    // Debug hook: cap the number of moved groups at the checkpoint period.
    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
        cnt == opt_mts_checkpoint_period)
      return cnt;
#endif

    grl_name[0]= 0;
    // ptr_g points into the queue storage; it stays valid only until
    // the slot is recycled, hence the copy into `g' below via de_queue().
    ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);

    /*
      The current job has not been processed or it was not
      even assigned, this means there is a gap.
    */
    if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done)
      break; /* gap at i'th */

    /* Worker-id domain guard */
    compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);

    get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);

    /*
      Memorizes the latest valid group_relay_log_name.
    */
    if (ptr_g->group_relay_log_name)
    {
      strcpy(grl_name, ptr_g->group_relay_log_name);
      my_free(ptr_g->group_relay_log_name);
      /*
        It is important to mark the field as freed.
      */
      ptr_g->group_relay_log_name= NULL;
    }

    /*
      Removes the job from the (G)lobal (A)ssigned (Q)ueue.
    */
    ind= de_queue((uchar*) &g);

    /*
      Stores the memorized name into the result struct. Note that we
      take care of the pointer first and then copy the other elements
      by assigning the structures.
    */
    if (grl_name[0] != 0)
    {
      strcpy(lwm.group_relay_log_name, grl_name);
    }
    // `g' must alias lwm's buffer before the struct assignment so the
    // pointer member of lwm keeps pointing at lwm's own storage.
    g.group_relay_log_name= lwm.group_relay_log_name;
    lwm= g;

    DBUG_ASSERT(ind == i);
    DBUG_ASSERT(!ptr_g->group_relay_log_name);
    DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
#ifndef DBUG_OFF
    {
      ulonglong l;
      get_dynamic(&last_done, (uchar *) &l, w_i->id);
      /*
        There must be some progress otherwise we should have
        exit the loop earlier.
      */
      DBUG_ASSERT(l < ptr_g->total_seqno);
    }
#endif
    /*
      This is used to calculate the last time each worker has
      processed events.
    */
    set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
  }

  DBUG_ASSERT(cnt <= size);

  return cnt;
}
1441 
1447 void Slave_committed_queue::free_dynamic_items()
1448 {
1449  ulong i, k;
1450  for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1451  {
1452  Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
1453  if (ptr_g->group_relay_log_name)
1454  {
1455  my_free(ptr_g->group_relay_log_name);
1456  }
1457  if (ptr_g->checkpoint_log_name)
1458  {
1459  my_free(ptr_g->checkpoint_log_name);
1460  }
1461  if (ptr_g->checkpoint_relay_log_name)
1462  {
1463  my_free(ptr_g->checkpoint_relay_log_name);
1464  }
1465  if (ptr_g->group_master_log_name)
1466  {
1467  my_free(ptr_g->group_master_log_name);
1468  }
1469  }
1470  DBUG_ASSERT((avail == size /* full */ || entry == size /* empty */) ||
1471  i == avail /* all occupied are processed */);
1472 }
1473 
1474 
1475 void Slave_worker::do_report(loglevel level, int err_code, const char *msg,
1476  va_list args) const
1477 {
1478  char buff_coord[MAX_SLAVE_ERRMSG];
1479  char buff_gtid[Gtid::MAX_TEXT_LENGTH + 1];
1480  const char* log_name= const_cast<Slave_worker*>(this)->get_master_log_name();
1481  ulonglong log_pos= const_cast<Slave_worker*>(this)->get_master_log_pos();
1482  const Gtid_specification *gtid_next= &info_thd->variables.gtid_next;
1483 
1484  if (gtid_next->type == GTID_GROUP)
1485  {
1486  global_sid_lock->rdlock();
1487  gtid_next->to_string(global_sid_map, buff_gtid);
1488  global_sid_lock->unlock();
1489  }
1490  else
1491  {
1492  buff_gtid[0]= 0;
1493  }
1494 
1495  sprintf(buff_coord,
1496  "Worker %lu failed executing transaction '%s' at "
1497  "master log %s, end_log_pos %llu",
1498  id, buff_gtid, log_name, log_pos);
1499  c_rli->va_report(level, err_code, buff_coord, msg, args);
1500 }
1501 
1527 int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
1528 {
1529  uint ret= 0;
1530  HASH *hash= &mapping_db_to_worker;
1531  THD *thd= rli->info_thd;
1532  bool cant_sync= FALSE;
1533  char llbuf[22];
1534 
1535  DBUG_ENTER("wait_for_workers_to_finish");
1536 
1537  llstr(const_cast<Relay_log_info*>(rli)->get_event_relay_log_pos(), llbuf);
1538  if (log_warnings > 1)
1539  sql_print_information("Coordinator and workers enter synchronization procedure "
1540  "when scheduling event relay-log: %s pos: %s",
1541  const_cast<Relay_log_info*>(rli)->get_event_relay_log_name(),
1542  llbuf);
1543 
1544  for (uint i= 0, ret= 0; i < hash->records; i++)
1545  {
1546  db_worker_hash_entry *entry;
1547 
1548  mysql_mutex_lock(&slave_worker_hash_lock);
1549 
1550  entry= (db_worker_hash_entry*) my_hash_element(hash, i);
1551 
1552  DBUG_ASSERT(entry);
1553 
1554  // the ignore Worker retains its active resources
1555  if (ignore && entry->worker == ignore && entry->usage > 0)
1556  {
1557  mysql_mutex_unlock(&slave_worker_hash_lock);
1558  continue;
1559  }
1560 
1561  if (entry->usage > 0 && !thd->killed)
1562  {
1563  PSI_stage_info old_stage;
1564  Slave_worker *w_entry= entry->worker;
1565 
1566  entry->worker= NULL; // mark Worker to signal when usage drops to 0
1567  thd->ENTER_COND(&slave_worker_hash_cond,
1568  &slave_worker_hash_lock,
1569  &stage_slave_waiting_worker_to_release_partition,
1570  &old_stage);
1571  do
1572  {
1573  mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
1574  DBUG_PRINT("info",
1575  ("Either got awakened of notified: "
1576  "entry %p, usage %lu, worker %lu",
1577  entry, entry->usage, w_entry->id));
1578  } while (entry->usage != 0 && !thd->killed);
1579  entry->worker= w_entry; // restoring last association, needed only for assert
1580  thd->EXIT_COND(&old_stage);
1581  ret++;
1582  }
1583  else
1584  {
1585  mysql_mutex_unlock(&slave_worker_hash_lock);
1586  }
1587  // resources relocation
1588  mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
1589  entry->temporary_tables= NULL;
1590  if (entry->worker->running_status != Slave_worker::RUNNING)
1591  cant_sync= TRUE;
1592  }
1593 
1594  if (!ignore)
1595  {
1596  if (log_warnings > 1)
1597  sql_print_information("Coordinator synchronized with Workers, "
1598  "waited entries: %d, cant_sync: %d",
1599  ret, cant_sync);
1600 
1601  const_cast<Relay_log_info*>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
1602  }
1603 
1604  DBUG_RETURN(!cant_sync ? ret : -1);
1605 }
1606 
1607 
1608 // returns the next available! (TODO: incompatible to circurla_buff method!!!)
1609 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
1610 {
1611  if (jobs->avail == jobs->size)
1612  {
1613  DBUG_ASSERT(jobs->avail == jobs->Q.elements);
1614  return -1;
1615  }
1616 
1617  // store
1618 
1619  set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
1620 
1621  // pre-boundary cond
1622  if (jobs->entry == jobs->size)
1623  jobs->entry= jobs->avail;
1624 
1625  jobs->avail= (jobs->avail + 1) % jobs->size;
1626  jobs->len++;
1627 
1628  // post-boundary cond
1629  if (jobs->avail == jobs->entry)
1630  jobs->avail= jobs->size;
1631  DBUG_ASSERT(jobs->avail == jobs->entry ||
1632  jobs->len == (jobs->avail >= jobs->entry) ?
1633  (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
1634  return jobs->avail;
1635 }
1636 
1640 static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
1641 {
1642  if (jobs->entry == jobs->size)
1643  {
1644  DBUG_ASSERT(jobs->len == 0);
1645  ret->data= NULL; // todo: move to caller
1646  return NULL;
1647  }
1648  get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
1649 
1650  DBUG_ASSERT(ret->data); // todo: move to caller
1651 
1652  return ret;
1653 }
1654 
1655 
1659 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
1660 {
1661  if (jobs->entry == jobs->size)
1662  {
1663  DBUG_ASSERT(jobs->len == 0);
1664  return NULL;
1665  }
1666  get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
1667  jobs->len--;
1668 
1669  // pre boundary cond
1670  if (jobs->avail == jobs->size)
1671  jobs->avail= jobs->entry;
1672  jobs->entry= (jobs->entry + 1) % jobs->size;
1673 
1674  // post boundary cond
1675  if (jobs->avail == jobs->entry)
1676  jobs->entry= jobs->size;
1677 
1678  DBUG_ASSERT(jobs->entry == jobs->size ||
1679  (jobs->len == (jobs->avail >= jobs->entry) ?
1680  (jobs->avail - jobs->entry) :
1681  (jobs->size + jobs->avail - jobs->entry)));
1682 
1683  return ret;
1684 }
1685 
/**
   Coordinator-side scheduling: appends @c job_item to @c worker's private
   queue, first throttling on the total pending-jobs size
   (mts_pending_jobs_size_max) and then on the Worker queue capacity.

   @param job_item  the event wrapped as a job item
   @param worker    the assigned Worker
   @param rli       Coordinator's relay-log info (shared counters/locks)

   @return false on success, true on failure (event too big, killed, or
           the Worker stopped running).
*/
bool append_item_to_jobs(slave_job_item *job_item,
                         Slave_worker *worker, Relay_log_info *rli)
{
  THD *thd= rli->info_thd;
  int ret= -1;
  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
  ulonglong new_pend_size;
  PSI_stage_info old_stage;


  DBUG_ASSERT(thd == current_thd);

  // A single event bigger than the whole size budget can never be queued.
  if (ev_size > rli->mts_pending_jobs_size_max)
  {
    char llbuff[22];
    llstr(rli->get_event_relay_log_pos(), llbuff);
    my_error(ER_MTS_EVENT_BIGGER_PENDING_JOBS_SIZE_MAX, MYF(0),
             ((Log_event*) (job_item->data))->get_type_str(),
             rli->get_event_relay_log_name(), llbuff, ev_size,
             rli->mts_pending_jobs_size_max);
    /* Waiting in slave_stop_workers() avoidance */
    rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
    return ret;
  }

  mysql_mutex_lock(&rli->pending_jobs_lock);
  new_pend_size= rli->mts_pending_jobs_size + ev_size;
  // C waits basing on *data* sizes in the queues
  while (new_pend_size > rli->mts_pending_jobs_size_max)
  {
    rli->mts_wq_oversize= TRUE;
    rli->wq_size_waits_cnt++; // waiting due to the total size
    thd->ENTER_COND(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
                    &stage_slave_waiting_worker_to_free_events, &old_stage);
    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
    // EXIT_COND releases pending_jobs_lock, so the killed-return below
    // does not leak the mutex; the lock is re-taken before re-checking.
    thd->EXIT_COND(&old_stage);
    if (thd->killed)
      return true;
    if (log_warnings > 1 && (rli->wq_size_waits_cnt % 10 == 1))
      sql_print_information("Multi-threaded slave: Coordinator has waited "
                            "%lu times hitting slave_pending_jobs_size_max; "
                            "current event size = %lu.",
                            rli->wq_size_waits_cnt, ev_size);
    mysql_mutex_lock(&rli->pending_jobs_lock);

    new_pend_size= rli->mts_pending_jobs_size + ev_size;
  }
  // Account the event into the shared counters before the actual enqueue;
  // the failure branch at the end rolls this back.
  rli->pending_jobs++;
  rli->mts_pending_jobs_size= new_pend_size;
  rli->mts_events_assigned++;

  mysql_mutex_unlock(&rli->pending_jobs_lock);

  /*
    Sleep unless there is an underrunning Worker and the current Worker
    queue is empty or filled lightly (not more than underrun level).
  */
  if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF &&
      worker->jobs.len > worker->underrun_level)
  {
    /*
      todo: experiment with weight to get a good approximation formula.
      Max possible nap time is choosen 1 ms.
      The bigger the excessive overrun counter the longer the nap.
    */
    ulong nap_weight= rli->mts_wq_excess_cnt + 1;
    /*
      Nap time is a product of a weight factor and the basic nap unit.
      The weight factor is proportional to the worker queues overrun excess
      counter. For example when there were only one overruning Worker
      the max nap_weight as 0.1 * worker->jobs.size would be
      about 1600 so the max nap time is approx 0.008 secs.
      Such value is not reachable because of min().
      Notice, granularity of sleep depends on the resolution of the software
      clock, High-Resolution Timer (HRT) configuration. Without HRT
      the precision of wake-up through @c select() may be greater or
      equal 1 ms. So don't expect the nap last a prescribed fraction of 1 ms
      in such case.
    */
    my_sleep(min<ulong>(1000, nap_weight * rli->mts_coordinator_basic_nap));
    rli->mts_wq_no_underrun_cnt++;
  }

  mysql_mutex_lock(&worker->jobs_lock);

  // possible WQ overfill
  while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
         (ret= en_queue(&worker->jobs, job_item)) == -1)
  {
    thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
                    &stage_slave_waiting_worker_queue, &old_stage);
    worker->jobs.overfill= TRUE;
    worker->jobs.waited_overfill++;
    rli->mts_wq_overfill_cnt++;
    mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
    thd->EXIT_COND(&old_stage);

    mysql_mutex_lock(&worker->jobs_lock);
  }
  if (ret != -1)
  {
    worker->curr_jobs++;
    // The 0 -> 1 transition may have a Worker sleeping on an empty queue.
    if (worker->jobs.len == 1)
      mysql_cond_signal(&worker->jobs_cond);

    mysql_mutex_unlock(&worker->jobs_lock);
  }
  else
  {
    mysql_mutex_unlock(&worker->jobs_lock);

    mysql_mutex_lock(&rli->pending_jobs_lock);
    rli->pending_jobs--; // roll back of the prev incr
    rli->mts_pending_jobs_size -= ev_size;
    mysql_mutex_unlock(&rli->pending_jobs_lock);
  }

  return (-1 != ret ? false : true);
}
1816 
1817 
/**
   Worker-side fetch of the next job: blocks on jobs_cond until the head
   of the Worker's queue carries an item, the thread is killed, or the
   Worker leaves the RUNNING state.

   @param worker    the calling Worker
   @param job_item  in/out item; on return its data member is the head
                    event or NULL (killed / not running)

   @return @c job_item (the item is peeked, not dequeued; the caller
           dequeues after executing it).
*/
struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item)
{
  THD *thd= worker->info_thd;

  mysql_mutex_lock(&worker->jobs_lock);

  while (!job_item->data && !thd->killed &&
         worker->running_status == Slave_worker::RUNNING)
  {
    PSI_stage_info old_stage;

    // Peek the head under jobs_lock; NULL data means the queue is empty.
    head_queue(&worker->jobs, job_item);
    if (job_item->data == NULL)
    {
      worker->wq_empty_waits++;
      thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
                      &stage_slave_waiting_event_from_coordinator,
                      &old_stage);
      mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
      // EXIT_COND releases jobs_lock; re-acquire before looping.
      thd->EXIT_COND(&old_stage);
      mysql_mutex_lock(&worker->jobs_lock);
    }
  }
  if (job_item->data)
    worker->curr_jobs--;

  mysql_mutex_unlock(&worker->jobs_lock);

  thd_proc_info(worker->info_thd, "Executing event");
  return job_item;
}
1859 
1860 
/**
   Worker thread's main step: pops one job off the Worker's private queue,
   applies the event, maintains per-group bookkeeping (partition list,
   group boundaries) and updates the shared MTS flow-control statistics
   (pending size, underrun/overrun counters), waking the Coordinator when
   it may be blocked on queue size.

   @param worker  the executing Worker
   @param rli     Coordinator's relay-log info (shared counters/locks)

   @return 0 on success; the apply error, or -1 when killed/not running.
*/
int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli)
{
  int error= 0;
  struct slave_job_item item= {NULL}, *job_item= &item;
  THD *thd= worker->info_thd;
  Log_event *ev= NULL;
  bool part_event= FALSE;

  DBUG_ENTER("slave_worker_exec_job");

  job_item= pop_jobs_item(worker, job_item);
  if (thd->killed || worker->running_status != Slave_worker::RUNNING)
  {
    // de-queueing and decrement counters is in the caller's exit branch
    error= -1;
    goto err;
  }
  // Prepare the event for execution in this Worker's THD context.
  ev= static_cast<Log_event*>(job_item->data);
  thd->server_id = ev->server_id;
  thd->set_time();
  thd->lex->current_select= 0;
  if (!ev->when.tv_sec)
    ev->when.tv_sec= my_time(0);
  ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
  ev->worker= worker;

  DBUG_PRINT("slave_worker_exec_job", ("W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd));

  if (ev->starts_group())
  {
    worker->curr_group_seen_begin= true; // The current group is started with B-event
    worker->end_group_sets_max_dbs= true;
  }
  else if (!is_gtid_event(ev))
  {
    if ((part_event=
         ev->contains_partition_info(worker->end_group_sets_max_dbs)))
    {
      uint num_dbs= ev->mts_number_dbs();
      DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts;

      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
        num_dbs= 1;

      DBUG_ASSERT(num_dbs > 0);

      // Record each partition this group touches, without duplicates;
      // the list is consulted when the group ends (resource release).
      for (uint k= 0; k < num_dbs; k++)
      {
        bool found= FALSE;

        for (uint i= 0; i < ep->elements && !found; i++)
        {
          found=
            *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
            ev->mts_assigned_partitions[k];
        }
        if (!found)
        {
          /*
            notice, can't assert
            DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == worker);
            since entry could be marked as wanted by other worker.
          */
          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
        }
      }
      worker->end_group_sets_max_dbs= false;
    }
  }

  worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
  worker->set_master_log_pos(ev->log_pos);
  worker->set_gaq_index(ev->mts_group_idx);
  error= ev->do_apply_event_worker(worker);
  if (ev->ends_group() || (!worker->curr_group_seen_begin &&
                           /*
                             p-events of B/T-less {p,g} group (see
                             legends of Log_event::get_slave_worker)
                             obviously can't commit.
                           */
                           part_event && !is_gtid_event(ev)))
  {
    DBUG_PRINT("slave_worker_exec_job",
               (" commits GAQ index %lu, last committed %lu",
                ev->mts_group_idx, worker->last_group_done_index));
    worker->slave_worker_ends_group(ev, error); /* last done sets post exec */

#ifndef DBUG_OFF
    DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
               " %u processed %lu debug %d\n", worker->id, opt_mts_checkpoint_group,
               worker->groups_done,
               DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
        opt_mts_checkpoint_group == worker->groups_done)
    {
      DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
      while (true) my_sleep(6000000);
    }
#endif
  }

  mysql_mutex_lock(&worker->jobs_lock);
  de_queue(&worker->jobs, job_item);

  /* possible overfill */
  if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE)
  {
    worker->jobs.overfill= FALSE;
    // todo: worker->hungry_cnt++;
    // Wake a Coordinator that may be blocked on this Worker's full queue.
    mysql_cond_signal(&worker->jobs_cond);
  }
  mysql_mutex_unlock(&worker->jobs_lock);

  /* statistics */

  /* todo: convert to rwlock/atomic write */
  mysql_mutex_lock(&rli->pending_jobs_lock);

  rli->pending_jobs--;
  rli->mts_pending_jobs_size -= ev->data_written;
  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);

  /*
    The positive branch is underrun: number of pending assignments
    is less than underrun level.
    Zero of jobs.len has to reset underrun w_id as the worker may get
    the next piece of assignement in a long time.
  */
  if (worker->underrun_level > worker->jobs.len && worker->jobs.len != 0)
  {
    rli->mts_wq_underrun_w_id= worker->id;
  } else if (rli->mts_wq_underrun_w_id == worker->id)
  {
    // reset only own marking
    rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
  }

  /*
    Overrun handling.
    Incrementing the Worker private and the total excess counter corresponding
    to number of events filled above the overrun_level.
    The increment amount to the total counter is a difference between
    the current and the previous private excess (worker->wq_overrun_cnt).
    When the current queue length drops below overrun_level the global
    counter is decremented, the local is reset.
  */
  if (worker->overrun_level < worker->jobs.len)
  {
    ulong last_overrun= worker->wq_overrun_cnt;
    ulong excess_delta;

    /* current overrun */
    worker->wq_overrun_cnt= worker->jobs.len - worker->overrun_level;
    excess_delta= worker->wq_overrun_cnt - last_overrun;
    worker->excess_cnt+= excess_delta;
    rli->mts_wq_excess_cnt+= excess_delta;
    rli->mts_wq_overrun_cnt++; // statistics

    // guarding correctness of incrementing in case of the only one Worker
    DBUG_ASSERT(rli->workers.elements != 1 ||
                rli->mts_wq_excess_cnt == worker->wq_overrun_cnt);
  }
  else if (worker->excess_cnt > 0)
  {
    // When level drops below the total excess is decremented by the
    // value of the worker's contribution to the total excess.
    rli->mts_wq_excess_cnt-= worker->excess_cnt;
    worker->excess_cnt= 0;
    worker->wq_overrun_cnt= 0; // and the local is reset

    DBUG_ASSERT(rli->mts_wq_excess_cnt >= 0);
    DBUG_ASSERT(rli->mts_wq_excess_cnt == 0 || rli->workers.elements > 1);

  }

  /* coordinator can be waiting */
  if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
      rli->mts_wq_oversize) // TODO: unit/general test wq_oversize
  {
    rli->mts_wq_oversize= FALSE;
    mysql_cond_signal(&rli->pending_jobs_cond);
  }

  mysql_mutex_unlock(&rli->pending_jobs_lock);

  worker->events_done++;

err:
  if (error)
  {
    if (log_warnings > 1)
      sql_print_information("Worker %lu is exiting: killed %i, error %i, "
                            "running_status %d",
                            worker->id, thd->killed, thd->is_error(),
                            worker->running_status);
    worker->slave_worker_ends_group(ev, error);
  }

  // todo: simulate delay in delete
  // ROWS_QUERY_LOG_EVENT ownership stays with rli, so it must not be
  // deleted here — presumably freed elsewhere; verify against caller.
  if (ev && ev->worker && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
  {
    delete ev;
  }


  DBUG_RETURN(error);
}