MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbEventOperationImpl.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 #include <kernel_types.h>
21 
22 #include "API.hpp"
23 #include <NdbOut.hpp>
24 
25 #include <signaldata/CreateEvnt.hpp>
26 #include <signaldata/SumaImpl.hpp>
27 #include <SimpleProperties.hpp>
28 #include <Bitmask.hpp>
29 #include <AttributeHeader.hpp>
30 #include <AttributeList.hpp>
31 #include <NdbError.hpp>
32 #include <BaseString.hpp>
33 #include <UtilBuffer.hpp>
34 #include <portlib/NdbMem.h>
35 #include <signaldata/AlterTable.hpp>
36 #include "ndb_internal.hpp"
37 
38 #include <EventLogger.hpp>
39 extern EventLogger * g_eventLogger;
40 
// Initial value of NdbEventBuffer::m_total_buckets (see the NdbEventBuffer
// constructor). While m_total_buckets still equals this value,
// NdbEventOperationImpl::execute_nolock() wraps the subscribe in a schema
// transaction; after a successful subscribe it calls set_total_buckets()
// with the bucket count reported by the kernel.
#define TOTAL_BUCKETS_INIT (1 << 15)
// Zero-filled template container; the NdbEventBuffer constructor bzero()s it
// and init_gci_containers() uses it to fill m_active_gci.
static Gci_container_pod g_empty_gci_container;
43 
#if defined(VM_TRACE) && defined(NOT_USED)
/*
 * Debug helper: print the header fields of a SUB_TABLE_DATA signal, then
 * dump each of its three linear sections as hex words.
 */
static void
print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
{
  printf("addr=%p gci{hi/lo}hi=%u/%u op=%d\n", (void*)sdata,
         sdata->gci_hi, sdata->gci_lo,
         SubTableData::getOperation(sdata->requestInfo));
  for (int sec = 0; sec < 3; sec++)
  {
    const LinearSectionPtr& section = ptr[sec];
    printf("sec=%d addr=%p sz=%d\n", sec, (void*)section.p, section.sz);
    for (uint word = 0; word < section.sz; word++)
      printf("%08x ", section.p[word]);
    printf("\n");
  }
}
#endif
59 
60 // EventBufData
61 
62 void
63 EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
64 {
65  Uint32 tmp_count = 0;
66  Uint32 tmp_sz = 0;
67  const EventBufData* data2 = m_next_blob;
68  while (data2 != 0) {
69  tmp_count++;
70  tmp_sz += data2->sz;
71  const EventBufData* data3 = data2->m_next;
72  while (data3 != 0) {
73  tmp_count++;
74  tmp_sz += data3->sz;
75  data3 = data3->m_next;
76  }
77  data2 = data2->m_next_blob;
78  }
79  full_count += tmp_count;
80  full_sz += tmp_sz;
81 }
82 
83 /*
84  * Class NdbEventOperationImpl
85  *
86  *
87  */
88 
89 // todo handle several ndb objects
90 // todo free allocated data when closing NdbEventBuffer
91 
92 NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f,
93  Ndb *theNdb,
94  const char* eventName) :
95  NdbEventOperation(*this),
96  m_facade(&f),
97  m_ndb(theNdb),
98  m_state(EO_ERROR),
99  m_oid(~(Uint32)0)
100 {
101  DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
102 
103  assert(m_ndb != NULL);
104  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
105  assert(myDict != NULL);
106 
107  const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
108  if (!myEvnt)
109  {
110  m_error.code= myDict->getNdbError().code;
111  DBUG_VOID_RETURN;
112  }
113 
114  init(myEvnt->m_impl);
115  DBUG_VOID_RETURN;
116 }
117 
118 NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb,
119  NdbEventImpl& evnt) :
120  NdbEventOperation(*this),
121  m_facade(this),
122  m_ndb(theNdb),
123  m_state(EO_ERROR),
124  m_oid(~(Uint32)0)
125 {
126  DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
127  init(evnt);
128  DBUG_VOID_RETURN;
129 }
130 
131 void
132 NdbEventOperationImpl::init(NdbEventImpl& evnt)
133 {
134  DBUG_ENTER("NdbEventOperationImpl::init");
135 
136  m_magic_number = 0;
137  mi_type = 0;
138  m_change_mask = 0;
139 #ifdef VM_TRACE
140  m_data_done_count = 0;
141  m_data_count = 0;
142 #endif
143  m_next = 0;
144  m_prev = 0;
145 
146  m_eventId = 0;
147  theFirstPkAttrs[0] = NULL;
148  theCurrentPkAttrs[0] = NULL;
149  theFirstPkAttrs[1] = NULL;
150  theCurrentPkAttrs[1] = NULL;
151  theFirstDataAttrs[0] = NULL;
152  theCurrentDataAttrs[0] = NULL;
153  theFirstDataAttrs[1] = NULL;
154  theCurrentDataAttrs[1] = NULL;
155 
156  theBlobList = NULL;
157  theBlobOpList = NULL;
158  theMainOp = NULL;
159  theBlobVersion = 0;
160 
161  m_data_item= NULL;
162  m_eventImpl = NULL;
163 
164  m_custom_data= 0;
165  m_has_error= 1;
166 
167  // we should lookup id in Dictionary, TODO
168  // also make sure we only have one listener on each event
169 
170  m_eventImpl = &evnt;
171 
172  m_eventId = m_eventImpl->m_eventId;
173 
174  m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
175 
176  m_state= EO_CREATED;
177 
178  m_stop_gci = 0;
179 #ifdef ndb_event_stores_merge_events_flag
180  m_mergeEvents = m_eventImpl->m_mergeEvents;
181 #else
182  m_mergeEvents = false;
183 #endif
184  m_ref_count = 0;
185  DBUG_PRINT("info", ("m_ref_count = 0 for op: 0x%lx", (long) this));
186 
187  m_has_error= 0;
188 
189  DBUG_PRINT("exit",("this: 0x%lx oid: %u", (long) this, m_oid));
190  DBUG_VOID_RETURN;
191 }
192 
193 NdbEventOperationImpl::~NdbEventOperationImpl()
194 {
195  DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
196  m_magic_number= 0;
197 
198  if (m_oid == ~(Uint32)0)
199  DBUG_VOID_RETURN;
200 
201  stop();
202 
203  if (theMainOp == NULL)
204  {
205  NdbEventOperationImpl* tBlobOp = theBlobOpList;
206  while (tBlobOp != NULL)
207  {
208  NdbEventOperationImpl *op = tBlobOp;
209  tBlobOp = tBlobOp->m_next;
210  delete op;
211  }
212  }
213 
214  m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
215  DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p",
216  this, m_facade, m_oid, theMainOp));
217 
218  if (m_eventImpl)
219  {
220  delete m_eventImpl->m_facade;
221  m_eventImpl= 0;
222  }
223 
224  DBUG_VOID_RETURN;
225 }
226 
229 {
230  return m_state;
231 }
232 
233 NdbRecAttr*
234 NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
235 {
236  DBUG_ENTER("NdbEventOperationImpl::getValue");
237  if (m_state != EO_CREATED) {
238  ndbout_c("NdbEventOperationImpl::getValue may only be called between "
239  "instantiation and execute()");
240  DBUG_RETURN(NULL);
241  }
242 
243  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
244 
245  if (tAttrInfo == NULL) {
246  ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
247  DBUG_RETURN(NULL);
248  }
249 
250  DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
251 }
252 
253 NdbRecAttr*
254 NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
255 {
256  DBUG_ENTER("NdbEventOperationImpl::getValue");
257  // Insert Attribute Id into ATTRINFO part.
258 
259  NdbRecAttr **theFirstAttr;
260  NdbRecAttr **theCurrentAttr;
261 
262  if (tAttrInfo->getPrimaryKey())
263  {
264  theFirstAttr = &theFirstPkAttrs[n];
265  theCurrentAttr = &theCurrentPkAttrs[n];
266  }
267  else
268  {
269  theFirstAttr = &theFirstDataAttrs[n];
270  theCurrentAttr = &theCurrentDataAttrs[n];
271  }
272 
273  /************************************************************************
274  * Get a Receive Attribute object and link it into the operation object.
275  ************************************************************************/
276  NdbRecAttr *tAttr = m_ndb->getRecAttr();
277  if (tAttr == NULL) {
278  exit(-1);
279  //setErrorCodeAbort(4000);
280  DBUG_RETURN(NULL);
281  }
282 
283  /**********************************************************************
284  * Now set the attribute identity and the pointer to the data in
285  * the RecAttr object
286  * Also set attribute size, array size and attribute type
287  ********************************************************************/
288  if (tAttr->setup(tAttrInfo, aValue)) {
289  //setErrorCodeAbort(4000);
290  m_ndb->releaseRecAttr(tAttr);
291  exit(-1);
292  DBUG_RETURN(NULL);
293  }
294  //theErrorLine++;
295 
296  tAttr->setUNDEFINED();
297 
298  // We want to keep the list sorted to make data insertion easier later
299 
300  if (*theFirstAttr == NULL) {
301  *theFirstAttr = tAttr;
302  *theCurrentAttr = tAttr;
303  tAttr->next(NULL);
304  } else {
305  Uint32 tAttrId = tAttrInfo->m_attrId;
306  if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
307  (*theCurrentAttr)->next(tAttr);
308  tAttr->next(NULL);
309  *theCurrentAttr = tAttr;
310  } else if ((*theFirstAttr)->next() == NULL || // only one in list
311  (*theFirstAttr)->attrId() > tAttrId) {// or first
312  tAttr->next(*theFirstAttr);
313  *theFirstAttr = tAttr;
314  } else { // at least 2 in list and not first and not last
315  NdbRecAttr *p = *theFirstAttr;
316  NdbRecAttr *p_next = p->next();
317  while (tAttrId > p_next->attrId()) {
318  p = p_next;
319  p_next = p->next();
320  }
321  if (tAttrId == p_next->attrId()) { // Using same attribute twice
322  tAttr->release(); // do I need to do this?
323  m_ndb->releaseRecAttr(tAttr);
324  exit(-1);
325  DBUG_RETURN(NULL);
326  }
327  // this is it, between p and p_next
328  p->next(tAttr);
329  tAttr->next(p_next);
330  }
331  }
332  DBUG_RETURN(tAttr);
333 }
334 
335 NdbBlob*
336 NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
337 {
338  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
339 
340  assert(m_mergeEvents);
341 
342  if (m_state != EO_CREATED) {
343  ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
344  "instantiation and execute()");
345  DBUG_RETURN(NULL);
346  }
347 
348  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
349 
350  if (tAttrInfo == NULL) {
351  ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
352  DBUG_RETURN(NULL);
353  }
354 
355  NdbBlob* bh = getBlobHandle(tAttrInfo, n);
356  DBUG_RETURN(bh);
357 }
358 
/*
 * Return the NdbBlob handle for blob column tAttrInfo and image n
 * (post/pre, as in the DBUG trace below), creating it on first use.
 *
 * At most one handle exists per (column, n); later calls return the
 * existing one from theBlobList. A blob that has a part table (part size
 * != 0) also needs a blob event operation on that part table. It is looked
 * up by blob event name in theBlobOpList, or created and appended there,
 * and it serves both the post and pre handles of the column. Blob ops are
 * linked under the main op rather than under m_ndb, and they are deleted
 * by the main op's destructor.
 *
 * On failure m_error.code is set and NULL is returned. A blob op created
 * before a later failure remains in theBlobOpList.
 */
NdbBlob*
NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
  DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));

  // as in NdbOperation, create only one instance
  NdbBlob* tBlob = theBlobList;
  NdbBlob* tLastBlob = NULL;
  while (tBlob != NULL) {
    if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
      DBUG_RETURN(tBlob);
    tLastBlob = tBlob;
    tBlob = tBlob->theNext;
  }
  // Not found: tLastBlob is the tail of theBlobList (NULL if empty).

  NdbEventOperationImpl* tBlobOp = NULL;

  // Part size 0 means no part table, so no blob event op is needed.
  const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
  assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));

  if (! is_tinyblob) {
    // blob event name
    char bename[MAX_TAB_NAME_SIZE];
    NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);

    // find blob event op if any (it serves both post and pre handles)
    tBlobOp = theBlobOpList;
    NdbEventOperationImpl* tLastBlopOp = NULL;
    while (tBlobOp != NULL) {
      if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
        break;
      }
      tLastBlopOp = tBlobOp;
      tBlobOp = tBlobOp->m_next;
    }

    DBUG_PRINT("info", ("%s blob event op for %s",
                        tBlobOp ? " reuse" : " create", bename));

    // create blob event op if not found
    if (tBlobOp == NULL) {
      // get blob event
      NdbDictionaryImpl& dict =
        NdbDictionaryImpl::getImpl(*m_ndb->getDictionary());
      NdbEventImpl* blobEvnt =
        dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
      if (blobEvnt == NULL) {
        m_error.code = dict.m_error.code;
        DBUG_RETURN(NULL);
      }

      // create blob event operation
      tBlobOp =
        m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
      if (tBlobOp == NULL)
        DBUG_RETURN(NULL);

      // pointer to main table op
      tBlobOp->theMainOp = this;
      tBlobOp->m_mergeEvents = m_mergeEvents;
      tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;

      // to hide blob op it is linked under main op, not under m_ndb
      if (tLastBlopOp == NULL)
        theBlobOpList = tBlobOp;
      else
        tLastBlopOp->m_next = tBlobOp;
      tBlobOp->m_next = NULL;
    }
  }

  tBlob = m_ndb->getNdbBlob();
  if (tBlob == NULL) {
    m_error.code = m_ndb->getNdbError().code;
    DBUG_RETURN(NULL);
  }

  // calls getValue on inline and blob part
  if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
    m_error.code = tBlob->getNdbError().code;
    m_ndb->releaseNdbBlob(tBlob);
    DBUG_RETURN(NULL);
  }

  // add to list end
  if (tLastBlob == NULL)
    theBlobList = tBlob;
  else
    tLastBlob->theNext = tBlob;
  tBlob->theNext = NULL;
  DBUG_RETURN(tBlob);
}
452 
453 Uint32
454 NdbEventOperationImpl::get_blob_part_no(bool hasDist)
455 {
456  assert(theBlobVersion == 1 || theBlobVersion == 2);
457  assert(theMainOp != NULL);
458  const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
459  assert(m_data_item != NULL);
460  LinearSectionPtr (&ptr)[3] = m_data_item->ptr;
461 
462  uint pos = 0; // PK and possibly DIST to skip
463 
464  if (unlikely(theBlobVersion == 1)) {
465  pos += AttributeHeader(ptr[0].p[0]).getDataSize();
466  assert(hasDist);
467  pos += AttributeHeader(ptr[0].p[1]).getDataSize();
468  } else {
469  uint n = mainTable->m_noOfKeys;
470  uint i;
471  for (i = 0; i < n; i++) {
472  pos += AttributeHeader(ptr[0].p[i]).getDataSize();
473  }
474  if (hasDist)
475  pos += AttributeHeader(ptr[0].p[n]).getDataSize();
476  }
477 
478  assert(pos < ptr[1].sz);
479  Uint32 no = ptr[1].p[pos];
480  return no;
481 }
482 
/*
 * Copy blob parts [part, part + count) of the current main-table event
 * into buf.
 *
 * buf     destination; part number `no` is written at offset
 *         (no - part) * part size
 * blob    handle whose blob event op (theBlobEventOp) owns the buffered
 *         part events
 * lenLoc  if non-NULL (only allowed with count == 1), receives the length
 *         of the part read; otherwise every part must be exactly
 *         blob->thePartSize bytes (asserted)
 *
 * The part events for this blob are the chain whose head is on
 * m_data_item's m_next_blob list with m_event_op == blob op; the rest of
 * the chain is linked through m_next. Always returns 0. A mismatch between
 * parts found and count is only printed and asserted.
 */
int
NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
                                     Uint32 part, Uint32 count, Uint16* lenLoc)
{
  DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
  DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
                            part, count, blob->theEventBlobVersion));

  NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
  // A non-zero stripe size means the part table carries a DIST column.
  const bool hasDist = (blob->theStripeSize != 0);

  // main_data is only used for tracing and the assertion below.
  EventBufData* main_data = m_data_item;
  DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
  assert(main_data != NULL);

  // search for blob parts list head
  EventBufData* head;
  assert(m_data_item != NULL);
  head = m_data_item->m_next_blob;
  while (head != NULL)
  {
    if (head->m_event_op == blob_op)
    {
      DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
      break;
    }
    head = head->m_next_blob;
  }

  Uint32 nparts = 0;
  Uint32 noutside = 0;
  EventBufData* data = head;
  // XXX optimize using part no ordering
  while (data != NULL)
  {
    /*
     * Hack part no directly out of buffer since it is not returned
     * in pre data (PK buglet). For part data use receive_event().
     * This means extra copy. XXX fix
     */
    // Decoding the part event through the blob op is expected to fill
    // blob->theBlobEventDataBuf (presumably via the rec attrs registered
    // in NdbBlob::atPrepare) -- verify.
    blob_op->m_data_item = data;
    int r = blob_op->receive_event();
    assert(r > 0);
    // XXX should be: no = blob->theBlobEventPartValue
    Uint32 no = blob_op->get_blob_part_no(hasDist);

    DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part", data, no));

    if (part <= no && no < part + count)
    {
      DBUG_PRINT_EVENT("info", ("part within read range"));

      const char* src = blob->theBlobEventDataBuf.data;
      Uint32 sz = 0;
      if (blob->theFixedDataFlag) {
        sz = blob->thePartSize;
      } else {
        // Variable-size part: 2-byte little-endian length prefix.
        const uchar* p = (const uchar*)blob->theBlobEventDataBuf.data;
        sz = p[0] + (p[1] << 8);
        src += 2;
      }
      memcpy(buf + (no - part) * sz, src, sz);
      nparts++;
      if (lenLoc != NULL) {
        assert(count == 1);
        *lenLoc = sz;
      } else {
        assert(sz == blob->thePartSize);
      }
    }
    else
    {
      DBUG_PRINT_EVENT("info", ("part outside read range"));
      noutside++;
    }
    data = data->m_next;
  }
  if (unlikely(nparts != count))
  {
    ndbout_c("nparts: %u count: %u noutside: %u", nparts, count, noutside);
  }
  assert(nparts == count);

  DBUG_RETURN_EVENT(0);
}
568 
569 int
571 {
572  DBUG_ENTER("NdbEventOperationImpl::execute");
573  m_ndb->theEventBuffer->add_drop_lock();
574  int r = execute_nolock();
575  m_ndb->theEventBuffer->add_drop_unlock();
576  DBUG_RETURN(r);
577 }
578 
/*
 * Subscribe this event operation in the kernel. For a main operation, all
 * blob event operations in theBlobOpList are then subscribed as well.
 *
 * Takes no add/drop lock itself; execute() wraps this call in
 * add_drop_lock()/add_drop_unlock(), and blob ops are executed from here.
 *
 * While the event buffer's bucket count is still TOTAL_BUCKETS_INIT, a
 * schema transaction is begun around the first subscribe. Errors 711 and
 * 763 from beginSchemaTrans() are ignored.
 *
 * Returns 0 on success. On failure it returns a non-zero value with
 * m_error.code set, and the operation is reset to EO_ERROR. The exception
 * is a failing blob op: the main op is then left running and must be
 * cleaned up via dropEventOperation.
 */
int
NdbEventOperationImpl::execute_nolock()
{
  DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
  DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  bool schemaTrans = false;
  if (m_ndb->theEventBuffer->m_total_buckets == TOTAL_BUCKETS_INIT)
  {
    int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(false);
    if (res != 0)
    {
      switch(myDict->getNdbError().code){
      case 711:
      case 763:
        // ignore;
        break;
      default:
        m_error.code= myDict->getNdbError().code;
        DBUG_RETURN(-1);
      }
    }
    else
    {
      schemaTrans = true;
    }
  }

  // NOTE(review): empty body -- this check currently has no effect.
  if (theFirstPkAttrs[0] == NULL &&
      theFirstDataAttrs[0] == NULL) { // defaults to get all
  }

  m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
  m_state= EO_EXECUTING;
  mi_type= m_eventImpl->mi_type;
  m_ndb->theEventBuffer->add_op();
  // add kernel reference
  // removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
  m_ref_count++;
  // ~0 marks the operation as active (see stop()).
  m_stop_gci= ~(Uint64)0;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
  Uint32 buckets = 0;
  int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this,
                                                                  buckets);
  if (r == 0)
  {
    /* Pre-7.0 kernel nodes do not return the number of buckets
     * Assume it's == theNoOfDBnodes as was the case in 6.3
     */
    if (buckets == ~ (Uint32)0)
      buckets = m_ndb->theImpl->theNoOfDBnodes;

    m_ndb->theEventBuffer->set_total_buckets(buckets);
    // Ends the schema transaction with flag 1 (presumably abort -- verify).
    if (schemaTrans)
    {
      schemaTrans = false;
      myDict->endSchemaTrans(1);
    }

    if (theMainOp == NULL) {
      DBUG_PRINT("info", ("execute blob ops"));
      NdbEventOperationImpl* blob_op = theBlobOpList;
      while (blob_op != NULL) {
        r = blob_op->execute_nolock();
        if (r != 0) {
          // since main op is running and possibly some blob ops as well
          // we can't just reset the main op. Instead return with error,
          // main op (and blob ops) will be cleaned up when user calls
          // dropEventOperation
          m_error.code= myDict->getNdbError().code;
          DBUG_RETURN(r);
        }
        blob_op = blob_op->m_next;
      }
    }
    if (r == 0)
    {
      DBUG_RETURN(0);
    }
  }
  // Error
  // remove kernel reference
  // added above
  m_ref_count--;
  m_stop_gci = 0;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
  m_state= EO_ERROR;
  mi_type= 0;
  m_magic_number= 0;
  m_error.code= myDict->getNdbError().code;
  m_ndb->theEventBuffer->remove_op();

  if (schemaTrans)
  {
    schemaTrans = false;
    myDict->endSchemaTrans(1);
  }

  DBUG_RETURN(r);
}
685 
/*
 * Release all registered NdbRecAttr objects and, if the operation is
 * EO_EXECUTING, unsubscribe it in the kernel (under the add/drop lock).
 *
 * Returns -1 if the operation is not executing or no dictionary is
 * available. On a successful unsubscribe the state becomes EO_DROPPED,
 * m_stop_gci is set if the kernel left it at 0, and 0 is returned. If
 * stopSubscribeEvent() fails, the state becomes EO_ERROR, m_error.code is
 * set, and its result is returned.
 *
 * NOTE(review): theCurrentPkAttrs / theCurrentDataAttrs are not reset here,
 * so they keep pointing at released rec attrs.
 */
int
NdbEventOperationImpl::stop()
{
  DBUG_ENTER("NdbEventOperationImpl::stop");
  int i;

  // Release the PK rec-attr lists of both images.
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstPkAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
    theFirstPkAttrs[i]= 0;
  }
  // Release the data rec-attr lists of both images.
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstDataAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
    theFirstDataAttrs[i]= 0;
  }

  if (m_state != EO_EXECUTING)
  {
    DBUG_RETURN(-1);
  }

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  m_ndb->theEventBuffer->add_drop_lock();
  int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
  // The active-op count is decremented whether or not the stop succeeded.
  m_ndb->theEventBuffer->remove_op();
  m_state= EO_DROPPED;
  mi_type= 0;
  if (r == 0) {
    if (m_stop_gci == 0)
    {
      // response from old kernel
      Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
      if (gci)
      {
        // calculate a "safe" gci in the future to remove event op.
        // (adds 3 to the high 32-bit word, i.e. gci_hi)
        gci += Uint64(3) << 32;
      }
      else
      {
        // set highest value to ensure that operation does not get dropped
        // too early. Note '-1' as ~Uint64(0) indicates active event
        gci = ~Uint64(0)-1;
      }
      m_stop_gci = gci;
    }
    m_ndb->theEventBuffer->add_drop_unlock();
    DBUG_RETURN(0);
  }
  //Error
  m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
  m_state= EO_ERROR;
  m_ndb->theEventBuffer->add_drop_unlock();
  DBUG_RETURN(r);
}
754 
756 {
757  return (bool)AlterTableReq::getNameFlag(m_change_mask);
758 }
759 
761 {
762  return (bool)AlterTableReq::getFrmFlag(m_change_mask);
763 }
764 
766 {
767  return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
768 }
769 
771 {
772  return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
773 }
774 
775 Uint64
776 NdbEventOperationImpl::getGCI()
777 {
778  Uint32 gci_hi = m_data_item->sdata->gci_hi;
779  Uint32 gci_lo = m_data_item->sdata->gci_lo;
780  return gci_lo | (Uint64(gci_hi) << 32);
781 }
782 
783 Uint32
785 {
786  return m_data_item->sdata->anyValue;
787 }
788 
789 Uint64
790 NdbEventOperationImpl::getLatestGCI()
791 {
792  return m_ndb->theEventBuffer->getLatestGCI();
793 }
794 
795 Uint64
797 {
798  /* Return 64 bit composite */
799  Uint32 transId1 = m_data_item->sdata->transId1;
800  Uint32 transId2 = m_data_item->sdata->transId2;
801  return Uint64(transId1) << 32 | transId2;
802 }
803 
804 bool
805 NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal,
806  const LinearSectionPtr ptr[3])
807 {
808  DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
809  const SubTableData * const sdata=
810  CAST_CONSTPTR(SubTableData, signal->getDataPtr());
811 
812  if(signal->isFirstFragment()){
813  m_fragmentId = signal->getFragmentId();
814  m_buffer.grow(4 * sdata->totalLen);
815  } else {
816  if(m_fragmentId != signal->getFragmentId()){
817  abort();
818  }
819  }
820 
821  const Uint32 i = SubTableData::DICT_TAB_INFO;
822  DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
823  4 * ptr[i].sz, m_fragmentId));
824  m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
825 
826  if(!signal->isLastFragment()){
827  DBUG_RETURN(FALSE);
828  }
829 
830  DBUG_RETURN(TRUE);
831 }
832 
833 
/*
 * Decode the current event (m_data_item) into this operation's rec attrs.
 *
 * Non-data events (operation >= _TE_FIRST_NON_DATA_EVENT) only bump the
 * non-data event statistic and return 1. For _TE_ALTER, the table
 * definition accumulated in m_buffer by execSUB_TABLE_DATA() is parsed into
 * a new NdbTableImpl. All registered NdbRecAttr and NdbBlob objects are
 * re-pointed at its columns, and the previous table object is deleted. A
 * parse failure is printed and still returns 1.
 *
 * Data events:
 *   section 0  one AttributeHeader per attribute (PKs first)
 *   section 1  post-image values, packed without headers
 *   section 2  pre-image as header/value pairs
 * PK values fill the post PK list (index 0). Except for inserts, they also
 * fill the pre PK list (index 1); for inserts that list is set undefined.
 * Registered data attrs absent from the event are set undefined.
 * Returns 1 if some data was received (always for non-update operations),
 * or 0 for an update that carried none of the registered non-PK columns.
 */
int
NdbEventOperationImpl::receive_event()
{
  Uint32 operation=
    SubTableData::getOperation(m_data_item->sdata->requestInfo);
  if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
  {
    DBUG_ENTER("NdbEventOperationImpl::receive_event");
    DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
    m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
    if (operation == NdbDictionary::Event::_TE_ALTER)
    {
      // Parse the new table definition and
      // create a table object
      NdbDictInterface::Tx tx_unused;
      NdbError error;
      int warn;
      NdbDictInterface dif(tx_unused, error, warn);
      NdbTableImpl *at;
      m_change_mask = m_data_item->sdata->changeMask;
      error.code = dif.parseTableInfo(&at,
                                      (Uint32*)m_buffer.get_data(),
                                      m_buffer.length() / 4,
                                      true);
      m_buffer.clear();
      if (unlikely(error.code))
      {
        DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
                            error.code));
        ndbout_c("Failed to parse DictTabInfo error %u", error.code);
        DBUG_RETURN(1);
      }
      at->buildColumnHash();

      NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
      m_eventImpl->m_tableImpl = at;

      DBUG_PRINT("info", ("switching table impl 0x%lx -> 0x%lx",
                          (long) tmp_table_impl, (long) at));

      // change the rec attrs to refer to the new table object
      // NOTE(review): at->getColumn(no) is not checked for NULL (e.g. a
      // column that no longer exists) -- verify this cannot happen.
      int i;
      for (i = 0; i < 2; i++)
      {
        NdbRecAttr *p = theFirstPkAttrs[i];
        while (p)
        {
          int no = p->getColumn()->getColumnNo();
          NdbColumnImpl *tAttrInfo = at->getColumn(no);
          DBUG_PRINT("info", ("rec_attr: 0x%lx "
                              "switching column impl 0x%lx -> 0x%lx",
                              (long) p, (long) p->m_column, (long) tAttrInfo));
          p->m_column = tAttrInfo;
          p = p->next();
        }
      }
      for (i = 0; i < 2; i++)
      {
        NdbRecAttr *p = theFirstDataAttrs[i];
        while (p)
        {
          int no = p->getColumn()->getColumnNo();
          NdbColumnImpl *tAttrInfo = at->getColumn(no);
          DBUG_PRINT("info", ("rec_attr: 0x%lx "
                              "switching column impl 0x%lx -> 0x%lx",
                              (long) p, (long) p->m_column, (long) tAttrInfo));
          p->m_column = tAttrInfo;
          p = p->next();
        }
      }
      // change the blobHandle's to refer to the new table object.
      NdbBlob *p = theBlobList;
      while (p)
      {
        int no = p->getColumn()->getColumnNo();
        NdbColumnImpl *tAttrInfo = at->getColumn(no);
        DBUG_PRINT("info", ("blob_handle: 0x%lx "
                            "switching column impl 0x%lx -> 0x%lx",
                            (long) p, (long) p->theColumn, (long) tAttrInfo));
        p->theColumn = tAttrInfo;
        p = p->next();
      }
      if (tmp_table_impl)
        delete tmp_table_impl;
    }
    DBUG_RETURN(1);
  }

  DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
  DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
  // now move the data into the RecAttrs
  m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);

  int is_insert= operation == NdbDictionary::Event::_TE_INSERT;

  // Headers (section 0) and post-image values (section 1) advance in step.
  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
  Uint32 *aDataPtr = m_data_item->ptr[1].p;

  DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
  DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);

  // copy data into the RecAttr's
  // we assume that the respective attribute lists are sorted

  // first the pk's
  // NOTE(review): assumes the pre PK list (index 1) has an entry for every
  // post PK entry; tAttr1 is dereferenced without a NULL check -- verify.
  {
    NdbRecAttr *tAttr= theFirstPkAttrs[0];
    NdbRecAttr *tAttr1= theFirstPkAttrs[1];
    while(tAttr)
    {
      assert(aAttrPtr < aAttrEndPtr);
      unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
      assert(tAttr->attrId() ==
             AttributeHeader(*aAttrPtr).getAttributeId());
      receive_data(tAttr, aDataPtr, tDataSz);
      if (!is_insert)
        receive_data(tAttr1, aDataPtr, tDataSz);
      else
        tAttr1->setUNDEFINED(); // do not leave unspecified
      tAttr1= tAttr1->next();
      // next
      aAttrPtr++;
      aDataPtr+= (tDataSz + 3) >> 2;  // values are word-aligned
      tAttr= tAttr->next();
    }
  }

  // Post-image data attrs: merge the sorted header list with the sorted
  // rec-attr list; rec attrs skipped over are set undefined.
  NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];

  Uint32 tRecAttrId;
  Uint32 tAttrId;
  Uint32 tDataSz;
  int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE);
  while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
    tRecAttrId = tWorkingRecAttr->attrId();
    tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
    tDataSz = AttributeHeader(*aAttrPtr).getByteSize();

    while (tAttrId > tRecAttrId) {
      DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
                               tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
      tWorkingRecAttr->setUNDEFINED();
      tWorkingRecAttr = tWorkingRecAttr->next();
      if (tWorkingRecAttr == NULL)
        break;
      tRecAttrId = tWorkingRecAttr->attrId();
    }
    if (tWorkingRecAttr == NULL)
      break;

    if (tAttrId == tRecAttrId) {
      hasSomeData=1;

      DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
                               tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));

      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
      tWorkingRecAttr = tWorkingRecAttr->next();
    }
    aAttrPtr++;
    aDataPtr += (tDataSz + 3) >> 2;
  }

  // Remaining post-image rec attrs were not in the event.
  while (tWorkingRecAttr != NULL) {
    tRecAttrId = tWorkingRecAttr->attrId();
    //printf("set undefined [%u] %u %u [%u]\n",
    //       tAttrId, tDataSz, *aDataPtr, tRecAttrId);
    tWorkingRecAttr->setUNDEFINED();
    tWorkingRecAttr = tWorkingRecAttr->next();
  }

  // Pre-image data attrs: section 2 interleaves header and value words.
  tWorkingRecAttr = theFirstDataAttrs[1];
  aDataPtr = m_data_item->ptr[2].p;
  Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
  while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
    tRecAttrId = tWorkingRecAttr->attrId();
    tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
    tDataSz = AttributeHeader(*aDataPtr).getByteSize();
    aDataPtr++;
    while (tAttrId > tRecAttrId) {
      tWorkingRecAttr->setUNDEFINED();
      tWorkingRecAttr = tWorkingRecAttr->next();
      if (tWorkingRecAttr == NULL)
        break;
      tRecAttrId = tWorkingRecAttr->attrId();
    }
    if (tWorkingRecAttr == NULL)
      break;
    if (tAttrId == tRecAttrId) {
      assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
      hasSomeData=1;

      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
      tWorkingRecAttr = tWorkingRecAttr->next();
    }
    aDataPtr += (tDataSz + 3) >> 2;
  }
  while (tWorkingRecAttr != NULL) {
    tWorkingRecAttr->setUNDEFINED();
    tWorkingRecAttr = tWorkingRecAttr->next();
  }

  if (hasSomeData)
  {
    DBUG_RETURN_EVENT(1);
  }

  DBUG_RETURN_EVENT(0);
}
1044 
1046 NdbEventOperationImpl::getEventType()
1047 {
1049  (1 << SubTableData::getOperation(m_data_item->sdata->requestInfo));
1050 }
1051 
1052 
1053 
1054 void
1055 NdbEventOperationImpl::print()
1056 {
1057  int i;
1058  ndbout << "EventId " << m_eventId << "\n";
1059 
1060  for (i = 0; i < 2; i++) {
1061  NdbRecAttr *p = theFirstPkAttrs[i];
1062  ndbout << " %u " << i;
1063  while (p) {
1064  ndbout << " : " << p->attrId() << " = " << *p;
1065  p = p->next();
1066  }
1067  ndbout << "\n";
1068  }
1069  for (i = 0; i < 2; i++) {
1070  NdbRecAttr *p = theFirstDataAttrs[i];
1071  ndbout << " %u " << i;
1072  while (p) {
1073  ndbout << " : " << p->attrId() << " = " << *p;
1074  p = p->next();
1075  }
1076  ndbout << "\n";
1077  }
1078 }
1079 
1080 void
1081 NdbEventOperationImpl::printAll()
1082 {
1083  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
1084  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
1085  Uint32 *aDataPtr = m_data_item->ptr[1].p;
1086 
1087  //tRecAttr->setup(tAttrInfo, aValue)) {
1088 
1089  Uint32 tAttrId;
1090  Uint32 tDataSz;
1091  for (; aAttrPtr < aAttrEndPtr; ) {
1092  tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
1093  tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
1094 
1095  aAttrPtr++;
1096  aDataPtr += tDataSz;
1097  }
1098 }
1099 
1100 /*
1101  * Class NdbEventBuffer
1102  * Each Ndb object has a Object.
1103  */
/*
 * Create the event buffer owned by Ndb object ndb.
 *
 * Counters and thresholds start at 0, and the bucket count starts at
 * TOTAL_BUCKETS_INIT. The add/drop mutex is borrowed from the cluster
 * connection; m_mutex is left 0 here and set later in Ndb::init().
 * Terminates the process if the condition variable cannot be created.
 */
NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
  m_total_buckets(TOTAL_BUCKETS_INIT),
  m_min_gci_index(0),
  m_max_gci_index(0),
  m_ndb(ndb),
  m_latestGCI(0), m_latest_complete_GCI(0),
  m_highest_sub_gcp_complete_GCI(0),
  m_latest_poll_GCI(0),
  m_total_alloc(0),
  m_free_thresh(0),
  m_min_free_thresh(0),
  m_max_free_thresh(0),
  m_gci_slip_thresh(0),
  m_dropped_ev_op(0),
  m_active_op_count(0),
  m_add_drop_mutex(0)
{
#ifdef VM_TRACE
  m_latest_command= "NdbEventBuffer::NdbEventBuffer";
  m_flush_gci = 0;
#endif

  if ((p_cond = NdbCondition_Create()) == NULL) {
    ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
    exit(-1);
  }
  m_mutex = 0; // Set in Ndb::init()

  // ToDo set event buffer size
  // pre allocate event data array
  m_sz= 0;
#ifdef VM_TRACE
  m_free_data_count= 0;
#endif
  m_free_data= 0;
  m_free_data_sz= 0;

  // get reference to mutex managed by current connection
  m_add_drop_mutex=
    m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;

  // initialize lists
  // NOTE(review): clears sizeof(Gci_container) bytes of an object declared
  // as Gci_container_pod; verify that the two sizes agree.
  bzero(&g_empty_gci_container, sizeof(Gci_container));
  init_gci_containers();

  m_alive_node_bit_mask.clear();
}
1151 
1152 NdbEventBuffer::~NdbEventBuffer()
1153 {
1154  // todo lock? what if receive thread writes here?
1155  NdbEventOperationImpl* op= m_dropped_ev_op;
1156  while ((op = m_dropped_ev_op))
1157  {
1158  m_dropped_ev_op = m_dropped_ev_op->m_next;
1159  delete op->m_facade;
1160  }
1161 
1162  unsigned j;
1163  Uint32 sz= m_active_gci.size();
1164  Gci_container* array = (Gci_container*)m_active_gci.getBase();
1165  for(j = 0; j < sz; j++)
1166  {
1167  array[j].~Gci_container();
1168  }
1169 
1170  for (j= 0; j < m_allocated_data.size(); j++)
1171  {
1172  unsigned sz= m_allocated_data[j]->sz;
1173  EventBufData *data= m_allocated_data[j]->data;
1174  EventBufData *end_data= data+sz;
1175  for (; data < end_data; data++)
1176  {
1177  if (data->sdata)
1178  NdbMem_Free(data->sdata);
1179  }
1180  NdbMem_Free((char*)m_allocated_data[j]);
1181  }
1182 
1183  NdbCondition_Destroy(p_cond);
1184 }
1185 
1186 void
1187 NdbEventBuffer::add_op()
1188 {
1189  if(m_active_op_count == 0)
1190  {
1191  init_gci_containers();
1192  }
1193  m_active_op_count++;
1194 }
1195 
1196 void
1197 NdbEventBuffer::remove_op()
1198 {
1199  m_active_op_count--;
1200 }
1201 
1202 void
1203 NdbEventBuffer::init_gci_containers()
1204 {
1205  m_startup_hack = true;
1206  bzero(&m_complete_data, sizeof(m_complete_data));
1207  m_latest_complete_GCI = m_latestGCI = m_latest_poll_GCI = 0;
1208  m_active_gci.clear();
1209  m_active_gci.fill(3, g_empty_gci_container);
1210  m_min_gci_index = m_max_gci_index = 1;
1211  Uint64 gci = 0;
1212  m_known_gci.clear();
1213  m_known_gci.fill(7, gci);
1214 }
1215 
1216 int NdbEventBuffer::expand(unsigned sz)
1217 {
1218  unsigned alloc_size=
1219  sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
1220  EventBufData_chunk *chunk_data=
1221  (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1222 
1223  chunk_data->sz= sz;
1224  m_allocated_data.push_back(chunk_data);
1225 
1226  EventBufData *data= chunk_data->data;
1227  EventBufData *end_data= data+sz;
1228  EventBufData *last_data= m_free_data;
1229 
1230  bzero((void*)data, sz*sizeof(EventBufData));
1231  for (; data < end_data; data++)
1232  {
1233  data->m_next= last_data;
1234  last_data= data;
1235  }
1236  m_free_data= last_data;
1237 
1238  m_sz+= sz;
1239 #ifdef VM_TRACE
1240  m_free_data_count+= sz;
1241 #endif
1242  return 0;
1243 }
1244 
1245 int
1246 NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
1247 {
1248  int ret= 1;
1249 #ifdef VM_TRACE
1250  const char *m_latest_command_save= m_latest_command;
1251  m_latest_command= "NdbEventBuffer::pollEvents";
1252 #endif
1253 
1254  NdbMutex_Lock(m_mutex);
1255  NdbEventOperationImpl *ev_op= move_data();
1256  if (unlikely(ev_op == 0 && aMillisecondNumber))
1257  {
1258  NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
1259  ev_op= move_data();
1260  if (unlikely(ev_op == 0))
1261  ret= 0;
1262  }
1263  m_latest_poll_GCI= m_latestGCI;
1264 #ifdef VM_TRACE
1265  if (ev_op)
1266  {
1267  // m_mutex is locked
1268  // update event ops data counters
1269  ev_op->m_data_count-= ev_op->m_data_done_count;
1270  ev_op->m_data_done_count= 0;
1271  }
1272  m_latest_command= m_latest_command_save;
1273 #endif
1274  if (unlikely(ev_op == 0))
1275  {
1276  /*
1277  gci's consumed up until m_latest_poll_GCI, so we can free all
1278  dropped event operations stopped up until that gci
1279  */
1280  deleteUsedEventOperations(m_latest_poll_GCI);
1281  }
1282  NdbMutex_Unlock(m_mutex); // we have moved the data
1283 
1284  if (latestGCI)
1285  *latestGCI= m_latest_poll_GCI;
1286 
1287  return ret;
1288 }
1289 
1290 int
1292 {
1296  Uint64 * array = m_known_gci.getBase();
1297  Uint32 mask = m_known_gci.size() - 1;
1298  Uint32 minpos = m_min_gci_index;
1299  Uint32 maxpos = m_max_gci_index;
1300 
1301  g_eventLogger->info("Flushing incomplete GCI:s < %u/%u",
1302  Uint32(gci >> 32), Uint32(gci));
1303  while (minpos != maxpos && array[minpos] < gci)
1304  {
1305  Gci_container* tmp = find_bucket(array[minpos]);
1306  assert(tmp);
1307  assert(maxpos == m_max_gci_index);
1308 
1309  if(!tmp->m_data.is_empty())
1310  {
1311  free_list(tmp->m_data);
1312  }
1313  tmp->~Gci_container();
1314  bzero(tmp, sizeof(Gci_container));
1315  minpos = (minpos + 1) & mask;
1316  }
1317 
1318  m_min_gci_index = minpos;
1319 
1320 #ifdef VM_TRACE
1321  m_flush_gci = gci;
1322 #endif
1323 
1324  return 0;
1325 }
1326 
1328 NdbEventBuffer::nextEvent()
1329 {
1330  DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent");
1331 #ifdef VM_TRACE
1332  const char *m_latest_command_save= m_latest_command;
1333 #endif
1334 
1335  if (m_used_data.m_count > 1024)
1336  {
1337 #ifdef VM_TRACE
1338  m_latest_command= "NdbEventBuffer::nextEvent (lock)";
1339 #endif
1340  NdbMutex_Lock(m_mutex);
1341  // return m_used_data to m_free_data
1342  free_list(m_used_data);
1343 
1344  NdbMutex_Unlock(m_mutex);
1345  }
1346 #ifdef VM_TRACE
1347  m_latest_command= "NdbEventBuffer::nextEvent";
1348 #endif
1349 
1350  EventBufData *data;
1351  Uint64 gci= 0;
1352  while ((data= m_available_data.m_head))
1353  {
1354  NdbEventOperationImpl *op= data->m_event_op;
1355 
1356  /*
1357  * The data was not associated with an event operation,
1358  * possibly a dummy event list marking missing data
1359  */
1360  if (!op && !isConsistent(gci))
1361  {
1362  DBUG_PRINT_EVENT("info", ("detected inconsistent gci %u", gci));
1363  DBUG_RETURN_EVENT(0);
1364  }
1365 
1366  DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
1367 
1368  /*
1369  * If merge is on, blob part sub-events must not be seen on this level.
1370  * If merge is not on, there are no blob part sub-events.
1371  */
1372  assert(op->theMainOp == NULL);
1373 
1374  // set NdbEventOperation data
1375  op->m_data_item= data;
1376 
1377  // remove item from m_available_data and return size
1378  Uint32 full_count, full_sz;
1379  m_available_data.remove_first(full_count, full_sz);
1380 
1381  // add it to used list
1382  m_used_data.append_used_data(data, full_count, full_sz);
1383 
1384  m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
1385 
1386 #ifdef VM_TRACE
1387  op->m_data_done_count++;
1388 #endif
1389 
1390  if (op->m_state == NdbEventOperation::EO_EXECUTING)
1391  {
1392  int r= op->receive_event();
1393  if (r > 0)
1394  {
1395 #ifdef VM_TRACE
1396  m_latest_command= m_latest_command_save;
1397 #endif
1398  NdbBlob* tBlob = op->theBlobList;
1399  while (tBlob != NULL)
1400  {
1401  (void)tBlob->atNextEvent();
1402  tBlob = tBlob->theNext;
1403  }
1404  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1405  while (gci_ops && op->getGCI() > gci_ops->m_gci)
1406  {
1407  gci_ops = m_available_data.delete_next_gci_ops();
1408  }
1409  if (!gci_ops->m_consistent)
1410  DBUG_RETURN_EVENT(0);
1411  assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
1412  // to return TE_NUL it should be made into data event
1413  if (SubTableData::getOperation(data->sdata->requestInfo) ==
1414  NdbDictionary::Event::_TE_NUL)
1415  {
1416  DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
1417  continue;
1418  }
1419  DBUG_RETURN_EVENT(op->m_facade);
1420  }
1421  // the next event belonged to an event op that is no
1422  // longer valid, skip to next
1423  continue;
1424  }
1425 #ifdef VM_TRACE
1426  m_latest_command= m_latest_command_save;
1427 #endif
1428  }
1429  m_error.code= 0;
1430 #ifdef VM_TRACE
1431  m_latest_command= m_latest_command_save;
1432 #endif
1433 
1434  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1435  while (gci_ops)
1436  {
1437  gci_ops = m_available_data.delete_next_gci_ops();
1438  }
1439  /*
1440  gci's consumed up until m_latest_poll_GCI, so we can free all
1441  dropped event operations stopped up until that gci
1442  */
1443  if (m_dropped_ev_op)
1444  {
1445  NdbMutex_Lock(m_mutex);
1446  deleteUsedEventOperations(m_latest_poll_GCI);
1447  NdbMutex_Unlock(m_mutex);
1448  }
1449  DBUG_RETURN_EVENT(0);
1450 }
1451 
1452 bool
1453 NdbEventBuffer::isConsistent(Uint64& gci)
1454 {
1455  DBUG_ENTER("NdbEventBuffer::isConsistent");
1456  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1457  while (gci_ops)
1458  {
1459  if (!gci_ops->m_consistent)
1460  {
1461  gci = gci_ops->m_gci;
1462  DBUG_RETURN(false);
1463  }
1464  gci_ops = gci_ops->m_next;
1465  }
1466 
1467  DBUG_RETURN(true);
1468 }
1469 
1470 bool
1471 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1472 {
1473  DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
1474  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1475  while (gci_ops)
1476  {
1477  if (gci_ops->m_gci == gci && !gci_ops->m_consistent)
1478  DBUG_RETURN(false);
1479  gci_ops = gci_ops->m_next;
1480  }
1481 
1482  DBUG_RETURN(true);
1483 }
1484 
1485 
1487 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
1488 {
1489  DBUG_ENTER("NdbEventBuffer::getGCIEventOperations");
1490  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
1491  if (*iter < gci_ops->m_gci_op_count)
1492  {
1493  EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
1494  if (event_types != NULL)
1495  *event_types = g.event_types;
1496  DBUG_PRINT("info", ("gci: %u g.op: 0x%lx g.event_types: 0x%lx",
1497  (unsigned)gci_ops->m_gci, (long) g.op,
1498  (long) g.event_types));
1499  DBUG_RETURN(g.op);
1500  }
1501  DBUG_RETURN(NULL);
1502 }
1503 
1504 void
1505 NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
1506 {
1507  NdbEventOperationImpl *op= m_dropped_ev_op;
1508  while (op && op->m_stop_gci)
1509  {
1510  if (last_consumed_gci > op->m_stop_gci)
1511  {
1512  while (op)
1513  {
1514  NdbEventOperationImpl *next_op= op->m_next;
1515  op->m_stop_gci= 0;
1516  op->m_ref_count--;
1517  if (op->m_ref_count == 0)
1518  {
1519  if (op->m_next)
1520  op->m_next->m_prev = op->m_prev;
1521  if (op->m_prev)
1522  op->m_prev->m_next = op->m_next;
1523  else
1524  m_dropped_ev_op = op->m_next;
1525  delete op->m_facade;
1526  }
1527  op = next_op;
1528  }
1529  break;
1530  }
1531  op = op->m_next;
1532  }
1533 }
1534 
#ifdef VM_TRACE
/*
  Debug printer for one GCI bucket. It prints:
    - the GCI as hi/lo;
    - the state flags;
    - the head/tail of the bucket's data list;
    - the list item count;
    - the remaining SUB_GCP_COMPLETE_REP count.
*/
static
NdbOut&
operator<<(NdbOut& out, const Gci_container& gci)
{
  out << "[ GCI: " << (gci.m_gci >> 32) << "/" << (gci.m_gci & 0xFFFFFFFF)
      << " state: " << hex << gci.m_state
      << " head: " << hex << gci.m_data.m_head
      << " tail: " << hex << gci.m_data.m_tail
      << " cnt: " << dec << gci.m_data.m_count
      << " gcp: " << dec << gci.m_gcp_complete_rep_count
      << "]";
  return out;
}

/*
  Same printer for the raw storage type kept in m_active_gci.
*/
static
NdbOut&
operator<<(NdbOut& out, const Gci_container_pod& gci)
{
  // The object is only read, so keep const while reinterpreting it.
  const Gci_container* ptr = reinterpret_cast<const Gci_container*>(&gci);
  out << *ptr;
  return out;
}
#endif
1561 
1562 void
1563 NdbEventBuffer::resize_known_gci()
1564 {
1565  Uint32 minpos = m_min_gci_index;
1566  Uint32 maxpos = m_max_gci_index;
1567  Uint32 mask = m_known_gci.size() - 1;
1568 
1569  Uint64 fill = 0;
1570  Uint32 newsize = 2 * (mask + 1);
1571  m_known_gci.fill(newsize - 1, fill);
1572  Uint64 * array = m_known_gci.getBase();
1573 
1574  if (0)
1575  {
1576  printf("before (%u): ", minpos);
1577  for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
1578  printf("%u/%u ",
1579  Uint32(array[i] >> 32),
1580  Uint32(array[i]));
1581  printf("\n");
1582  }
1583 
1584  Uint32 idx = mask + 1; // Store eveything in "new" part of buffer
1585  if (0) printf("swapping ");
1586  while (minpos != maxpos)
1587  {
1588  if (0) printf("%u-%u ", minpos, idx);
1589  Uint64 tmp = array[idx];
1590  array[idx] = array[minpos];
1591  array[minpos] = tmp;
1592 
1593  idx++;
1594  minpos = (minpos + 1) & mask; // NOTE old mask
1595  }
1596  if (0) printf("\n");
1597 
1598  minpos = m_min_gci_index = mask + 1;
1599  maxpos = m_max_gci_index = idx;
1600  assert(minpos < maxpos);
1601 
1602  if (0)
1603  {
1604  ndbout_c("resize_known_gci from %u to %u", (mask + 1), newsize);
1605  printf("after: ");
1606  for (Uint32 i = minpos; i < maxpos; i++)
1607  {
1608  printf("%u/%u ",
1609  Uint32(array[i] >> 32),
1610  Uint32(array[i]));
1611  }
1612  printf("\n");
1613  }
1614 
1615 #ifdef VM_TRACE
1616  Uint64 gci = array[minpos];
1617  for (Uint32 i = minpos + 1; i<maxpos; i++)
1618  {
1619  assert(array[i] > gci);
1620  gci = array[i];
1621  }
1622 #endif
1623 }
1624 
#ifdef VM_TRACE
void
NdbEventBuffer::verify_known_gci(bool allowempty)
{
  /*
    Debug-only consistency check of the known-GCI ring against the bucket
    directory (m_active_gci). On any violation it prints the failing
    source line, the ring contents and every container, then abort()s.

      - Empty ring: allowed only if allowempty, and then no bucket may be
        in use (m_gci == 0 everywhere).
      - Non-empty ring: it must start above m_latestGCI, be strictly
        increasing, and have a bucket for every entry.
      - Every bucket in use must have its GCI in the ring.
  */
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;
  Uint32 mask = m_known_gci.size() - 1;

  Uint32 line;
  // Record the failing line and jump to the diagnostic dump below.
#define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
  if (m_min_gci_index == m_max_gci_index)
  {
    MMASSERT(allowempty);
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
      MMASSERT(((Gci_container*)(m_active_gci.getBase()+i))->m_gci == 0);
    return;
  }

  {
    Uint64 last = m_known_gci[minpos];
    MMASSERT(last > m_latestGCI);
    MMASSERT(find_bucket(last) != 0);
    MMASSERT(maxpos == m_max_gci_index);

    minpos = (minpos + 1) & mask;
    while (minpos != maxpos)
    {
      MMASSERT(m_known_gci[minpos] > last);
      last = m_known_gci[minpos];
      MMASSERT(find_bucket(last) != 0);
      MMASSERT(maxpos == m_max_gci_index);
      minpos = (minpos + 1) & mask;
    }
  }

  {
    Gci_container* buckets = (Gci_container*)(m_active_gci.getBase());
    for (Uint32 i = 0; i<m_active_gci.size(); i++)
    {
      if (buckets[i].m_gci)
      {
        bool found = false;
        for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
             j = (j + 1) & mask)
        {
          if (m_known_gci[j] == buckets[i].m_gci)
          {
            found = true;
            break;
          }
        }
        if (!found)
          ndbout_c("%u/%u not found",
                   Uint32(buckets[i].m_gci >> 32),
                   Uint32(buckets[i].m_gci));
        MMASSERT(found == true);
      }
    }
  }

  return;
fail:
  ndbout_c("assertion at %d", line);
  printf("known gci: ");
  for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
  {
    printf("%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
  }

  printf("\nContainers");
  for (Uint32 i = 0; i<m_active_gci.size(); i++)
    ndbout << m_active_gci[i] << endl;
  abort();
}
// MMASSERT is private to verify_known_gci(); keep it out of the rest of
// the file.
#undef MMASSERT
#endif
1700 
1702 NdbEventBuffer::find_bucket_chained(Uint64 gci)
1703 {
  /*
    Slow-path lookup of the bucket (Gci_container) for 'gci' in the bucket
    directory m_active_gci. The bucket is created if it does not exist.

    Returns 0 when gci <= m_latestGCI (already completed) or when
    m_total_buckets is 0. Otherwise the candidate slots are
    (gci & ACTIVE_GCI_MASK) + k * ACTIVE_GCI_DIRECTORY_SIZE, k = 0, 1, ...:
      - a slot already holding gci is returned;
      - at the first empty slot, the rest of the chain is searched for gci.
        If found, that bucket is memcpy'd into the empty slot and returned.
        Its old slot is zeroed, and erased when it was the last element;
      - otherwise a new bucket is created, either in that empty slot or,
        if the chain had no empty slot, in a slot added by growing
        m_active_gci.
    A new bucket gets m_gci = gci and m_gcp_complete_rep_count =
    m_total_buckets. gci is also inserted into the ascending known-GCI ring
    [m_min_gci_index, m_max_gci_index), which is resized first if full.
  */
1704  if (0)
1705  printf("find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
  // At or below the latest completed GCI: there is no bucket for it.
1706  if (unlikely(gci <= m_latestGCI))
1707  {
1711  if (0)
1712  ndbout_c("already complete (%u/%u)",
1713  Uint32(m_latestGCI >> 32),
1714  Uint32(m_latestGCI));
1715  return 0;
1716  }
1717 
1718  if (unlikely(m_total_buckets == 0))
1719  {
1720  return 0;
1721  }
1722 
  // Walk the chain of candidate slots for this gci.
1723  Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
1724  Uint32 size = m_active_gci.size();
1725  Gci_container *buckets = (Gci_container*)(m_active_gci.getBase());
1726  while (pos < size)
1727  {
1728  Uint64 cmp = (buckets + pos)->m_gci;
1729  if (cmp == gci)
1730  {
1731  if (0)
1732  ndbout_c("found pos: %u", pos);
1733  return buckets + pos;
1734  }
1735 
  // Empty slot: gci may still sit further down the chain. If so, move
  // that bucket into this slot and return it.
1736  if (cmp == 0)
1737  {
1738  if (0)
1739  ndbout_c("empty(%u) ", pos);
1740  Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
1741  while (search < size)
1742  {
1743  if ((buckets + search)->m_gci == gci)
1744  {
1745  memcpy(buckets + pos, buckets + search, sizeof(Gci_container));
1746  bzero(buckets + search, sizeof(Gci_container));
1747  if (0)
1748  printf("moved from %u to %u", search, pos);
  // The moved-from slot was the directory's last element: drop it.
1749  if (search == size - 1)
1750  {
1751  m_active_gci.erase(search);
1752  if (0)
1753  ndbout_c(" shrink");
1754  }
1755  else
1756  {
1757  if (0)
1758  printf("\n");
1759  }
1760  return buckets + pos;
1761  }
1762  search += ACTIVE_GCI_DIRECTORY_SIZE;
1763  }
  // gci is not in the chain: create the new bucket in this empty slot.
1764  goto newbucket;
1765  }
1766  pos += ACTIVE_GCI_DIRECTORY_SIZE;
1767  }
1768 
  // Chain exhausted without an empty slot: grow the directory so that
  // index pos exists, then re-read its base pointer.
1772  if (0)
1773  ndbout_c("new (with expand) ");
1774  m_active_gci.fill(pos, g_empty_gci_container);
1775  buckets = (Gci_container*)(m_active_gci.getBase());
1776 newbucket:
  // Initialise the new bucket at slot pos.
1777  Gci_container* bucket = buckets + pos;
1778  bucket->m_gci = gci;
1779  bucket->m_gcp_complete_rep_count = m_total_buckets;
1780 
  // Register gci in the known-GCI ring, which is kept in ascending order.
1781  Uint32 mask = m_known_gci.size() - 1;
1782  Uint64 * array = m_known_gci.getBase();
1783 
1784  Uint32 minpos = m_min_gci_index;
1785  Uint32 maxpos = m_max_gci_index;
  // Full when advancing max would reach min: double the ring first and
  // reload every cached ring value.
1786  bool full = ((maxpos + 1) & mask) == minpos;
1787  if (unlikely(full))
1788  {
1789  resize_known_gci();
1790  minpos = m_min_gci_index;
1791  maxpos = m_max_gci_index;
1792  mask = m_known_gci.size() - 1;
1793  array = m_known_gci.getBase();
1794  }
1795 
  // The ring grows by one entry whichever path below stores gci.
1796  Uint32 maxindex = (maxpos - 1) & mask;
1797  Uint32 newmaxpos = (maxpos + 1) & mask;
1798  m_max_gci_index = newmaxpos;
  // Fast path: ring empty or gci above the current maximum -> append.
1799  if (likely(minpos == maxpos || gci > array[maxindex]))
1800  {
1801  array[maxpos] = gci;
1802 #ifdef VM_TRACE
1803  verify_known_gci(false);
1804 #endif
1805  return bucket;
1806  }
1807 
  // Out-of-order gci: find the first entry greater than it...
1808  for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
1809  {
1810  if (array[pos] > gci)
1811  break;
1812  }
1813 
1814  if (0)
1815  ndbout_c("insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
1816  Uint32(gci >> 32),
1817  Uint32(gci),
1818  Uint32(array[maxindex] >> 32),
1819  Uint32(array[maxindex]),
1820  pos,
1821  m_min_gci_index, m_max_gci_index);
1822 
  // ...then insert gci there, shifting that entry and all following ones
  // up by one slot (gci is reused as the carry variable).
1823  assert(pos != maxpos);
1824  Uint64 oldgci;
1825  do {
1826  oldgci = array[pos];
1827  array[pos] = gci;
1828  gci = oldgci;
1829  pos = (pos + 1) & mask;
1830  } while (pos != maxpos);
1831  array[pos] = gci;
1832 
1833 #ifdef VM_TRACE
1834  verify_known_gci(false);
1835 #endif
1836  return bucket;
1837 }
1838 
1839 static
1840 void
1841 crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
1842  const SubGcpCompleteRep * const rep,
1843  Uint32 buckets)
1844 {
1845  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1846 
1847  ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
1848  ndbout_c("gci_hi: %u", rep->gci_hi);
1849  ndbout_c("gci_lo: %u", rep->gci_lo);
1850  ndbout_c("sender: %x", rep->senderRef);
1851  ndbout_c("count: %d", rep->gcp_complete_rep_count);
1852  ndbout_c("bucket count: %u", old_cnt);
1853  ndbout_c("total buckets: %u", buckets);
1854  abort();
1855 }
1856 
1857 void
1858 NdbEventBuffer::complete_bucket(Gci_container* bucket)
1859 {
1860  Uint64 gci = bucket->m_gci;
1861  Gci_container* buckets = (Gci_container*)m_active_gci.getBase();
1862 
1863  if (0)
1864  ndbout_c("complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
1865  Uint32(bucket - buckets));
1866 
1867 #ifdef VM_TRACE
1868  verify_known_gci(false);
1869 #endif
1870 
1874  if(!bucket->m_data.is_empty())
1875  {
1876 #ifdef VM_TRACE
1877  assert(bucket->m_data.m_count);
1878 #endif
1879  m_complete_data.m_data.append_list(&bucket->m_data, gci);
1880  if (bucket->m_state & Gci_container::GC_INCONSISTENT)
1881  {
1882  /*
1883  * Bucket marked as possibly missing data, probably due to
1884  * kernel running out of event_buffer during node failure.
1885  * Mark newly appended event list as inconsistent.
1886  */
1887  assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
1888  m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
1889  }
1890  }
1891  else // if (bucket->m_data.is_empty())
1892  {
1893  if (bucket->m_state & Gci_container::GC_INCONSISTENT)
1894  {
1895  /*
1896  * Bucket marked as possibly missing data, probably due to
1897  * kernel running out of event_buffer during node failure
1898  * Bucket contained no data so we must add a dummy event list
1899  * as inconsistency marker.
1900  */
1901  EventBufData *dummy_data= alloc_data();
1902  EventBufData_list *dummy_event_list = new EventBufData_list;
1903  dummy_event_list->append_used_data(dummy_data);
1904  dummy_event_list->m_is_not_multi_list = true;
1905  m_complete_data.m_data.append_list(dummy_event_list, gci);
1906  assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
1907  m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
1908  }
1909  }
1910 
1911  Uint32 minpos = m_min_gci_index;
1912  Uint32 mask = m_known_gci.size() - 1;
1913  assert((mask & (mask + 1)) == 0);
1914 
1915  bzero(bucket, sizeof(Gci_container));
1916 
1917  m_min_gci_index = (minpos + 1) & mask;
1918 
1919 #ifdef VM_TRACE
1920  verify_known_gci(true);
1921 #endif
1922 }
1923 
1924 void
1926  Uint32 len, int complete_cluster_failure)
1927 {
1928  Uint32 gci_hi = rep->gci_hi;
1929  Uint32 gci_lo = rep->gci_lo;
1930 
1931  if (unlikely(len < SubGcpCompleteRep::SignalLength))
1932  {
1933  gci_lo = 0;
1934  }
1935 
1936  const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
1937  if (gci > m_highest_sub_gcp_complete_GCI)
1938  m_highest_sub_gcp_complete_GCI = gci;
1939 
1940  if (!complete_cluster_failure)
1941  {
1942  m_alive_node_bit_mask.set(refToNode(rep->senderRef));
1943 
1944  if (unlikely(m_active_op_count == 0))
1945  {
1946  return;
1947  }
1948  }
1949 
1950  DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
1951 
1952  const Uint32 cnt= rep->gcp_complete_rep_count;
1953 
1954  Gci_container *bucket = find_bucket(gci);
1955 
1956  if (0)
1957  ndbout_c("execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
1958  Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
1959  rep->flags);
1960 
1961  if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
1962  SubGcpCompleteRep::SUB_CNT)))
1963  {
1964  handle_change_nodegroup(rep);
1965  }
1966 
1967  if (unlikely(bucket == 0))
1968  {
1973 #ifdef VM_TRACE
1974  Uint64 minGCI = m_known_gci[m_min_gci_index];
1975  ndbout_c("bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
1976  Uint32(gci >> 32), Uint32(gci),
1977  Uint32(minGCI >> 32), Uint32(minGCI),
1978  Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
1979  ndbout << " complete: " << m_complete_data << endl;
1980  for(Uint32 i = 0; i<m_active_gci.size(); i++)
1981  {
1982  if (((Gci_container*)(&m_active_gci[i]))->m_gci)
1983  ndbout << i << " - " << m_active_gci[i] << endl;
1984  }
1985 #endif
1986  DBUG_VOID_RETURN_EVENT;
1987  }
1988 
1989  if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
1990  {
1991  bucket->m_state = Gci_container::GC_INCONSISTENT;
1992  }
1993 
1994  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1995  if(unlikely(old_cnt == ~(Uint32)0))
1996  {
1997  old_cnt = m_total_buckets;
1998  }
1999 
2000  //assert(old_cnt >= cnt);
2001  if (unlikely(! (old_cnt >= cnt)))
2002  {
2003  crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_total_buckets);
2004  }
2005  bucket->m_gcp_complete_rep_count = old_cnt - cnt;
2006 
2007  if(old_cnt == cnt)
2008  {
2009  Uint64 minGCI = m_known_gci[m_min_gci_index];
2010  if(likely(minGCI == 0 || gci == minGCI))
2011  {
2012  do_complete:
2013  m_startup_hack = false;
2014  complete_bucket(bucket);
2015  m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
2016  reportStatus();
2017 
2018  if(unlikely(m_latest_complete_GCI > gci))
2019  {
2020  complete_outof_order_gcis();
2021  }
2022 
2023  // signal that somethings happened
2024 
2025  NdbCondition_Signal(p_cond);
2026  }
2027  else
2028  {
2029  if (unlikely(m_startup_hack))
2030  {
2031  flushIncompleteEvents(gci);
2032  bucket = find_bucket(gci);
2033  assert(bucket);
2034  assert(bucket->m_gci == gci);
2035  goto do_complete;
2036  }
2038  g_eventLogger->info("out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
2039  (int)(bucket-(Gci_container*)m_active_gci.getBase()),
2040  Uint32(gci >> 32), Uint32(gci),
2041  Uint32(minGCI >> 32), Uint32(minGCI),
2042  Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
2043  bucket->m_state = Gci_container::GC_COMPLETE;
2044  bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
2045  m_latest_complete_GCI = gci;
2046  }
2047  }
2048 
2049  DBUG_VOID_RETURN_EVENT;
2050 }
2051 
2052 void
2053 NdbEventBuffer::complete_outof_order_gcis()
2054 {
2055 #ifdef VM_TRACE
2056  verify_known_gci(false);
2057 #endif
2058 
2059  Uint64 * array = m_known_gci.getBase();
2060  Uint32 mask = m_known_gci.size() - 1;
2061  Uint32 minpos = m_min_gci_index;
2062  Uint32 maxpos = m_max_gci_index;
2063  Uint64 stop_gci = m_latest_complete_GCI;
2064 
2065  Uint64 start_gci = array[minpos];
2066  g_eventLogger->info("complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
2067  Uint32(start_gci >> 32), Uint32(start_gci), minpos,
2068  Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);
2069 
2070  assert(start_gci <= stop_gci);
2071  do
2072  {
2073  start_gci = array[minpos];
2074  Gci_container* bucket = find_bucket(start_gci);
2075  assert(bucket);
2076  assert(maxpos == m_max_gci_index);
2077  if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
2078  {
2079 #ifdef VM_TRACE
2080  verify_known_gci(false);
2081 #endif
2082  return;
2083  }
2084 
2085 #ifdef VM_TRACE
2086  ndbout_c("complete_outof_order_gcis - completing %u/%u rows: %u",
2087  Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
2088 #else
2089  ndbout_c("complete_outof_order_gcis - completing %u/%u",
2090  Uint32(start_gci >> 32), Uint32(start_gci));
2091 #endif
2092 
2093  complete_bucket(bucket);
2094  m_latestGCI = m_complete_data.m_gci = start_gci;
2095 
2096 #ifdef VM_TRACE
2097  verify_known_gci(true);
2098 #endif
2099  minpos = (minpos + 1) & mask;
2100  } while (start_gci != stop_gci);
2101 }
2102 
2103 void
2104 NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
2105  SubTableData &data,
2106  LinearSectionPtr *ptr,
2107  Uint32 &oid_ref)
2108 {
2109  DBUG_PRINT("info", ("gci{hi/lo}: %u/%u", data.gci_hi, data.gci_lo));
2110  do
2111  {
2112  if (impl->m_stop_gci == ~Uint64(0))
2113  {
2114  oid_ref = impl->m_oid;
2115  insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2116  }
2117  NdbEventOperationImpl* blob_op = impl->theBlobOpList;
2118  while (blob_op != NULL)
2119  {
2120  if (blob_op->m_stop_gci == ~Uint64(0))
2121  {
2122  oid_ref = blob_op->m_oid;
2123  insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2124  }
2125  blob_op = blob_op->m_next;
2126  }
2127  } while((impl = impl->m_next));
2128 }
2129 
2130 bool
2131 NdbEventBuffer::find_max_known_gci(Uint64 * res) const
2132 {
2133  const Uint64 * array = m_known_gci.getBase();
2134  Uint32 mask = m_known_gci.size() - 1;
2135  Uint32 minpos = m_min_gci_index;
2136  Uint32 maxpos = m_max_gci_index;
2137 
2138  if (minpos == maxpos)
2139  return false;
2140 
2141  if (res)
2142  {
2143  * res = array[(maxpos - 1) & mask];
2144  }
2145 
2146  return true;
2147 }
2148 
2149 void
2150 NdbEventBuffer::handle_change_nodegroup(const SubGcpCompleteRep* rep)
2151 {
  /*
    Apply a node-group change announced in a SUB_GCP_COMPLETE_REP.

    The number of buckets added or removed (cnt) comes from the upper 16
    bits of rep->flags. It applies at the GCI carried by rep. ADD_CNT is
    checked first, then SUB_CNT; with neither flag nothing happens.

    The known-GCI ring is scanned for that GCI:
      - not in the ring: a "NOT FOUND" line is printed and nothing changes;
      - its bucket is already flagged GC_CHANGE_CNT: treated as already
        handled, nothing changes;
      - otherwise the bucket is flagged GC_CHANGE_CNT. For ADD_CNT its
        remaining report count also grows by cnt. m_total_buckets and the
        report count of every later GCI in the ring are then raised
        (ADD_CNT) or lowered (SUB_CNT) by cnt.
    Progress is always logged with ndbout_c().

    NOTE(review): find_bucket() results are dereferenced without a NULL
    check, so this relies on every ring entry having a bucket.
    NOTE(review): the "already marked complete" message is printed when
    GC_CHANGE_CNT (not GC_COMPLETE) is already set.
  */
2152  Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
2153  Uint32 cnt = (rep->flags >> 16);
2154  Uint64 * array = m_known_gci.getBase();
2155  Uint32 mask = m_known_gci.size() - 1;
2156  Uint32 minpos = m_min_gci_index;
2157  Uint32 maxpos = m_max_gci_index;
2158 
2159  if (rep->flags & SubGcpCompleteRep::ADD_CNT)
2160  {
2161  ndbout_c("handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
2162  cnt, Uint32(gci >> 32), Uint32(gci));
2163 
  // found: 0 = gci not in ring, 1 = already flagged, 2 = flagged now.
2164  Uint32 found = 0;
2165  Uint32 pos = minpos;
2166  for (; pos != maxpos; pos = (pos + 1) & mask)
2167  {
2168  if (array[pos] == gci)
2169  {
2170  Gci_container* tmp = find_bucket(array[pos]);
2171  if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2172  {
2173  found = 1;
2174  ndbout_c(" - gci %u/%u already marked complete",
2175  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2176  break;
2177  }
2178  else
2179  {
2180  found = 2;
2181  ndbout_c(" - gci %u/%u marking (and increasing)",
2182  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2183  tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2184  tmp->m_gcp_complete_rep_count += cnt;
2185  break;
2186  }
2187  }
2188  else
2189  {
2190  ndbout_c(" - ignore %u/%u",
2191  Uint32(array[pos] >> 32), Uint32(array[pos]));
2192  }
2193  }
2194 
2195  if (found == 0)
2196  {
2197  ndbout_c(" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
2198  return;
2199  }
2200 
2201  if (found == 1)
2202  {
2203  return; // Nothing todo
2204  }
2205 
  // pos is the flagged GCI's slot: raise the total and every later GCI.
2206  m_total_buckets += cnt;
2207 
2208  pos = (pos + 1) & mask;
2209  for (; pos != maxpos; pos = (pos + 1) & mask)
2210  {
2211  assert(array[pos] > gci);
2212  Gci_container* tmp = find_bucket(array[pos]);
2213  assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2214  tmp->m_gcp_complete_rep_count += cnt;
2215  ndbout_c(" - increasing cnt on %u/%u by %u",
2216  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
2217  }
2218  }
2219  else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
2220  {
2221  ndbout_c("handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
2222  cnt, Uint32(gci >> 32), Uint32(gci));
2223 
  // Same search as the ADD_CNT branch. The flagged GCI's own count is
  // left unchanged here.
2224  Uint32 found = 0;
2225  Uint32 pos = minpos;
2226  for (; pos != maxpos; pos = (pos + 1) & mask)
2227  {
2228  if (array[pos] == gci)
2229  {
2230  Gci_container* tmp = find_bucket(array[pos]);
2231  if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2232  {
2233  found = 1;
2234  ndbout_c(" - gci %u/%u already marked complete",
2235  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2236  break;
2237  }
2238  else
2239  {
2240  found = 2;
2241  ndbout_c(" - gci %u/%u marking",
2242  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2243  tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2244  break;
2245  }
2246  }
2247  else
2248  {
2249  ndbout_c(" - ignore %u/%u",
2250  Uint32(array[pos] >> 32), Uint32(array[pos]));
2251  }
2252  }
2253 
2254  if (found == 0)
2255  {
2256  ndbout_c(" - NOT FOUND");
2257  return;
2258  }
2259 
2260  if (found == 1)
2261  {
2262  return; // Nothing todo
2263  }
2264 
  // Lower the total and the remaining count of every later GCI.
  // NOTE(review): no underflow check on the unsigned subtraction below.
2265  m_total_buckets -= cnt;
2266 
2267  pos = (pos + 1) & mask;
2268  for (; pos != maxpos; pos = (pos + 1) & mask)
2269  {
2270  assert(array[pos] > gci);
2271  Gci_container* tmp = find_bucket(array[pos]);
2272  assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2273  tmp->m_gcp_complete_rep_count -= cnt;
2274  ndbout_c(" - decreasing cnt on %u/%u by %u to: %u",
2275  Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
2276  cnt,
2277  tmp->m_gcp_complete_rep_count);
2278  }
2279  }
2280 }
2281 
2282 void
2283 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2284 {
2285  if (m_total_buckets == cnt)
2286  return;
2287 
2288  assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2289  m_total_buckets = cnt;
2290 
2291  Uint64 * array = m_known_gci.getBase();
2292  Uint32 mask = m_known_gci.size() - 1;
2293  Uint32 minpos = m_min_gci_index;
2294  Uint32 maxpos = m_max_gci_index;
2295 
2296  bool found = false;
2297  Uint32 pos = minpos;
2298  for (; pos != maxpos; pos = (pos + 1) & mask)
2299  {
2300  Gci_container* tmp = find_bucket(array[pos]);
2301  if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2302  {
2303  found = true;
2304  if (0)
2305  ndbout_c("set_total_buckets(%u) complete %u/%u",
2306  cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2307  tmp->m_gcp_complete_rep_count = 0;
2308  complete_bucket(tmp);
2309  }
2310  else
2311  {
2312  assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2313  tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2314  }
2315  }
2316  if (found)
2317  {
2318  NdbCondition_Signal(p_cond);
2319  }
2320 }
2321 
/*
 * Called when failure handling for data node node_id has completed.
 *
 * Removes node_id from m_alive_node_bit_mask. If there is at least one
 * event operation, a _TE_NODE_FAILURE event is inserted at a gci derived
 * from m_latestGCI (next gci_hi, lo = 0), possibly adjusted by
 * find_max_known_gci().
 *
 * If no nodes remain alive (cluster failure):
 *  - every bucket for a known gci before the chosen gci is destroyed and
 *    its data returned to the free list;
 *  - a _TE_CLUSTER_FAILURE event is inserted at the chosen gci;
 *  - a synthetic SubGcpCompleteRep for that gci is passed to
 *    execSUB_GCP_COMPLETE_REP() with the bucket's current
 *    m_gcp_complete_rep_count.
 *
 * The existing comment states this runs in the receive thread, so no
 * lock()/unlock() is taken here.
 */
void
{
  m_alive_node_bit_mask.clear(node_id);

  NdbEventOperation* op= m_ndb->getEventOperation(0);
  if (op == 0)
    return;   // no event operations: nothing to report

  DBUG_ENTER("NdbEventBuffer::report_node_failure_completed");
  SubTableData data;
  LinearSectionPtr ptr[3];
  bzero(&data, sizeof(data));
  bzero(ptr, sizeof(ptr));   // all sections empty: event carries no row data

  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_NODE_FAILURE);
  SubTableData::setReqNodeId(data.requestInfo, node_id);
  SubTableData::setNdbdNodeId(data.requestInfo, node_id);
  data.flags = SubTableData::LOG;

  // Start from the first gci of the next epoch after m_latestGCI;
  // find_max_known_gci() may replace it with a known gci.
  Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  find_max_known_gci(&gci);

  data.gci_hi = Uint32(gci >> 32);
  data.gci_lo = Uint32(gci);

  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

  if (!m_alive_node_bit_mask.isclear())
    DBUG_VOID_RETURN;   // other nodes still alive: not a cluster failure

  /*
   * Cluster failure
   */

  DBUG_PRINT("info", ("Cluster failure"));

  gci = Uint64((m_latestGCI >> 32) + 1) << 32;
  bool found = find_max_known_gci(&gci);

  Uint64 * array = m_known_gci.getBase();
  Uint32 mask = m_known_gci.size() - 1;
  Uint32 minpos = m_min_gci_index;
  Uint32 maxpos = m_max_gci_index;

  // Discard every bucket older than gci: release its data to the free
  // list, run the destructor and zero the slot.
  while (minpos != maxpos && array[minpos] != gci)
  {
    Gci_container* tmp = find_bucket(array[minpos]);
    assert(tmp);
    assert(maxpos == m_max_gci_index);

    if(!tmp->m_data.is_empty())
    {
      free_list(tmp->m_data);
    }
    tmp->~Gci_container();
    bzero(tmp, sizeof(Gci_container));

    minpos = (minpos + 1) & mask;
  }
  m_min_gci_index = minpos;
  // Afterwards at most the found gci itself remains in the known array.
  if (found)
  {
    assert(((minpos + 1) & mask) == maxpos);
  }
  else
  {
    assert(minpos == maxpos);
  }

  // Reuse data (gci, flags, ...) for the cluster failure event.
  data.tableId = ~0;
  data.requestInfo = 0;
  SubTableData::setOperation(data.requestInfo,
                             NdbDictionary::Event::_TE_CLUSTER_FAILURE);

  // no need to lock()/unlock(), receive thread calls this
  insert_event(&op->m_impl, data, ptr, data.senderData);

#ifdef VM_TRACE
  m_flush_gci = 0;
#endif

  // Complete the bucket for gci. When gci was not already known, the
  // asserts below expect m_max_gci_index to have advanced by one
  // (presumably a bucket was created by the insert above - TODO confirm).
  Gci_container* tmp = find_bucket(gci);
  assert(tmp);
  if (found)
  {
    assert(m_max_gci_index == maxpos); // shouldnt have changed...
  }
  else
  {
    assert(m_max_gci_index == ((maxpos + 1) & mask));
  }
  Uint32 cnt = tmp->m_gcp_complete_rep_count;

  SubGcpCompleteRep rep;
  rep.gci_hi= (Uint32)(gci >> 32);
  rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
  rep.gcp_complete_rep_count= cnt;
  rep.flags = 0;
  execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength, 1);

  DBUG_VOID_RETURN;
}
2441 
2442 Uint64
2443 NdbEventBuffer::getLatestGCI()
2444 {
2445  return m_latestGCI;
2446 }
2447 
/*
 * Buffer one received table event for event operation `op`.
 *
 * The event gci is built from sdata->gci_hi/gci_lo. gci_lo is forced to 0
 * when len is shorter than SubTableData::SignalLength.
 *
 * Non-data events:
 *  - _TE_CLUSTER_FAILURE sets op->m_stop_gci to the event gci;
 *  - _TE_ACTIVE and _TE_STOP are internal and are swallowed (return 0).
 *
 * If op subscribes to this operation type (bit in op->mi_type), the event
 * goes into the Gci_container for its gci:
 *  - no container for the gci: the event is dropped;
 *  - non-data events on blob operations are dropped;
 *  - with op->m_mergeEvents, data events for a key already present in
 *    the bucket hash are merged into the existing buffer (merge_data);
 *  - otherwise a new EventBufData is allocated and filled. Blob events are
 *    linked under their main-table event (get_main_data/add_blob_data).
 *
 * Returns 0 when the event was stored or ignored. Returns -1 on failure
 * with op->m_has_error set: 2 = no free buffer, 3 = copy/merge failed,
 * 4 = main event for a blob event could not be created.
 */
int
  const SubTableData * const sdata,
  Uint32 len,
  LinearSectionPtr ptr[3])
{
  DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
  const Uint32 ri = sdata->requestInfo;
  const Uint32 operation = SubTableData::getOperation(ri);
  Uint32 gci_hi = sdata->gci_hi;
  Uint32 gci_lo = sdata->gci_lo;

  // Short signal: gci_lo not present.
  if (unlikely(len < SubTableData::SignalLength))
  {
    gci_lo = 0;
  }

  Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
  const bool is_data_event =
    operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;

  if (!is_data_event)
  {
    if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
    {
      /*
        Mark event as stopping. Subsequent dropEventOperation
        will add the event to the dropped list for delete
      */
      op->m_stop_gci = gci;
    }
    else if (operation == NdbDictionary::Event::_TE_ACTIVE)
    {
      // internal event, do not relay to user
      DBUG_PRINT("info",
                 ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
                  op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
      DBUG_RETURN_EVENT(0);
    }
    else if (operation == NdbDictionary::Event::_TE_STOP)
    {
      // internal event, do not relay to user
      DBUG_PRINT("info",
                 ("_TE_STOP: m_ref_count: %u for op: %p id: %u",
                  op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
      DBUG_RETURN_EVENT(0);
    }
  }

  // Only buffer operation types this op subscribes to.
  if ( likely((Uint32)op->mi_type & (1 << operation)))
  {
    Gci_container* bucket= find_bucket(gci);

    DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId));
    // NOTE(review): SubTableData is read via gci_hi/gci_lo above; the
    // `sdata->gci` below looks stale and presumably only compiles while
    // DBUG_PRINT_EVENT expands to nothing - verify.
    DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
                              sdata->gci, sdata->tableId,
                              SubTableData::getOperation(sdata->requestInfo),
                              SubTableData::getReqNodeId(sdata->requestInfo)));

    if (unlikely(bucket == 0))
    {
      // No container for this gci (presumably already completed):
      // the event is dropped.
      DBUG_RETURN_EVENT(0);
    }

    const bool is_blob_event = (op->theMainOp != NULL);
    const bool use_hash = op->m_mergeEvents && is_data_event;

    if (! is_data_event && is_blob_event)
    {
      // currently subscribed to but not used
      DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table"));
      DBUG_RETURN_EVENT(0);
    }

    // find position in bucket hash table
    EventBufData* data = 0;
    if (use_hash)
    {
      bucket->m_data_hash.search(hpos, op, ptr);
      data = hpos.data;
    }

    if (data == 0)
    {
      // allocate new result buffer
      data = alloc_data();
      if (unlikely(data == 0))
      {
        op->m_has_error = 2;
        DBUG_RETURN_EVENT(-1);
      }
      if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
      {
        op->m_has_error = 3;
        DBUG_RETURN_EVENT(-1);
      }
      data->m_event_op = op;
      if (! is_blob_event || ! is_data_event)
      {
        bucket->m_data.append_data(data);
      }
      else
      {
        // find or create main event for this blob event
        EventBufData_hash::Pos main_hpos;
        int ret = get_main_data(bucket, main_hpos, data);
        if (ret == -1)
        {
          op->m_has_error = 4;
          DBUG_RETURN_EVENT(-1);
        }
        EventBufData* main_data = main_hpos.data;
        if (ret != 0) // main event was created
        {
          main_data->m_event_op = op->theMainOp;
          bucket->m_data.append_data(main_data);
          if (use_hash)
          {
            main_data->m_pkhash = main_hpos.pkhash;
            bucket->m_data_hash.append(main_hpos, main_data);
          }
        }
        // link blob event under main event
        add_blob_data(bucket, main_data, data);
      }
      if (use_hash)
      {
        data->m_pkhash = hpos.pkhash;
        bucket->m_data_hash.append(hpos, data);
      }
#ifdef VM_TRACE
      op->m_data_count++;
#endif
    }
    else
    {
      // event with same op, PK found, merge into old buffer
      if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
      {
        op->m_has_error = 3;
        DBUG_RETURN_EVENT(-1);
      }
      // merge is on so we do not report blob part events
      if (! is_blob_event) {
        // report actual operation and the composite
        // there is no way to "fix" the flags for a composite op
        // since the flags represent multiple ops on multiple PKs
        // XXX fix by doing merge at end of epoch (extra mem cost)
        {
          EventBufData_list::Gci_op g = { op, (1 << operation) };
          bucket->m_data.add_gci_op(g);
        }
        {
          g = { op,
                (1 << SubTableData::getOperation(data->sdata->requestInfo))};
          bucket->m_data.add_gci_op(g);
        }
      }
    }
#ifdef NDB_EVENT_VERIFY_SIZE
    verify_size(bucket->m_data);
#endif
    DBUG_RETURN_EVENT(0);
  }

#ifdef VM_TRACE
  if ((Uint32)op->m_eventImpl->mi_type & (1 << operation))
  {
    // NOTE(review): format string has no conversion for op->m_eventId,
    // so the id is never printed.
    DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
    DBUG_RETURN_EVENT(0);
  }
  else {
    DBUG_PRINT_EVENT("info",("skipped"));
    DBUG_RETURN_EVENT(0);
  }
#else
  DBUG_RETURN_EVENT(0);
#endif
}
2633 
2634 // allocate EventBufData
2635 EventBufData*
2636 NdbEventBuffer::alloc_data()
2637 {
2638  DBUG_ENTER_EVENT("alloc_data");
2639  EventBufData* data = m_free_data;
2640 
2641  if (unlikely(data == 0))
2642  {
2643 #ifdef VM_TRACE
2644  assert(m_free_data_count == 0);
2645  assert(m_free_data_sz == 0);
2646 #endif
2647  expand(4000);
2648  reportStatus();
2649 
2650  data = m_free_data;
2651  if (unlikely(data == 0))
2652  {
2653 #ifdef VM_TRACE
2654  printf("m_latest_command: %s\n", m_latest_command);
2655  printf("no free data, m_latestGCI %u/%u\n",
2656  (Uint32)(m_latestGCI << 32), (Uint32)m_latestGCI);
2657  printf("m_free_data_count %d\n", m_free_data_count);
2658  printf("m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
2659  m_available_data.m_count,
2660  m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
2661  m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
2662  m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
2663  m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
2664  printf("m_used_data_count %d\n", m_used_data.m_count);
2665 #endif
2666  DBUG_RETURN_EVENT(0); // TODO handle this, overrun, or, skip?
2667  }
2668  }
2669 
2670  // remove data from free list
2671  if (data->m_next_blob == 0)
2672  m_free_data = data->m_next;
2673  else {
2674  EventBufData* data2 = data->m_next_blob;
2675  if (data2->m_next == 0) {
2676  data->m_next_blob = data2->m_next_blob;
2677  data = data2;
2678  } else {
2679  EventBufData* data3 = data2->m_next;
2680  data2->m_next = data3->m_next;
2681  data = data3;
2682  }
2683  }
2684  data->m_next = 0;
2685  data->m_next_blob = 0;
2686 #ifdef VM_TRACE
2687  m_free_data_count--;
2688  assert(m_free_data_sz >= data->sz);
2689 #endif
2690  m_free_data_sz -= data->sz;
2691  DBUG_RETURN_EVENT(data);
2692 }
2693 
2694 // allocate initial or bigger memory area in EventBufData
2695 // takes sizes from given ptr and sets up data->ptr
2696 int
2697 NdbEventBuffer::alloc_mem(EventBufData* data,
2698  LinearSectionPtr ptr[3],
2699  Uint32 * change_sz)
2700 {
2701  DBUG_ENTER("NdbEventBuffer::alloc_mem");
2702  DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
2703  const Uint32 min_alloc_size = 128;
2704 
2705  Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
2706  Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
2707  if (alloc_size < min_alloc_size)
2708  alloc_size = min_alloc_size;
2709 
2710  if (data->sz < alloc_size)
2711  {
2712  Uint32 add_sz = alloc_size - data->sz;
2713 
2714  NdbMem_Free((char*)data->memory);
2715  assert(m_total_alloc >= data->sz);
2716  data->memory = 0;
2717  data->sz = 0;
2718 
2719  data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
2720  if (data->memory == 0)
2721  {
2722  m_total_alloc -= data->sz;
2723  DBUG_RETURN(-1);
2724  }
2725  data->sz = alloc_size;
2726  m_total_alloc += add_sz;
2727 
2728  if (change_sz != NULL)
2729  *change_sz += add_sz;
2730  }
2731 
2732  Uint32* memptr = data->memory;
2733  memptr += sz4;
2734  int i;
2735  for (i = 0; i <= 2; i++)
2736  {
2737  data->ptr[i].p = memptr;
2738  data->ptr[i].sz = ptr[i].sz;
2739  memptr += ptr[i].sz;
2740  }
2741 
2742  DBUG_RETURN(0);
2743 }
2744 
2745 void
2746 NdbEventBuffer::dealloc_mem(EventBufData* data,
2747  Uint32 * change_sz)
2748 {
2749  NdbMem_Free((char*)data->memory);
2750  assert(m_total_alloc >= data->sz);
2751  m_total_alloc -= data->sz;
2752  if (change_sz != NULL) {
2753  assert(*change_sz >= data->sz);
2754  *change_sz -= data->sz;
2755  }
2756  data->memory = 0;
2757  data->sz = 0;
2758 }
2759 
2760 int
2761 NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
2762  LinearSectionPtr ptr[3],
2763  EventBufData* data,
2764  Uint32 * change_sz)
2765 {
2766  DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
2767 
2768  if (alloc_mem(data, ptr, change_sz) != 0)
2769  DBUG_RETURN_EVENT(-1);
2770  memcpy(data->sdata, sdata, sizeof(SubTableData));
2771 
2772  if (unlikely(len < SubTableData::SignalLength))
2773  {
2774  data->sdata->gci_lo = 0;
2775  }
2776  if (len < SubTableData::SignalLengthWithTransId)
2777  {
2778  /* No TransId, set to uninit value */
2779  data->sdata->transId1 = ~Uint32(0);
2780  data->sdata->transId2 = ~Uint32(0);
2781  }
2782 
2783  int i;
2784  for (i = 0; i <= 2; i++)
2785  memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
2786  DBUG_RETURN_EVENT(0);
2787 }
2788 
/*
 * Merge table used by merge_data(): row {t1, t2, t3} means "a buffered
 * event of type t1 followed by a new event of type t2 on the same key is
 * merged into one event of type t3".
 *  - enum_IDM: repeated INS/INS or DEL/DEL. merge_data() accepts these
 *    only if sections 0 and 1 of both events are identical, and leaves
 *    the buffer unchanged.
 *  - enum_ERR: combinations merge_data() asserts never happen.
 * Rows marked "//ok" are the regular composite results.
 */
static struct Ev_t {
  enum {
    enum_INS = NdbDictionary::Event::_TE_INSERT,
    enum_DEL = NdbDictionary::Event::_TE_DELETE,
    enum_UPD = NdbDictionary::Event::_TE_UPDATE,
    enum_NUL = NdbDictionary::Event::_TE_NUL,
    enum_IDM = 254, // idempotent op possibly allowed on NF
    enum_ERR = 255 // always impossible
  };
  int t1, t2, t3;   // first event type, second event type, merged type
} ev_t[] = {
  { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
  { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok
  { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok
  { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
  { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
  { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok
  { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD } //ok
};
2810 
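/*
 * Contents of the three LinearSectionPtr sections of an event, by
 * operation type (ah = attribute headers, ad = attribute data):
 *
 *       | INS            | DEL              | UPD
 *     0 | pk ah + all ah | pk ah            | pk ah + new ah
 *     1 | pk ad + all ad | old pk ad        | new pk ad + new ad
 *     2 | empty          | old non-pk ah+ad | old ah+ad
 *
 * Sections 0 and 1 are parallel (headers, then their data). Section 2
 * interleaves each header with its data.
 */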
2817 
2818 static AttributeHeader
2819 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
2820  Uint32 flags)
2821 {
2822  AttributeHeader ah(p2[i2]);
2823  bool do_copy = (flags & 1);
2824  if (do_copy)
2825  p1[i1] = p2[i2];
2826  i1++;
2827  i2++;
2828  return ah;
2829 }
2830 
2831 static void
2832 copy_attr(AttributeHeader ah,
2833  Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
2834  Uint32 flags)
2835 {
2836  bool do_copy = (flags & 1);
2837  bool with_head = (flags & 2);
2838  Uint32 n = with_head + ah.getDataSize();
2839  if (do_copy)
2840  {
2841  Uint32 k;
2842  for (k = 0; k < n; k++)
2843  p1[j1 + k] = p2[j2 + k];
2844  }
2845  j1 += n;
2846  j2 += n;
2847 }
2848 
/*
 * Merge a newly received event (sdata, len, ptr2) into the buffered
 * event `data` for the same key, following the ev_t table.
 *
 *  - Buffered event is a _TE_NUL placeholder: it is simply overwritten
 *    via copy_data().
 *  - enum_IDM combination: sections 0 and 1 must match exactly
 *    (section 2 is ignored). The buffer is left as is.
 *  - Otherwise a new image is composed in two passes over the same code.
 *    Pass 0 only computes section sizes. Pass 1 allocates via alloc_mem()
 *    and copies.
 *      section 0/1: the key is taken from the new event; after-values
 *                   are merged with the new event winning on equal
 *                   attribute ids (skipped when the result is DEL);
 *      section 2:   before-values are merged with the old event
 *                   winning (skipped when the result is INS).
 *    The merge walks treat each section as ordered by attribute id
 *    (assumption based on the id comparisons - TODO confirm). The old
 *    memory is always released at the end.
 *
 * change_sz, if non-NULL, is adjusted by alloc_mem()/dealloc_mem().
 * Returns 0 on success, -1 on failure.
 */
int
NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
                           LinearSectionPtr ptr2[3],
                           EventBufData* data,
                           Uint32 * change_sz)
{
  DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");

  /* TODO : Consider how/if to merge multiple events/key with different
   * transid
   * Same consideration probably applies to AnyValue!
   */

  Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;

  int t1 = SubTableData::getOperation(data->sdata->requestInfo);
  int t2 = SubTableData::getOperation(sdata->requestInfo);
  if (t1 == Ev_t::enum_NUL)
    DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));

  Ev_t* tp = 0;
  int i;
  for (i = 0; (uint) i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
    if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
      tp = &ev_t[i];
      break;
    }
  }
  // NOTE(review): in builds where assert() is compiled out, tp may be
  // NULL here (no matching row) and is dereferenced below.
  assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);

  if (tp->t3 == Ev_t::enum_IDM) {
    LinearSectionPtr (&ptr1)[3] = data->ptr;

    /*
     * TODO
     * - can get data in INS ptr2[2] which is supposed to be empty
     * - can get extra data in DEL ptr2[2]
     * - why does DBUG_PRINT not work in this file ???
     *
     * replication + bug#19872 can ignore this since merge is on
     * only for tables with explicit PK and before data is not used
     */
    const int maxsec = 1; // ignore section 2

    int i;
    for (i = 0; i <= maxsec; i++) {
      if (ptr1[i].sz != ptr2[i].sz ||
          memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
        DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d",
                            tp->t1, tp->t2, i));
        assert(false);
        DBUG_RETURN_EVENT(-1);
      }
    }
    DBUG_PRINT("info", ("idempotent op %d*%d data ok", tp->t1, tp->t2));
    DBUG_RETURN_EVENT(0);
  }

  // TODO: use old data items, avoid malloc/free on each merge

  // save old data
  // olddata keeps the old memory; data is detached from it (memory 0,
  // sz 0), so alloc_mem() below always allocates a fresh area.
  EventBufData olddata = *data;
  data->memory = 0;
  data->sz = 0;

  // compose ptr1 o ptr2 = ptr
  LinearSectionPtr (&ptr1)[3] = olddata.ptr;
  LinearSectionPtr (&ptr)[3] = data->ptr;

  // loop twice where first loop only sets sizes
  // (loop is passed as the copy flag: 0 = count only, 1 = copy)
  int loop;
  int result = 0;
  for (loop = 0; loop <= 1; loop++)
  {
    if (loop == 1)
    {
      // ptr[].sz now holds the sizes computed by pass 0.
      if (alloc_mem(data, ptr, change_sz) != 0)
      {
        result = -1;
        goto end;
      }
      *data->sdata = *sdata;
      SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
    }

    ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;

    // copy pk from new version
    {
      AttributeHeader ah;
      Uint32 i = 0;
      Uint32 j = 0;
      Uint32 i2 = 0;
      Uint32 j2 = 0;
      while (i < nkey)
      {
        ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
        copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
      }
      ptr[0].sz = i;
      ptr[1].sz = j;
    }

    // merge after values, new version overrides
    if (tp->t3 != Ev_t::enum_DEL)
    {
      AttributeHeader ah;
      Uint32 i = ptr[0].sz;
      Uint32 j = ptr[1].sz;
      Uint32 i1 = 0;
      Uint32 j1 = 0;
      Uint32 i2 = nkey;
      Uint32 j2 = ptr[1].sz;   // key data was copied from ptr2, same size
      // skip the key part of the old event
      while (i1 < nkey)
      {
        j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
      }
      while (1)
      {
        bool b1 = (i1 < ptr1[0].sz);
        bool b2 = (i2 < ptr2[0].sz);
        if (b1 && b2)
        {
          Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
          Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
          if (id1 < id2)
            b2 = false;
          else if (id1 > id2)
            b1 = false;
          else
          {
            // same attribute: drop the old value, new one is taken
            j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
            b1 = false;
          }
        }
        if (b1)
        {
          ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
          copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
        }
        else if (b2)
        {
          ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
          copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
        }
        else
          break;
      }
      ptr[0].sz = i;
      ptr[1].sz = j;
    }

    // merge before values, old version overrides
    if (tp->t3 != Ev_t::enum_INS)
    {
      AttributeHeader ah;
      Uint32 k = 0;
      Uint32 k1 = 0;
      Uint32 k2 = 0;
      while (1)
      {
        bool b1 = (k1 < ptr1[2].sz);
        bool b2 = (k2 < ptr2[2].sz);
        if (b1 && b2)
        {
          Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
          Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
          if (id1 < id2)
            b2 = false;
          else if (id1 > id2)
            b1 = false;
          else
          {
            // same attribute: skip new header + data, old one is kept
            k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
            b2 = false;
          }
        }
        if (b1)
        {
          ah = AttributeHeader(ptr1[2].p[k1]);
          copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
        }
        else if (b2)
        {
          ah = AttributeHeader(ptr2[2].p[k2]);
          copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
        }
        else
          break;
      }
      ptr[2].sz = k;
    }
  }

end:
  dealloc_mem(&olddata, change_sz);
  DBUG_RETURN_EVENT(result);
}
3047 
/*
 * Given blob part event, find main table event on inline part. It
 * should exist (force in TUP) but may arrive later. If so, create
 * NUL event on main table. The real event replaces it later.
 *
 * The main-table key is rebuilt from the blob event's key:
 *  - blob version 1: blob PK attribute 0 is the concatenated, padded
 *    table PK. Each key column's length is recovered with
 *    NdbSqlUtil::get_var_length().
 *  - blob version 2: the blob PK starts with the table PK columns. Their
 *    headers are re-tagged with the main table's attribute ids.
 * The key (sections 0 and 1, empty section 2) is looked up in the
 * bucket hash for the main operation.
 *
 * Returns 0 if the main event exists (hpos.data set). Returns 1 if a
 * _TE_NUL placeholder was created (hpos.data set; insertDataL appends it
 * to the bucket). Returns -1 on allocation or copy failure.
 */

int
NdbEventBuffer::get_main_data(Gci_container* bucket,
                              EventBufData_hash::Pos& hpos,
                              EventBufData* blob_data)
{
  DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");

  int blobVersion = blob_data->m_event_op->theBlobVersion;
  assert(blobVersion == 1 || blobVersion == 2);

  NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
  assert(main_op != NULL);
  const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;

  // create LinearSectionPtr for main table key
  LinearSectionPtr ptr[3];

  // Assumes mainTable->m_noOfKeys <= NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY
  // (not checked here).
  Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
  Uint32* pk_data = blob_data->ptr[1].p;   // key data is used in place
  Uint32 pk_size = 0;

  if (unlikely(blobVersion == 1)) {
    /*
     * Blob PK attribute 0 is concatenated table PK null padded
     * to fixed maximum size. The actual size and attributes of
     * table PK must be discovered.
     */
    Uint32 max_size = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();

    Uint32 sz = 0; // words parsed so far
    Uint32 n = 0;
    Uint32 i;
    for (i = 0; n < mainTable->m_noOfKeys; i++) {
      const NdbColumnImpl* c = mainTable->getColumn(i);
      assert(c != NULL);
      if (! c->m_pk)
        continue;

      Uint32 bytesize = c->m_attrSize * c->m_arraySize;
      Uint32 lb, len;
      assert(sz < max_size);
      // NOTE(review): `ok` is only checked by assert(); in builds without
      // asserts a failure leaves lb/len unchecked.
      bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_data[sz],
                                           bytesize, lb, len);
      assert(ok);

      AttributeHeader ah(i, lb + len);
      pk_ah[n] = ah.m_value;
      sz += ah.getDataSize();
      n++;
    }
    assert(n == mainTable->m_noOfKeys);
    assert(sz <= max_size);
    pk_size = sz;
  } else {
    /*
     * Blob PK starts with separate table PKs. Total size must be
     * counted and blob attribute ids changed to table attribute ids.
     */
    Uint32 sz = 0; // count size
    Uint32 n = 0;
    Uint32 i;
    for (i = 0; n < mainTable->m_noOfKeys; i++) {
      const NdbColumnImpl* c = mainTable->getColumn(i);
      assert(c != NULL);
      if (! c->m_pk)
        continue;

      AttributeHeader ah(blob_data->ptr[0].p[n]);
      ah.setAttributeId(i);
      pk_ah[n] = ah.m_value;
      sz += ah.getDataSize();
      n++;
    }
    assert(n == mainTable->m_noOfKeys);
    pk_size = sz;
  }

  ptr[0].sz = mainTable->m_noOfKeys;
  ptr[0].p = pk_ah;
  ptr[1].sz = pk_size;
  ptr[1].p = pk_data;
  ptr[2].sz = 0;
  ptr[2].p = 0;

  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);

  // search for main event buffer
  bucket->m_data_hash.search(hpos, main_op, ptr);
  if (hpos.data != NULL)
    DBUG_RETURN_EVENT(0);

  // not found, create a place-holder
  EventBufData* main_data = alloc_data();
  if (main_data == NULL)
    DBUG_RETURN_EVENT(-1);
  SubTableData sdata = *blob_data->sdata;
  sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
  SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
  // NOTE(review): on failure main_data is not returned to the free list.
  if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
    DBUG_RETURN_EVENT(-1);
  hpos.data = main_data;

  DBUG_RETURN_EVENT(1);
}
3159 
3160 void
3161 NdbEventBuffer::add_blob_data(Gci_container* bucket,
3162  EventBufData* main_data,
3163  EventBufData* blob_data)
3164 {
3165  DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
3166  DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
3167  EventBufData* head;
3168  head = main_data->m_next_blob;
3169  while (head != NULL)
3170  {
3171  if (head->m_event_op == blob_data->m_event_op)
3172  break;
3173  head = head->m_next_blob;
3174  }
3175  if (head == NULL)
3176  {
3177  head = blob_data;
3178  head->m_next_blob = main_data->m_next_blob;
3179  main_data->m_next_blob = head;
3180  }
3181  else
3182  {
3183  blob_data->m_next = head->m_next;
3184  head->m_next = blob_data;
3185  }
3186  // adjust data list size
3187  bucket->m_data.m_count += 1;
3188  bucket->m_data.m_sz += blob_data->sz;
3189  DBUG_VOID_RETURN_EVENT;
3190 }
3191 
3193 NdbEventBuffer::move_data()
3194 {
3195  // handle received data
3196  if (!m_complete_data.m_data.is_empty())
3197  {
3198  // move this list to last in m_available_data
3199  m_available_data.append_list(&m_complete_data.m_data, 0);
3200 
3201  bzero(&m_complete_data, sizeof(m_complete_data));
3202  }
3203 
3204  // handle used data
3205  if (!m_used_data.is_empty())
3206  {
3207  // return m_used_data to m_free_data
3208  free_list(m_used_data);
3209  }
3210  if (!m_available_data.is_empty())
3211  {
3212  DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
3213 #ifdef VM_TRACE
3214  DBUG_PRINT_EVENT("exit",("m_available_data_count %u", m_available_data.m_count));
3215 #endif
3216  DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
3217  }
3218  return 0;
3219 }
3220 
3221 void
3222 NdbEventBuffer::free_list(EventBufData_list &list)
3223 {
3224 #ifdef NDB_EVENT_VERIFY_SIZE
3225  verify_size(list);
3226 #endif
3227  // return list to m_free_data
3228  list.m_tail->m_next= m_free_data;
3229  m_free_data= list.m_head;
3230 #ifdef VM_TRACE
3231  m_free_data_count+= list.m_count;
3232 #endif
3233  m_free_data_sz+= list.m_sz;
3234 
3235  list.m_head = list.m_tail = NULL;
3236  list.m_count = list.m_sz = 0;
3237 }
3238 
3239 void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
3240 {
3241 #ifdef NDB_EVENT_VERIFY_SIZE
3242  NdbEventBuffer::verify_size(*list);
3243 #endif
3244  move_gci_ops(list, gci);
3245 
3246  if (m_tail)
3247  m_tail->m_next= list->m_head;
3248  else
3249  m_head= list->m_head;
3250  m_tail= list->m_tail;
3251  m_count+= list->m_count;
3252  m_sz+= list->m_sz;
3253 }
3254 
3255 void
3256 EventBufData_list::add_gci_op(Gci_op g)
3257 {
3258  DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
3259  DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
3260  assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
3261  Uint32 i;
3262  for (i = 0; i < m_gci_op_count; i++) {
3263  if (m_gci_op_list[i].op == g.op)
3264  break;
3265  }
3266  if (i < m_gci_op_count) {
3267  m_gci_op_list[i].event_types |= g.event_types;
3268  } else {
3269  if (m_gci_op_count == m_gci_op_alloc) {
3270  Uint32 n = 1 + 2 * m_gci_op_alloc;
3271  Gci_op* old_list = m_gci_op_list;
3272  m_gci_op_list = new Gci_op [n];
3273  if (m_gci_op_alloc != 0) {
3274  Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
3275  memcpy(m_gci_op_list, old_list, bytes);
3276  DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p",
3277  this, old_list));
3278  delete [] old_list;
3279  }
3280  else
3281  assert(old_list == 0);
3282  DBUG_PRINT_EVENT("info", ("this: %p new m_gci_op_list: %p",
3283  this, m_gci_op_list));
3284  m_gci_op_alloc = n;
3285  }
3286  assert(m_gci_op_count < m_gci_op_alloc);
3287 #ifndef DBUG_OFF
3288  i = m_gci_op_count;
3289 #endif
3290  m_gci_op_list[m_gci_op_count++] = g;
3291  }
3292  DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
3293  DBUG_VOID_RETURN_EVENT;
3294 }
3295 
3296 void
3297 EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
3298 {
3299  DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops");
3300  DBUG_PRINT_EVENT("info", ("this: %p list: %p gci: %u/%u",
3301  this, list, (Uint32)(gci >> 32), (Uint32)gci));
3302  assert(!m_is_not_multi_list);
3303  if (!list->m_is_not_multi_list)
3304  {
3305  assert(gci == 0);
3306  if (m_gci_ops_list_tail)
3307  m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
3308  else
3309  {
3310  m_gci_ops_list = list->m_gci_ops_list;
3311  }
3312  m_gci_ops_list_tail = list->m_gci_ops_list_tail;
3313  goto end;
3314  }
3315  {
3316  Gci_ops *new_gci_ops = new Gci_ops;
3317  DBUG_PRINT_EVENT("info", ("this: %p m_gci_op_list: %p",
3318  new_gci_ops, list->m_gci_op_list));
3319  if (m_gci_ops_list_tail)
3320  m_gci_ops_list_tail->m_next = new_gci_ops;
3321  else
3322  {
3323  assert(m_gci_ops_list == 0);
3324  m_gci_ops_list = new_gci_ops;
3325  }
3326  m_gci_ops_list_tail = new_gci_ops;
3327 
3328  new_gci_ops->m_gci_op_list = list->m_gci_op_list;
3329  new_gci_ops->m_gci_op_count = list->m_gci_op_count;
3330  new_gci_ops->m_gci = gci;
3331  new_gci_ops->m_next = 0;
3332  }
3333 end:
3334  list->m_gci_op_list = 0;
3335  list->m_gci_ops_list_tail = 0;
3336  list->m_gci_op_alloc = 0;
3337  DBUG_VOID_RETURN_EVENT;
3338 }
3339 
3341 NdbEventBuffer::createEventOperation(const char* eventName,
3342  NdbError &theError)
3343 {
3344  DBUG_ENTER("NdbEventBuffer::createEventOperation");
3345  NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
3346  if (tOp == 0)
3347  {
3348  theError.code= 4000;
3349  DBUG_RETURN(NULL);
3350  }
3351  if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3352  theError.code= tOp->getNdbError().code;
3353  delete tOp;
3354  DBUG_RETURN(NULL);
3355  }
3356  // add user reference
3357  // removed in dropEventOperation
3358  getEventOperationImpl(tOp)->m_ref_count = 1;
3359  DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
3360  getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
3361  DBUG_RETURN(tOp);
3362 }
3363 
3365 NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
3366  NdbError &theError)
3367 {
3368  DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
3369  NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
3370  if (tOp == 0)
3371  {
3372  theError.code= 4000;
3373  DBUG_RETURN(NULL);
3374  }
3375  if (tOp->getState() != NdbEventOperation::EO_CREATED) {
3376  theError.code= tOp->getNdbError().code;
3377  delete tOp;
3378  DBUG_RETURN(NULL);
3379  }
3380  DBUG_RETURN(tOp);
3381 }
3382 
/*
 * Drop event operation tOp.
 *
 * Stops the operation and, for a main-table operation, all its blob
 * operations. All of them are given the same (maximum) m_stop_gci. Then,
 * under m_mutex:
 *  - blob handles of a main operation are released (later access is a
 *    user error);
 *  - the operation is unlinked from m_ndb->theImpl->m_ev_op;
 *  - the user reference added by createEventOperation() is removed. If
 *    it was the last reference, the facade object is deleted (after
 *    unlocking). Otherwise the operation is pushed onto
 *    m_dropped_ev_op, to be deleted when the remaining references go.
 */
void
{
  DBUG_ENTER("NdbEventBuffer::dropEventOperation");
  NdbEventOperationImpl* op= getEventOperationImpl(tOp);

  op->stop();
  // stop blob event ops
  if (op->theMainOp == NULL)
  {
    // First pass: stop each blob op and find the highest stop gci.
    Uint64 max_stop_gci = op->m_stop_gci;
    NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
    while (tBlobOp != NULL)
    {
      tBlobOp->stop();
      Uint64 stop_gci = tBlobOp->m_stop_gci;
      if (stop_gci > max_stop_gci)
        max_stop_gci = stop_gci;
      tBlobOp = tBlobOp->m_next;
    }
    // Second pass: make main and blob ops stop at the same gci.
    tBlobOp = op->theBlobOpList;
    while (tBlobOp != NULL)
    {
      tBlobOp->m_stop_gci = max_stop_gci;
      tBlobOp = tBlobOp->m_next;
    }
    op->m_stop_gci = max_stop_gci;
  }

  // The operation lists below are modified while holding m_mutex.
  NdbMutex_Lock(m_mutex);

  // release blob handles now, further access is user error
  if (op->theMainOp == NULL)
  {
    while (op->theBlobList != NULL)
    {
      NdbBlob* tBlob = op->theBlobList;
      op->theBlobList = tBlob->theNext;
      m_ndb->releaseNdbBlob(tBlob);
    }
  }

  // Unlink op from the active list m_ndb->theImpl->m_ev_op.
  if (op->m_next)
    op->m_next->m_prev= op->m_prev;
  if (op->m_prev)
    op->m_prev->m_next= op->m_next;
  else
    m_ndb->theImpl->m_ev_op= op->m_next;

  assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);

  DBUG_ASSERT(op->m_ref_count > 0);
  // remove user reference
  // added in createEventOperation
  // user error to use reference after this
  op->m_ref_count--;
  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
  if (op->m_ref_count == 0)
  {
    NdbMutex_Unlock(m_mutex);
    DBUG_PRINT("info", ("deleting op: %p", op));
    delete op->m_facade;
  }
  else
  {
    // Still referenced: park it at the front of m_dropped_ev_op.
    op->m_next= m_dropped_ev_op;
    op->m_prev= 0;
    if (m_dropped_ev_op)
      m_dropped_ev_op->m_prev= op;
    m_dropped_ev_op= op;

    NdbMutex_Unlock(m_mutex);
  }
  DBUG_VOID_RETURN;
}
3461 
3462 void
3463 NdbEventBuffer::reportStatus()
3464 {
3465  EventBufData *apply_buf= m_available_data.m_head;
3466  Uint64 apply_gci, latest_gci= m_latestGCI;
3467  if (apply_buf == 0)
3468  apply_buf= m_complete_data.m_data.m_head;
3469  if (apply_buf && apply_buf->sdata)
3470  {
3471  Uint32 gci_hi = apply_buf->sdata->gci_hi;
3472  Uint32 gci_lo = apply_buf->sdata->gci_lo;
3473  apply_gci= gci_lo | (Uint64(gci_hi) << 32);
3474  }
3475  else
3476  apply_gci= latest_gci;
3477 
3478  if (m_free_thresh)
3479  {
3480  if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
3481  m_total_alloc > 1024*1024)
3482  {
3483  /* report less free buffer than m_free_thresh,
3484  next report when more free than 2 * m_free_thresh
3485  */
3486  m_min_free_thresh= 0;
3487  m_max_free_thresh= 2 * m_free_thresh;
3488  goto send_report;
3489  }
3490 
3491  if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
3492  m_total_alloc > 1024*1024)
3493  {
3494  /* report more free than 2 * m_free_thresh
3495  next report when less free than m_free_thresh
3496  */
3497  m_min_free_thresh= m_free_thresh;
3498  m_max_free_thresh= 100;
3499  goto send_report;
3500  }
3501  }
3502  if (m_gci_slip_thresh &&
3503  (latest_gci-apply_gci >= m_gci_slip_thresh))
3504  {
3505  goto send_report;
3506  }
3507  return;
3508 
3509 send_report:
3510  Uint32 data[8];
3511  data[0]= NDB_LE_EventBufferStatus;
3512  data[1]= m_total_alloc-m_free_data_sz;
3513  data[2]= m_total_alloc;
3514  data[3]= 0;
3515  data[4]= (Uint32)(apply_gci);
3516  data[5]= (Uint32)(apply_gci >> 32);
3517  data[6]= (Uint32)(latest_gci);
3518  data[7]= (Uint32)(latest_gci >> 32);
3519  Ndb_internal::send_event_report(true, m_ndb, data,8);
3520 #ifdef VM_TRACE
3521  assert(m_total_alloc >= m_free_data_sz);
3522 #endif
3523 }
3524 
#ifdef VM_TRACE
/*
  Debug-build size check for a chain of EventBufData linked by m_next.
  The check itself is compiled out (#if 0), so this is currently a no-op.
  When enabled, it sums get_full_size() over the chain and asserts that
  the totals equal the expected `count` and `sz`.
*/
void
NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
{
#if 0
  Uint32 seen_count = 0;
  Uint32 seen_sz = 0;
  for (const EventBufData* cur = data; cur != 0; cur = cur->m_next)
  {
    Uint32 part_count, part_sz;
    cur->get_full_size(part_count, part_sz);
    seen_count += part_count;
    seen_sz += part_sz;
  }
  assert(seen_count == count);
  assert(seen_sz == sz);
#endif
}

/*
  List overload: when enabled, checks the list's cached m_count / m_sz
  against its head chain. Currently compiled out (#if 0).
*/
void
NdbEventBuffer::verify_size(const EventBufData_list & list)
{
#if 0
  verify_size(list.m_head, list.m_count, list.m_sz);
#endif
}
#endif
3551 
3552 // hash table routines
3553 
3554 // could optimize the all-fixed case
3555 Uint32
3556 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3557 {
3558  DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
3559  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
3560  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
3561 
3562  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3563 
3564  // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
3565  // for pk update (to equivalent pk) post/pre values give same hash
3566  Uint32 nkey = tab->m_noOfKeys;
3567  assert(nkey != 0 && nkey <= ptr[0].sz);
3568  const Uint32* hptr = ptr[0].p;
3569  const uchar* dptr = (uchar*)ptr[1].p;
3570 
3571  // hash registers
3572  ulong nr1 = 0;
3573  ulong nr2 = 0;
3574  while (nkey-- != 0)
3575  {
3576  AttributeHeader ah(*hptr++);
3577  Uint32 bytesize = ah.getByteSize();
3578  assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
3579 
3580  Uint32 i = ah.getAttributeId();
3581  const NdbColumnImpl* col = tab->getColumn(i);
3582  assert(col != 0);
3583 
3584  Uint32 lb, len;
3585  bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
3586  assert(ok);
3587 
3588  CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3589  (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
3590  dptr += ((bytesize + 3) / 4) * 4;
3591  }
3592  DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
3593  DBUG_RETURN_EVENT(nr1);
3594 }
3595 
3596 bool
3597 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
3598 {
3599  DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
3600  DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
3601  DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
3602  DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
3603  DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
3604 
3605  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3606 
3607  Uint32 nkey = tab->m_noOfKeys;
3608  assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
3609  const Uint32* hptr1 = ptr1[0].p;
3610  const Uint32* hptr2 = ptr2[0].p;
3611  const uchar* dptr1 = (uchar*)ptr1[1].p;
3612  const uchar* dptr2 = (uchar*)ptr2[1].p;
3613 
3614  bool equal = true;
3615 
3616  while (nkey-- != 0)
3617  {
3618  AttributeHeader ah1(*hptr1++);
3619  AttributeHeader ah2(*hptr2++);
3620  // sizes can differ on update of varchar endspace
3621  Uint32 bytesize1 = ah1.getByteSize();
3622  Uint32 bytesize2 = ah2.getByteSize();
3623  assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
3624  assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
3625 
3626  assert(ah1.getAttributeId() == ah2.getAttributeId());
3627  Uint32 i = ah1.getAttributeId();
3628  const NdbColumnImpl* col = tab->getColumn(i);
3629  assert(col != 0);
3630 
3631  Uint32 lb1, len1;
3632  bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
3633  Uint32 lb2, len2;
3634  bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
3635  assert(ok1 && ok2 && lb1 == lb2);
3636 
3637  CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3638  int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
3639  if (res != 0)
3640  {
3641  equal = false;
3642  break;
3643  }
3644  dptr1 += ((bytesize1 + 3) / 4) * 4;
3645  dptr2 += ((bytesize2 + 3) / 4) * 4;
3646  }
3647 
3648  DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
3649  DBUG_RETURN_EVENT(equal);
3650 }
3651 
3652 void
3653 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
3654 {
3655  DBUG_ENTER_EVENT("EventBufData_hash::search");
3656  Uint32 pkhash = getpkhash(op, ptr);
3657  Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
3658  EventBufData* data = m_hash[index];
3659  while (data != 0)
3660  {
3661  if (data->m_event_op == op &&
3662  data->m_pkhash == pkhash &&
3663  getpkequal(op, data->ptr, ptr))
3664  break;
3665  data = data->m_next_hash;
3666  }
3667  hpos.index = index;
3668  hpos.data = data;
3669  hpos.pkhash = pkhash;
3670  DBUG_PRINT_EVENT("info", ("search result=%p", data));
3671  DBUG_VOID_RETURN_EVENT;
3672 }
3673 
// Explicit instantiation of the project's Vector template for
// Gci_container_pod, so its member definitions are emitted in this
// translation unit.
template class Vector<Gci_container_pod>;