MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbEventOperationImpl.hpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef NdbEventOperationImpl_H
19 #define NdbEventOperationImpl_H
20 
21 #include <NdbEventOperation.hpp>
22 #include <signaldata/SumaImpl.hpp>
23 #include <transporter/TransporterDefinitions.hpp>
24 #include <NdbRecAttr.hpp>
25 #include <AttributeHeader.hpp>
26 #include <UtilBuffer.hpp>
27 
// Sentinel value for event operation objects (presumably the value kept in
// NdbEventOperationImpl::m_magic_number -- confirm against the .cpp).
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4

/*
  Event-path tracing. Uncomment the EVENT_DEBUG definition to route the
  *_EVENT macros to the regular DBUG_* macros; otherwise they expand to
  nothing, or to a plain return for the two return variants.
*/
//#define EVENT_DEBUG
#if !defined(EVENT_DEBUG)
#define DBUG_ENTER_EVENT(A)
#define DBUG_PRINT_EVENT(A,B)
#define DBUG_DUMP_EVENT(A,B,C)
#define DBUG_RETURN_EVENT(A) return(A)
#define DBUG_VOID_RETURN_EVENT return
#else
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#endif

// Size verification follows VM_TRACE; it has little effect on
// performance, so it stays on in those builds.
#undef NDB_EVENT_VERIFY_SIZE
#if defined(VM_TRACE)
#define NDB_EVENT_VERIFY_SIZE
#endif
49 
51 
{
  // NOTE(review): the declaration line that opens this struct (presumably
  // `struct EventBufData`, given the constructor below) is missing from
  // this copy of the header.

  // Two views of the same payload pointer: as SubTableData or as raw words.
  union {
    SubTableData *sdata;
    Uint32 *memory;
  };
  LinearSectionPtr ptr[3];
  unsigned sz;                       // item size; summed into list m_sz totals
  NdbEventOperationImpl *m_event_op; // event operation recorded for this item

  /*
   * Blobs are stored in blob list (m_next_blob) where each entry
   * is list of parts (m_next). TODO order by part number
   *
   * Processed data (m_used_data, m_free_data) keeps the old blob
   * list intact. It is reconsumed when new data items are needed.
   *
   * Data item lists keep track of item count and sum(sz) and
   * these include both main items and blob parts.
   */

  EventBufData *m_next; // Next wrt to global order or Next blob part
  EventBufData *m_next_blob; // First part in next blob

  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  // Leaves every member uninitialized; whoever allocates an item must
  // fill it in.
  EventBufData() {}

  /*
   * Main item does not include summary of parts (space / performance
   * tradeoff). The summary is needed when moving single data item.
   * It is not needed when moving entire list.
   */
  // Sets full_count/full_sz to this item (count 1, size sz) and, when the
  // item has blob parts (m_next_blob set), adds the parts via
  // add_part_size().
  void get_full_size(Uint32 & full_count, Uint32 & full_sz) const {
    full_count = 1;
    full_sz = sz;
    if (m_next_blob != 0)
      add_part_size(full_count, full_sz);
  }
  // Defined out of line; presumably adds the blob parts' count and size
  // to the accumulators.
  void add_part_size(Uint32 & full_count, Uint32 & full_sz) const;
};
94 
{
  // NOTE(review): the class declaration line (presumably
  // `class EventBufData_list`) and the constructor/destructor declarations
  // (their inline definitions follow this class) are missing from this copy.
public:

  // remove first and return its size
  void remove_first(Uint32 & full_count, Uint32 & full_sz);
  // for remove+append avoid double call to get_full_size()
  void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz);
  // append data and insert data but ignore Gci_op list
  void append_used_data(EventBufData *data);
  // append data and insert data into Gci_op list with add_gci_op
  void append_data(EventBufData *data);
  // append list to another, will call move_gci_ops
  void append_list(EventBufData_list *list, Uint64 gci);

  // Non-zero when the list holds no data items (m_head is null).
  int is_empty();

  EventBufData *m_head, *m_tail;  // singly linked through EventBufData::m_next
  Uint32 m_count;                 // item count, blob parts included
  Uint32 m_sz;                    // sum of item sz, blob parts included

  /*
    distinct ops per gci (assume no hash needed)

    list may be in 2 versions

    1. single list with one gci only
       - one linear array
       Gci_op *m_gci_op_list;
       Uint32 m_gci_op_count;
       Uint32 m_gci_op_alloc != 0;

    2. multi list with several gcis
       - linked list of gci's
       - one linear array per gci
       Gci_ops *m_gci_ops_list;
       Gci_ops *m_gci_ops_list_tail;
       Uint32 m_is_not_multi_list == 0;

  */
  struct Gci_op // 1 + 2
  {
    // NOTE(review): a member line is missing here in this copy; append_data()
    // initializes a Gci_op as { NdbEventOperationImpl*, event type bit }.
    Uint32 event_types;
  };
  struct Gci_ops // 2
  {
    Gci_ops()
      : m_gci(0),
        m_consistent(true),
        m_gci_op_list(NULL),
        m_next(NULL),
        m_gci_op_count(0)
    {};
    ~Gci_ops() {};

    Uint64 m_gci;           // the GCI this array belongs to
    bool m_consistent;
    Gci_op *m_gci_op_list;  // freed with delete[] by delete_next_gci_ops()
    Gci_ops *m_next;        // next GCI in the multi list
    Uint32 m_gci_op_count;
  };
  // The three anonymous unions below overlay the case 1 and case 2 fields;
  // which member is live is decided by m_gci_op_alloc / m_is_not_multi_list.
  union
  {
    Gci_op *m_gci_op_list;      // 1
    Gci_ops *m_gci_ops_list;    // 2
  };
  union
  {
    Uint32 m_gci_op_count;       // 1
    Gci_ops *m_gci_ops_list_tail;// 2
  };
  union
  {
    Uint32 m_gci_op_alloc;       // 1
    Uint32 m_is_not_multi_list;  // 2
  };
  // Head of the multi list (case 2 only; asserts !m_is_not_multi_list).
  Gci_ops *first_gci_ops();
  // Frees the head Gci_ops node and returns the new head (case 2 only).
  Gci_ops *delete_next_gci_ops();
  // case 1 above; add Gci_op to single list
  void add_gci_op(Gci_op g);
private:
  // case 2 above; move single list or multi list from
  // one list to another
  void move_gci_ops(EventBufData_list *list, Uint64 gci);
};
183 
184 inline
185 EventBufData_list::EventBufData_list()
186  : m_head(0), m_tail(0),
187  m_count(0),
188  m_sz(0),
189  m_gci_op_list(NULL),
190  m_gci_ops_list_tail(0),
191  m_gci_op_alloc(0)
192 {
193  DBUG_ENTER_EVENT("EventBufData_list::EventBufData_list");
194  DBUG_PRINT_EVENT("info", ("this: %p", this));
195  DBUG_VOID_RETURN_EVENT;
196 }
197 
198 inline
199 EventBufData_list::~EventBufData_list()
200 {
201  DBUG_ENTER_EVENT("EventBufData_list::~EventBufData_list");
202  DBUG_PRINT_EVENT("info", ("this: %p m_is_not_multi_list: %u",
203  this, m_is_not_multi_list));
204  if (m_is_not_multi_list)
205  {
206  DBUG_PRINT_EVENT("info", ("delete m_gci_op_list: %p", m_gci_op_list));
207  delete [] m_gci_op_list;
208  }
209  else
210  {
211  Gci_ops *op = first_gci_ops();
212  while (op)
213  op = delete_next_gci_ops();
214  }
215  DBUG_VOID_RETURN_EVENT;
216 }
217 
218 inline
219 int EventBufData_list::is_empty()
220 {
221  return m_head == 0;
222 }
223 
224 inline
225 void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz)
226 {
227  m_head->get_full_size(full_count, full_sz);
228 #ifdef VM_TRACE
229  assert(m_count >= full_count);
230  assert(m_sz >= full_sz);
231 #endif
232  m_count -= full_count;
233  m_sz -= full_sz;
234  m_head = m_head->m_next;
235  if (m_head == 0)
236  m_tail = 0;
237 }
238 
239 inline
240 void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz)
241 {
242  data->m_next = 0;
243  if (m_tail)
244  m_tail->m_next = data;
245  else
246  {
247 #ifdef VM_TRACE
248  assert(m_head == 0);
249  assert(m_count == 0);
250  assert(m_sz == 0);
251 #endif
252  m_head = data;
253  }
254  m_tail = data;
255 
256  m_count += full_count;
257  m_sz += full_sz;
258 }
259 
260 inline
261 void EventBufData_list::append_used_data(EventBufData *data)
262 {
263  Uint32 full_count, full_sz;
264  data->get_full_size(full_count, full_sz);
265  append_used_data(data, full_count, full_sz);
266 }
267 
268 inline
269 void EventBufData_list::append_data(EventBufData *data)
270 {
271  Gci_op g = { data->m_event_op,
272  1 << SubTableData::getOperation(data->sdata->requestInfo) };
273  add_gci_op(g);
274 
275  append_used_data(data);
276 }
277 
// NOTE(review): the preceding line with the return type (presumably
// `inline EventBufData_list::Gci_ops *`) is missing from this copy.
// Returns the head of the multi list, or NULL when it is empty.
EventBufData_list::first_gci_ops()
{
  // Only valid in multi-list mode (case 2).
  assert(!m_is_not_multi_list);
  return m_gci_ops_list;
}
284 
// NOTE(review): the preceding line with the return type (presumably
// `inline EventBufData_list::Gci_ops *`) is missing from this copy.
// Multi-list mode only: unlink and free the head Gci_ops node together
// with its Gci_op array. Returns the new head, or 0 when the list became
// empty; the tail is cleared in that case. The head is dereferenced
// unconditionally, so the multi list must not be empty on entry.
EventBufData_list::delete_next_gci_ops()
{
  assert(!m_is_not_multi_list);
  Gci_ops *first = m_gci_ops_list;
  m_gci_ops_list = first->m_next;
  if (first->m_gci_op_list)
  {
    DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p",
                              this, first->m_gci_op_list));
    delete [] first->m_gci_op_list;
  }
  delete first;
  if (m_gci_ops_list == 0)
    m_gci_ops_list_tail = 0;
  return m_gci_ops_list;
}
302 
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
{
  // NOTE(review): the class declaration line (presumably
  // `class EventBufData_hash`) is missing from this copy.
public:
  struct Pos { // search result
    Uint32 index; // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash; // PK hash
  };

  // Defined out of line; presumably hashes the primary key of an event
  // (see Pos::pkhash).
  static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  // Defined out of line; presumably compares the primary keys of two events.
  static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);

  // Presumably fills hpos (bucket index, PK hash, matching item or 0) so a
  // following append() can reuse the bucket index -- confirm in the .cpp.
  void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  // Pushes data onto the front of the chain for bucket hpos.index.
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };
  // Bucket chain heads; chains are linked through EventBufData::m_next_hash.
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};
323 
324 inline
325 void EventBufData_hash::append(Pos& hpos, EventBufData* data)
326 {
327  data->m_next_hash = m_hash[hpos.index];
328  m_hash[hpos.index] = data;
329 }
330 
{
  // NOTE(review): the struct declaration line (presumably
  // `struct Gci_container`) is missing from this copy.

  // Power-of-two flag values, presumably OR'ed into m_state.
  enum State
  {
    GC_COMPLETE = 0x1, // GCI is complete, but waiting for out of order
    GC_INCONSISTENT = 0x2 // GCI might be missing event data
    ,GC_CHANGE_CNT = 0x4 // Change m_total_buckets
  };


  Uint16 m_state;
  Uint16 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
  Uint64 m_gci; // GCI
  EventBufData_list m_data;       // event data items of this GCI
  EventBufData_hash m_data_hash;  // per-GCI hash over the items in m_data
};
347 
{
  // NOTE(review): the struct declaration line (presumably
  // `struct Gci_container_pod`) is missing from this copy.
  // Raw storage the size of a Gci_container. NdbEventBuffer keeps these in
  // m_active_gci, and find_bucket() casts the storage back to
  // Gci_container*.
  // NOTE(review): a char array only guarantees byte alignment, while
  // Gci_container holds Uint64 members; the cast relies on the Vector's
  // storage being suitably aligned -- confirm.
  char data[sizeof(Gci_container)];
};
352 
public:
  // NOTE(review): this copy is missing the class head and the opening line
  // of the first constructor declaration below, which starts mid parameter
  // list. Other single declaration lines of this class may be missing too.
  Ndb *theNdb,
  const char* eventName);
  NdbEventOperationImpl(Ndb *theNdb,
                        NdbEventImpl& evnt);
  void init(NdbEventImpl& evnt);
  NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented

  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  Uint32 get_blob_part_no(bool hasDist);
  int readBlobParts(char* buf, NdbBlob* blob,
                    Uint32 part, Uint32 count, Uint16* lenLoc);
  int receive_event();
  bool tableNameChanged() const;
  bool tableFrmChanged() const;
  bool tableFragmentationChanged() const;
  bool tableRangeListChanged() const;
  Uint64 getGCI();
  Uint32 getAnyValue() const;
  Uint64 getLatestGCI();
  Uint64 getTransId() const;
  bool execSUB_TABLE_DATA(const NdbApiSignal * signal,
                          const LinearSectionPtr ptr[3]);

  NdbDictionary::Event::TableEvent getEventType();

  void print();
  void printAll();

  NdbEventOperation *m_facade;
  // Presumably holds NDB_EVENT_OP_MAGIC_NUMBER while the object is valid
  // -- confirm in the .cpp.
  Uint32 m_magic_number;

  const NdbError & getNdbError() const;
  NdbError m_error;

  Ndb *m_ndb;
  NdbEventImpl *m_eventImpl;

  NdbRecAttr *theFirstPkAttrs[2];
  NdbRecAttr *theCurrentPkAttrs[2];
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op
  int theBlobVersion; // in blob op, NDB_BLOB_V1 or NDB_BLOB_V2

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
                   * else same as in EventImpl
                   */
  Uint32 m_eventId;
  Uint32 m_oid;

  /*
    when parsed gci > m_stop_gci it is safe to drop operation
    as kernel will not have any more references
  */
  Uint64 m_stop_gci;

  /*
    m_ref_count keeps track of outstanding references to an event
    operation impl object, to make sure that the object is not
    deleted too early.

    If on dropEventOperation there are still references to an
    object it is queued for delete in NdbEventBuffer::m_dropped_ev_op

    the following references exist for a _non_ blob event op:
    * user reference
      - add    - NdbEventBuffer::createEventOperation
      - remove - NdbEventBuffer::dropEventOperation
    * kernel reference
      - add    - execute_nolock
      - remove - TE_STOP, TE_CLUSTER_FAILURE
    * blob reference
      - add    - execute_nolock on blob event
      - remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
    * gci reference
      - add    - insertDataL/add_gci_op
      - remove - NdbEventBuffer::deleteUsedEventOperations

    the following references exist for a blob event op:
    * kernel reference
      - add    - execute_nolock
      - remove - TE_STOP, TE_CLUSTER_FAILURE
  */

  int m_ref_count;
  bool m_mergeEvents;

  EventBufData *m_data_item;

  void *m_custom_data;
  int m_has_error;

  Uint32 m_fragmentId;
  UtilBuffer m_buffer;

  // Bit mask for what has changed in a table (for TE_ALTER event)
  Uint32 m_change_mask;

#ifdef VM_TRACE
  Uint32 m_data_done_count;
  Uint32 m_data_count;
#endif

  // managed by the ndb object
  NdbEventOperationImpl *m_next;
  NdbEventOperationImpl *m_prev;
private:
  // Forwards one received attribute value to r (the inline definition at
  // the end of this header calls r->receive_data(data, sz)).
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};
479 
public:
  // NOTE(review): the class head (presumably `class NdbEventBuffer {`), the
  // line before the destructor (presumably the constructor) and one
  // parameter line of get_main_data() are missing from this copy.
  ~NdbEventBuffer();

  Uint32 m_total_buckets;
  Uint16 m_min_gci_index;
  Uint16 m_max_gci_index;
  Vector<Uint64> m_known_gci;
  // GCI containers stored as raw Gci_container_pod slots; find_bucket()
  // first probes slot (gci & ACTIVE_GCI_MASK).
  Vector<Gci_container_pod> m_active_gci;
  // The mask only works while the directory size is a power of two.
  STATIC_CONST( ACTIVE_GCI_DIRECTORY_SIZE = 4 );
  STATIC_CONST( ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1 );

  NdbEventOperation *createEventOperation(const char* eventName,
                                          NdbError &);
  NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
                                                  NdbError &);
  void dropEventOperation(NdbEventOperation *);
  static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);

  // Thin wrappers over m_add_drop_mutex and m_mutex.
  void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
  void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
  void lock() { NdbMutex_Lock(m_mutex); }
  void unlock() { NdbMutex_Unlock(m_mutex); }

  void add_op();
  void remove_op();
  void init_gci_containers();

  // accessed from the "receive thread"
  int insertDataL(NdbEventOperationImpl *op,
                  const SubTableData * const sdata, Uint32 len,
                  LinearSectionPtr ptr[3]);
  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const, Uint32 len,
                                int complete_cluster_failure= 0);
  void complete_outof_order_gcis();

  void report_node_failure_completed(Uint32 node_id);

  // used by user thread
  Uint64 getLatestGCI();
  Uint32 getEventId(int bufferId);

  int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
  int flushIncompleteEvents(Uint64 gci);
  NdbEventOperation *nextEvent();
  bool isConsistent(Uint64& gci);
  bool isConsistentGCI(Uint64 gci);

  NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                               Uint32* event_types);
  void deleteUsedEventOperations(Uint64 last_consumed_gci);

  NdbEventOperationImpl *move_data();

  // routines to copy/merge events
  EventBufData* alloc_data();
  int alloc_mem(EventBufData* data,
                LinearSectionPtr ptr[3],
                Uint32 * change_sz);
  void dealloc_mem(EventBufData* data,
                   Uint32 * change_sz);
  int copy_data(const SubTableData * const sdata, Uint32 len,
                LinearSectionPtr ptr[3],
                EventBufData* data,
                Uint32 * change_sz);
  int merge_data(const SubTableData * const sdata, Uint32 len,
                 LinearSectionPtr ptr[3],
                 EventBufData* data,
                 Uint32 * change_sz);
  int get_main_data(Gci_container* bucket,
                    EventBufData* blob_data);
  void add_blob_data(Gci_container* bucket,
                     EventBufData* main_data,
                     EventBufData* blob_data);

  void free_list(EventBufData_list &list);

  void reportStatus();

  // Global Mutex used for some things
  static NdbMutex *p_add_drop_mutex;

#ifdef VM_TRACE
  const char *m_latest_command;
  Uint64 m_flush_gci;
#endif

  Ndb *m_ndb;

  // "latest gci" variables updated in receiver thread
  Uint64 m_latestGCI;           // latest GCI completed in order
  Uint64 m_latest_complete_GCI; // latest complete GCI (in case of out of order)
  Uint64 m_highest_sub_gcp_complete_GCI; // highest gci seen in api
  // "latest gci" variables updated in user thread
  Uint64 m_latest_poll_GCI; // latest gci handed over to user thread

  bool m_startup_hack;

  NdbMutex *m_mutex;
  struct NdbCondition *p_cond;

  // receive thread
  Gci_container m_complete_data;
  EventBufData *m_free_data;
#ifdef VM_TRACE
  Uint32 m_free_data_count;
#endif
  Uint32 m_free_data_sz;

  // user thread
  EventBufData_list m_available_data;
  EventBufData_list m_used_data;

  unsigned m_total_alloc; // total allocated memory

  // thresholds to report status
  unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
  unsigned m_gci_slip_thresh;

  NdbError m_error;

#ifdef VM_TRACE
  static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz);
  static void verify_size(const EventBufData_list & list);
#endif

private:
  void insert_event(NdbEventOperationImpl* impl,
                    SubTableData &data,
                    LinearSectionPtr *ptr,
                    Uint32 &oid_ref);

  int expand(unsigned sz);

  // all allocated data
  struct EventBufData_chunk
  {
    unsigned sz;
    // Presumably a variable-length tail: sz items are allocated past the
    // declared single element -- confirm in expand().
    EventBufData data[1];
  };
  Vector<EventBufData_chunk *> m_allocated_data;
  unsigned m_sz;

  /*
    dropped event operations (dropEventOperation) that have not yet
    been deleted because of outstanding m_ref_count

    check for delete is done on occasions when the ref_count may have
    changed by calling deleteUsedEventOperations:
     - nextEvent - each time the user has completed processing a gci
  */
  NdbEventOperationImpl *m_dropped_ev_op;

  Uint32 m_active_op_count;
  NdbMutex *m_add_drop_mutex;

  // Look up the container for gci. The fast path checks directory slot
  // (gci & ACTIVE_GCI_MASK); if that slot holds a different GCI, fall back
  // to find_bucket_chained().
  inline Gci_container* find_bucket(Uint64 gci){
    Uint32 pos = (Uint32)(gci & ACTIVE_GCI_MASK);
    Gci_container *bucket= ((Gci_container*)(m_active_gci.getBase())) + pos;
    if(likely(gci == bucket->m_gci))
      return bucket;

    return find_bucket_chained(gci);
  }

#ifdef VM_TRACE
  void verify_known_gci(bool allowempty);
#endif
  Gci_container* find_bucket_chained(Uint64 gci);
  void complete_bucket(Gci_container*);
  bool find_max_known_gci(Uint64 * res) const;
  void resize_known_gci();

  Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_alive_node_bit_mask;

  void handle_change_nodegroup(const SubGcpCompleteRep*);

public:
  void set_total_buckets(Uint32);
};
662 
inline
// NOTE(review): the return-type line (presumably `NdbEventOperationImpl*`)
// is missing from this copy.
// Returns the address of tOp's m_impl member, i.e. the implementation
// object behind the public NdbEventOperation.
NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
{
  return &tOp->m_impl;
}
669 
670 inline void
671 NdbEventOperationImpl::receive_data(NdbRecAttr *r,
672  const Uint32 *data,
673  Uint32 sz)
674 {
675  r->receive_data(data,sz);
676 #if 0
677  if (sz)
678  {
679  assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
680  r->theNULLind= 0;
681  memcpy(r->aRef(), data, 4 * sz);
682  return;
683  }
684  r->theNULLind= 1;
685 #endif
686 }
687 
688 #endif