MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbBlob.cpp
1 /*
2  Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "API.hpp"
19 #include <signaldata/TcKeyReq.hpp>
20 #include <NdbEnv.h>
21 #include <ndb_version.h>
22 
23 /*
24  * Reading index table directly (as a table) is faster but there are
25  * bugs or limitations. Keep the code and make possible to choose.
26  */
27 static const bool g_ndb_blob_ok_to_read_index_table = false;
28 
29 // get state
30 
33 {
34  return theState;
35 }
36 
37 void
38 NdbBlob::getVersion(int& version)
39 {
40  version = theEventBlobVersion;
41 }
42 
43 // set state (inline)
44 
45 inline void
46 NdbBlob::setState(State newState)
47 {
48  DBUG_ENTER("NdbBlob::setState");
49  DBUG_PRINT("info", ("this=%p newState=%u", this, newState));
50  theState = newState;
51  DBUG_VOID_RETURN;
52 }
53 
54 // define blob table
55 
56 int
57 NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
58 {
59  DBUG_ENTER("NdbBlob::getBlobTableName");
60  NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
61  if (t == NULL)
62  DBUG_RETURN(-1);
63  NdbColumnImpl* c = t->getColumn(columnName);
64  if (c == NULL)
65  DBUG_RETURN(-1);
66  getBlobTableName(btname, t, c);
67  DBUG_RETURN(0);
68 }
69 
70 void
71 NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
72 {
73  DBUG_ENTER("NdbBlob::getBlobTableName");
74  assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
75  memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
76  sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
77  DBUG_PRINT("info", ("blob table name: %s", btname));
78  DBUG_VOID_RETURN;
79 }
80 
81 int
82 NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c, NdbError& error)
83 {
84  DBUG_ENTER("NdbBlob::getBlobTable");
85  const int blobVersion = c->getBlobVersion();
86  assert(blobVersion == NDB_BLOB_V1 || blobVersion == NDB_BLOB_V2);
87  char btname[NdbBlobImpl::BlobTableNameSize];
88  getBlobTableName(btname, t, c);
89  bt.setName(btname);
90  bt.setLogging(t->getLogging());
91  /*
92  BLOB tables use the same fragmentation as the original table
93  It also uses the same tablespaces and it never uses any range or
94  list arrays.
95  */
96  bt.m_primaryTableId = t->m_id;
97  bt.m_fd.clear();
98  bt.m_range.clear();
100  bt.m_tablespace_id = t->m_tablespace_id;
101  bt.m_tablespace_version = t->m_tablespace_version;
103 
104  DBUG_PRINT("info", ("Define BLOB table V%d with"
105  " primary table = %u and Fragment Type = %u",
106  blobVersion,
107  bt.m_primaryTableId, (uint)bt.getFragmentType()));
108  if (unlikely(blobVersion == NDB_BLOB_V1)) {
109  /*
110  * Stripe size 0 in V1 does not work as intended.
111  * No point to add support for it now.
112  */
113  if (c->getStripeSize() == 0) {
114  error.code = NdbBlobImpl::ErrTable;
115  DBUG_RETURN(-1);
116  }
117  { NdbDictionary::Column bc("PK");
119  assert(t->m_keyLenInWords != 0);
120  bc.setLength(t->m_keyLenInWords);
121  bc.setPrimaryKey(true);
122  bc.setDistributionKey(true);
123  bt.addColumn(bc);
124  }
125  { NdbDictionary::Column bc("DIST");
127  bc.setPrimaryKey(true);
128  bc.setDistributionKey(true);
129  bt.addColumn(bc);
130  }
131  { NdbDictionary::Column bc("PART");
133  bc.setPrimaryKey(true);
134  bc.setDistributionKey(false);
135  bt.addColumn(bc);
136  }
137  { NdbDictionary::Column bc("DATA");
138  switch (c->m_type) {
140  bc.setType(NdbDictionary::Column::Binary);
141  break;
143  bc.setType(NdbDictionary::Column::Char);
144  break;
145  default:
146  assert(false);
147  break;
148  }
149  bc.setLength(c->getPartSize());
150  bc.setStorageType(c->getStorageType());
151  bt.addColumn(bc);
152  }
153  } else {
154  {
155  // table PK attributes
156  const uint columns = t->m_columns.size();
157  const uint noOfKeys = t->m_noOfKeys;
158  uint n = 0;
159  uint i;
160  for (i = 0; n < noOfKeys; i++) {
161  assert(i < columns);
162  const NdbColumnImpl* c = t->getColumn(i);
163  assert(c != NULL);
164  if (c->m_pk) {
165  bt.addColumn(*c);
166  // addColumn might usefully return the column added..
167  NdbColumnImpl* bc = bt.getColumn(n);
168  assert(bc != NULL);
169  if (c->getDistributionKey()) {
170  bc->setDistributionKey(true);
171  }
172  // confuses restore and wrong anyway
173  bc->setAutoIncrement(false);
174  bc->setDefaultValue("");
175  n++;
176  }
177  }
178  }
179  // in V2 add NDB$ to avoid conflict with table PK
180  if (c->getStripeSize() != 0)
181  { NdbDictionary::Column bc("NDB$DIST");
183  bc.setPrimaryKey(true);
184  bc.setDistributionKey(true);
185  bt.addColumn(bc);
186  }
187  { NdbDictionary::Column bc("NDB$PART");
189  bc.setPrimaryKey(true);
190  bc.setDistributionKey(false);
191  bt.addColumn(bc);
192  }
193  // in V2 add id sequence for use in blob event code
194  { NdbDictionary::Column bc("NDB$PKID");
196  bc.setPrimaryKey(false);
197  bc.setDistributionKey(false);
198  bt.addColumn(bc);
199  }
200  // in V2 changes to Longvar* regardless of size
201  { NdbDictionary::Column bc("NDB$DATA");
202  const Uint32 storageType = (Uint32)c->getStorageType();
203  switch (c->m_type) {
205  if (storageType == NDB_STORAGETYPE_MEMORY)
207  else
209  break;
211  if (storageType == NDB_STORAGETYPE_MEMORY)
213  else
215  break;
216  default:
217  assert(false);
218  break;
219  }
220  // the 2 length bytes are not part of part size
221  bc.setLength(c->getPartSize());
222  bc.setStorageType(c->getStorageType());
223  bt.addColumn(bc);
224  }
225  }
226  DBUG_RETURN(0);
227 }
228 
229 int
230 NdbBlob::getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName)
231 {
232  NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
233  if (e == NULL)
234  return -1;
235  NdbColumnImpl* c = e->m_tableImpl->getColumn(columnName);
236  if (c == NULL)
237  return -1;
238  getBlobEventName(bename, e, c);
239  delete e; // it is from new NdbEventImpl
240  return 0;
241 }
242 
243 void
244 NdbBlob::getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c)
245 {
246  // XXX events should have object id
247  BaseString::snprintf(bename, MAX_TAB_NAME_SIZE, "NDB$BLOBEVENT_%s_%d", e->m_name.c_str(), (int)c->m_column_no);
248 }
249 
250 void
251 NdbBlob::getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c)
252 {
253  DBUG_ENTER("NdbBlob::getBlobEvent");
254  // blob table
255  assert(c->m_blobTable != NULL);
256  const NdbTableImpl& bt = *c->m_blobTable;
257  // blob event name
258  char bename[MAX_TAB_NAME_SIZE+1];
259  getBlobEventName(bename, e, c);
260  bename[sizeof(bename)-1]= 0;
261  be.setName(bename);
262  be.setTable(bt);
263  // simple assigments
264  be.mi_type = e->mi_type;
265  be.m_dur = e->m_dur;
266  be.m_mergeEvents = e->m_mergeEvents;
267  // report unchanged data
268  // not really needed now since UPD is DEL o INS and we subscribe to all
269  be.setReport(NdbDictionary::Event::ER_ALL);
270  // columns PK - DIST - PART - DATA
271  { const NdbColumnImpl* bc = bt.getColumn((Uint32)0);
272  be.addColumn(*bc);
273  }
274  { const NdbColumnImpl* bc = bt.getColumn((Uint32)1);
275  be.addColumn(*bc);
276  }
277  { const NdbColumnImpl* bc = bt.getColumn((Uint32)2);
278  be.addColumn(*bc);
279  }
280  { const NdbColumnImpl* bc = bt.getColumn((Uint32)3);
281  be.addColumn(*bc);
282  }
283  DBUG_VOID_RETURN;
284 }
285 
286 // initialization
287 
288 NdbBlob::NdbBlob(Ndb*)
289 {
290  init();
291 }
292 
293 void
294 NdbBlob::init()
295 {
296  theBlobVersion = 0;
297  theFixedDataFlag = false;
298  theHeadSize = 0;
299  theVarsizeBytes = 0;
300  theState = Idle;
301  theEventBlobVersion = -1;
302  theBtColumnNo[0] = -1;
303  theBtColumnNo[1] = -1;
304  theBtColumnNo[2] = -1;
305  theBtColumnNo[3] = -1;
306  theBtColumnNo[4] = -1;
307  theNdb = NULL;
308  theNdbCon = NULL;
309  theNdbOp = NULL;
310  theEventOp = NULL;
311  theBlobEventOp = NULL;
312  theBlobEventPkRecAttr = NULL;
313  theBlobEventDistRecAttr = NULL;
314  theBlobEventPartRecAttr = NULL;
315  theBlobEventPkidRecAttr = NULL;
316  theBlobEventDataRecAttr = NULL;
317  theTable = NULL;
318  theAccessTable = NULL;
319  theBlobTable = NULL;
320  theColumn = NULL;
321  theFillChar = 0xFF;
322  theInlineSize = 0;
323  thePartSize = 0;
324  theStripeSize = 0;
325  theGetFlag = false;
326  theGetBuf = NULL;
327  theSetFlag = false;
328  theSetValueInPreExecFlag = false;
329  theSetBuf = NULL;
330  theGetSetBytes = 0;
331  thePendingBlobOps = 0;
332  theActiveHook = NULL;
333  theActiveHookArg = NULL;
334  thePartLen = 0;
335  theInlineData = NULL;
336  theHeadInlineRecAttr = NULL;
337  theHeadInlineReadOp = NULL;
338  theHeadInlineUpdateFlag = false;
339  userDefinedPartitioning = false;
340  thePartitionId = noPartitionId();
341  thePartitionIdRecAttr = NULL;
342  theNullFlag = -1;
343  theLength = 0;
344  thePos = 0;
345  theNext = NULL;
346 }
347 
348 void
349 NdbBlob::release()
350 {
351  theKeyBuf.release();
352  theAccessKeyBuf.release();
353  thePackKeyBuf.release();
354  theHeadInlineBuf.release();
355  theHeadInlineCopyBuf.release();
356  thePartBuf.release();
357  theBlobEventDataBuf.release();
358  setState(Idle);
359 }
360 
361 // buffers
362 
363 NdbBlob::Buf::Buf() :
364  data(NULL),
365  size(0),
366  maxsize(0)
367 {
368 }
369 
370 NdbBlob::Buf::~Buf()
371 {
372  delete [] data;
373 }
374 
375 void
376 NdbBlob::Buf::alloc(unsigned n)
377 {
378  size = n;
379  if (maxsize < n) {
380  delete [] data;
381  // align to Uint64
382  if (n % 8 != 0)
383  n += 8 - n % 8;
384  data = new char [n];
385  maxsize = n;
386  }
387 #ifdef VM_TRACE
388  memset(data, 'X', maxsize);
389 #endif
390 }
391 
392 void
393 NdbBlob::Buf::release()
394 {
395  if (data)
396  delete [] data;
397  data = NULL;
398  size = 0;
399  maxsize = 0;
400 }
401 
402 void
403 NdbBlob::Buf::zerorest()
404 {
405  assert(size <= maxsize);
406  memset(data + size, 0, maxsize - size);
407 }
408 
409 void
410 NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
411 {
412  size = src.size;
413  memcpy(data, src.data, size);
414 }
415 
416 // classify operations (inline)
417 
418 inline bool
419 NdbBlob::isTableOp()
420 {
421  return theTable == theAccessTable;
422 }
423 
424 inline bool
425 NdbBlob::isIndexOp()
426 {
427  return theTable != theAccessTable;
428 }
429 
430 inline bool
431 NdbBlob::isKeyOp()
432 {
433  return
434  theNdbOp->theOperationType == NdbOperation::InsertRequest ||
435  theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
436  theNdbOp->theOperationType == NdbOperation::WriteRequest ||
437  theNdbOp->theOperationType == NdbOperation::ReadRequest ||
438  theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
439  theNdbOp->theOperationType == NdbOperation::DeleteRequest;
440 }
441 
442 inline bool
443 NdbBlob::isReadOp()
444 {
445  return
446  theNdbOp->theOperationType == NdbOperation::ReadRequest ||
447  theNdbOp->theOperationType == NdbOperation::ReadExclusive;
448 }
449 
450 inline bool
451 NdbBlob::isInsertOp()
452 {
453  return
454  theNdbOp->theOperationType == NdbOperation::InsertRequest;
455 }
456 
457 inline bool
458 NdbBlob::isUpdateOp()
459 {
460  return
461  theNdbOp->theOperationType == NdbOperation::UpdateRequest;
462 }
463 
464 inline bool
465 NdbBlob::isWriteOp()
466 {
467  return
468  theNdbOp->theOperationType == NdbOperation::WriteRequest;
469 }
470 
471 inline bool
472 NdbBlob::isDeleteOp()
473 {
474  return
475  theNdbOp->theOperationType == NdbOperation::DeleteRequest;
476 }
477 
478 inline bool
479 NdbBlob::isScanOp()
480 {
481  return
482  theNdbOp->theOperationType == NdbOperation::OpenScanRequest ||
483  theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
484 }
485 
486 inline bool
487 NdbBlob::isReadOnlyOp()
488 {
489  return ! (
490  theNdbOp->theOperationType == NdbOperation::InsertRequest ||
491  theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
492  theNdbOp->theOperationType == NdbOperation::WriteRequest
493  );
494 }
495 
496 inline bool
497 NdbBlob::isTakeOverOp()
498 {
499  return
500  TcKeyReq::getTakeOverScanFlag(theNdbOp->theScanInfo);
501 }
502 
503 // computations (inline)
504 
505 inline Uint32
506 NdbBlob::getPartNumber(Uint64 pos)
507 {
508  assert(thePartSize != 0 && pos >= theInlineSize);
509  Uint64 partNo = (pos - theInlineSize) / thePartSize;
510  assert(partNo < (Uint64(1) << 32));
511  return Uint32(partNo);
512 }
513 
514 inline Uint32
515 NdbBlob::getPartOffset(Uint64 pos)
516 {
517  assert(thePartSize != 0 && pos >= theInlineSize);
518  return (pos - theInlineSize) % thePartSize;
519 }
520 
521 inline Uint32
522 NdbBlob::getPartCount()
523 {
524  if (theLength <= theInlineSize)
525  return 0;
526  return 1 + getPartNumber(theLength - 1);
527 }
528 
529 inline Uint32
530 NdbBlob::getDistKey(Uint32 part)
531 {
532  assert(theStripeSize != 0);
533  Uint32 dist = 0;
534  if (unlikely(theBlobVersion == NDB_BLOB_V1))
535  dist = (part / theStripeSize) % theStripeSize;
536  else {
537  // correct the mistake
538  dist = (part / theStripeSize);
539  }
540  return dist;
541 }
542 
543 inline void
544 NdbBlob::setHeadPartitionId(NdbOperation* anOp)
545 {
546  /* For UserDefined partitioned tables,
547  * we must set the head row's partition id
548  * manually when reading/modifying it with
549  * primary key or unique key.
550  * For scans we do not have to.
551  */
552  if (userDefinedPartitioning &&
553  (thePartitionId != noPartitionId())) {
554  anOp->setPartitionId(thePartitionId);
555  }
556 }
557 
558 inline void
559 NdbBlob::setPartPartitionId(NdbOperation* anOp)
560 {
561  /* For UserDefined partitioned tables
562  * we must set the part row's partition
563  * id manually when performing operations.
564  * This means that stripe size is ignored
565  * for UserDefined partitioned tables.
566  * All part row operations use primary keys
567  */
568  if (userDefinedPartitioning) {
569  assert(thePartitionId != noPartitionId());
570  anOp->setPartitionId(thePartitionId);
571  }
572 }
573 
574 // pack/unpack table/index key XXX support routines, shortcuts
575 
576 int
577 NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
578 {
579  DBUG_ENTER("NdbBlob::packKeyValue");
580  const Uint32* data = (const Uint32*)srcBuf.data;
581  unsigned pos = 0;
582  Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
583  unsigned pack_pos = 0;
584  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
585  NdbColumnImpl* c = aTable->m_columns[i];
586  assert(c != NULL);
587  if (c->m_pk) {
588  unsigned len = c->m_attrSize * c->m_arraySize;
589  Uint32 pack_len;
590  bool ok = c->get_var_length(&data[pos], pack_len);
591  if (! ok) {
592  setErrorCode(NdbBlobImpl::ErrCorruptPK);
593  DBUG_RETURN(-1);
594  }
595  memcpy(&pack_data[pack_pos], &data[pos], pack_len);
596  while (pack_len % 4 != 0) {
597  char* p = (char*)&pack_data[pack_pos] + pack_len++;
598  *p = 0;
599  }
600  pos += (len + 3) / 4;
601  pack_pos += pack_len / 4;
602  }
603  }
604  assert(4 * pos == srcBuf.size);
605  assert(4 * pack_pos <= thePackKeyBuf.maxsize);
606  thePackKeyBuf.size = 4 * pack_pos;
607  thePackKeyBuf.zerorest();
608  DBUG_RETURN(0);
609 }
610 
611 int
612 NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
613 {
614  DBUG_ENTER("NdbBlob::unpackKeyValue");
615  Uint32* data = (Uint32*)dstBuf.data;
616  unsigned pos = 0;
617  const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
618  unsigned pack_pos = 0;
619  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
620  NdbColumnImpl* c = aTable->m_columns[i];
621  assert(c != NULL);
622  if (c->m_pk) {
623  unsigned len = c->m_attrSize * c->m_arraySize;
624  Uint32 pack_len;
625  bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
626  if (! ok) {
627  setErrorCode(NdbBlobImpl::ErrCorruptPK);
628  DBUG_RETURN(-1);
629  }
630  memcpy(&data[pos], &pack_data[pack_pos], pack_len);
631  while (pack_len % 4 != 0) {
632  char* p = (char*)&data[pos] + pack_len++;
633  *p = 0;
634  }
635  pos += (len + 3) / 4;
636  pack_pos += pack_len / 4;
637  }
638  }
639  assert(4 * pos == dstBuf.size);
640  assert(4 * pack_pos == thePackKeyBuf.size);
641  DBUG_RETURN(0);
642 }
643 
644 /* Set both packed and unpacked KeyBuf from NdbRecord and row. */
645 int
646 NdbBlob::copyKeyFromRow(const NdbRecord *record, const char *row,
647  Buf& packedBuf, Buf& unpackedBuf)
648 {
649  char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
650  DBUG_ENTER("NdbBlob::copyKeyFromRow");
651 
652  assert(record->flags & NdbRecord::RecHasAllKeys);
653 
654  char *packed= packedBuf.data;
655  char *unpacked= unpackedBuf.data;
656 
657  for (Uint32 i= 0; i < record->key_index_length; i++)
658  {
659  const NdbRecord::Attr *col= &record->columns[record->key_indexes[i]];
660 
661  Uint32 len= ~0;
662  bool len_ok;
663  const char *src;
664  if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
665  {
666  /* Used to support special varchar format for mysqld keys. */
667  len_ok= col->shrink_varchar(row, len, buf);
668  src= buf;
669  }
670  else
671  {
672  len_ok= col->get_var_length(row, len);
673  src= &row[col->offset];
674  }
675 
676  if (!len_ok)
677  {
678  setErrorCode(NdbBlobImpl::ErrCorruptPK);
679  DBUG_RETURN(-1);
680  }
681 
682  /* Copy the key. */
683  memcpy(packed, src, len);
684  memcpy(unpacked, src, len);
685 
686  /* Zero-pad if needed. */
687  Uint32 packed_len= (len + 3) & ~3;
688  Uint32 unpacked_len= (col->maxSize + 3) & ~3;
689  Uint32 packed_pad= packed_len - len;
690  Uint32 unpacked_pad= unpacked_len - len;
691  if (packed_pad > 0)
692  bzero(packed + len, packed_pad);
693  if (unpacked_pad > 0)
694  bzero(unpacked + len, unpacked_pad);
695  packed+= packed_len;
696  unpacked+= unpacked_len;
697  }
698 
699  packedBuf.size= packed - packedBuf.data;
700  packedBuf.zerorest();
701  assert(unpacked == unpackedBuf.data + unpackedBuf.size);
702  DBUG_RETURN(0);
703 }
704 
705 /*
706  * This method is used to get data ptr and length values for the
707  * header for an 'empty' Blob. This is a blob with length zero,
708  * or a NULL BLOB.
709  * This header is used to build signals for an insert or write
710  * operation before the correct blob header information is known.
711  * Once the blob header information is known, another operation will
712  * set the header information correctly.
713  */
714 void
715 NdbBlob::getNullOrEmptyBlobHeadDataPtr(const char * & data,
716  Uint32 & byteSize)
717 {
718  /* Only for use when preparing signals before a blob value has been set
719  * e.g. NdbRecord
720  */
721  assert(theState==Prepared);
722  assert(theLength==0);
723  assert(theSetBuf==NULL);
724  assert(theGetSetBytes==0);
725  assert(thePos==0);
726  assert(theHeadInlineBuf.data!=NULL);
727 
728  DBUG_PRINT("info", ("getNullOrEmptyBlobHeadDataPtr. Nullable : %d",
729  theColumn->m_nullable));
730 
731  if (theColumn->m_nullable)
732  {
733  /* Null Blob */
734  data = NULL;
735  byteSize = 0;
736  return;
737  }
738 
739  /* Set up the buffer ptr to appear to be pointing to some data */
740  theSetBuf=(char*) 1; // Extremely nasty way of being non-null
741  // If it's ever de-reffed, should show up
742 
743  /* Pack header etc. */
744  prepareSetHeadInlineValue();
745 
746  data=theHeadInlineBuf.data;
747 
748  /* Calculate size */
749  if (unlikely(theBlobVersion == NDB_BLOB_V1))
750  byteSize = theHeadInlineBuf.size;
751  else
752  byteSize = theHead.varsize + 2;
753 
754  /* Reset affected members */
755  theSetBuf=NULL;
756  memset(&theHead, 0, sizeof(theHead));
757 
758  /* This column is not null anymore - record the fact so that
759  * a setNull() call will modify state
760  */
761  theNullFlag=false;
762 }
763 
764 
765 // getters and setters
766 
767 void
768 NdbBlob::packBlobHead(const Head& head, char* buf, int blobVersion)
769 {
770  DBUG_ENTER("NdbBlob::packBlobHead");
771  DBUG_PRINT("info", ("version=%d", blobVersion));
772  if (unlikely(blobVersion == NDB_BLOB_V1)) {
773  // native
774  memcpy(buf, &head.length, sizeof(head.length));
775  } else {
776  unsigned char* p = (unsigned char*)buf;
777  // all little-endian
778  uint i, n;
779  for (i = 0, n = 0; i < 2; i++, n += 8)
780  *p++ = (head.varsize >> n) & 0xff;
781  for (i = 0, n = 0; i < 2; i++, n += 8)
782  *p++ = (head.reserved >> n) & 0xff;
783  for (i = 0, n = 0; i < 4; i++, n += 8)
784  *p++ = (head.pkid >> n) & 0xff;
785  for (i = 0, n = 0; i < 8; i++, n += 8)
786  *p++ = (head.length >> n) & 0xff;
787  assert(p - (uchar*)buf == 16);
788  assert(head.reserved == 0);
789  DBUG_DUMP("info", (uchar*)buf, 16);
790  }
791  DBUG_PRINT("info", ("pack: varsize=%u length=%u pkid=%u",
792  (uint)head.varsize, (uint)head.length, (uint)head.pkid));
793  DBUG_VOID_RETURN;
794 }
795 
796 void
797 NdbBlob::unpackBlobHead(Head& head, const char* buf, int blobVersion)
798 {
799  DBUG_ENTER("NdbBlob::unpackBlobHead");
800  DBUG_PRINT("info", ("version=%d", blobVersion));
801  head.varsize = 0;
802  head.reserved = 0;
803  head.pkid = 0;
804  head.length = 0;
805  if (unlikely(blobVersion == NDB_BLOB_V1)) {
806  // native
807  memcpy(&head.length, buf, sizeof(head.length));
808  head.headsize = (NDB_BLOB_V1_HEAD_SIZE << 2);
809  } else {
810  const unsigned char* p = (const unsigned char*)buf;
811  // all little-endian
812  uint i, n;
813  for (i = 0, n = 0; i < 2; i++, n += 8)
814  head.varsize |= ((Uint16)*p++ << n);
815  for (i = 0, n = 0; i < 2; i++, n += 8)
816  head.reserved |= ((Uint32)*p++ << n);
817  for (i = 0, n = 0; i < 4; i++, n += 8)
818  head.pkid |= ((Uint32)*p++ << n);
819  for (i = 0, n = 0; i < 8; i++, n += 8)
820  head.length |= ((Uint64)*p++ << n);
821  assert(p - (uchar*)buf == 16);
822  assert(head.reserved == 0);
823  head.headsize = (NDB_BLOB_V2_HEAD_SIZE << 2);
824  DBUG_DUMP("info", (uchar*)buf, 16);
825  }
826  DBUG_PRINT("info", ("unpack: varsize=%u length=%u pkid=%u",
827  (uint)head.varsize, (uint)head.length, (uint)head.pkid));
828  DBUG_VOID_RETURN;
829 }
830 
831 inline void
832 NdbBlob::packBlobHead()
833 {
834  packBlobHead(theHead, theHeadInlineBuf.data, theBlobVersion);
835 }
836 
837 inline void
838 NdbBlob::unpackBlobHead()
839 {
840  unpackBlobHead(theHead, theHeadInlineBuf.data, theBlobVersion);
841 }
842 
843 int
844 NdbBlob::getTableKeyValue(NdbOperation* anOp)
845 {
846  DBUG_ENTER("NdbBlob::getTableKeyValue");
847  Uint32* data = (Uint32*)theKeyBuf.data;
848  unsigned pos = 0;
849  for (unsigned i = 0; i < theTable->m_columns.size(); i++) {
850  NdbColumnImpl* c = theTable->m_columns[i];
851  assert(c != NULL);
852  if (c->m_pk) {
853  unsigned len = c->m_attrSize * c->m_arraySize;
854  if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
855  setErrorCode(anOp);
856  DBUG_RETURN(-1);
857  }
858  // odd bytes receive no data and must be zeroed
859  while (len % 4 != 0) {
860  char* p = (char*)&data[pos] + len++;
861  *p = 0;
862  }
863  pos += len / 4;
864  }
865  }
866  assert(pos == theKeyBuf.size / 4);
867  DBUG_RETURN(0);
868 }
869 
870 // in V2 operation can also be on blob part
871 int
872 NdbBlob::setTableKeyValue(NdbOperation* anOp)
873 {
874  DBUG_ENTER("NdbBlob::setTableKeyValue");
875  DBUG_DUMP("info", (uchar*) theKeyBuf.data, 4 * theTable->m_keyLenInWords);
876  const bool isBlobPartOp = (anOp->m_currentTable == theBlobTable);
877  const Uint32* data = (const Uint32*)theKeyBuf.data;
878  const unsigned columns = theTable->m_columns.size();
879  uint n = 0;
880  const uint noOfKeys = theTable->m_noOfKeys;
881  unsigned pos = 0;
882  for (unsigned i = 0; n < noOfKeys; i++) {
883  assert(i < columns);
884  const NdbColumnImpl* c = theTable->getColumn(i);
885  assert(c != NULL);
886  if (c->m_pk) {
887  unsigned len = c->m_attrSize * c->m_arraySize;
888  if (isBlobPartOp) {
889  c = theBlobTable->getColumn(n);
890  assert(c != NULL);
891  }
892  if (anOp->equal_impl(c, (const char*)&data[pos]) == -1) {
893  setErrorCode(anOp);
894  DBUG_RETURN(-1);
895  }
896  pos += (len + 3) / 4;
897  n++;
898  }
899  }
900  assert(pos == theKeyBuf.size / 4);
901  DBUG_RETURN(0);
902 }
903 
904 int
905 NdbBlob::setAccessKeyValue(NdbOperation* anOp)
906 {
907  DBUG_ENTER("NdbBlob::setAccessKeyValue");
908  DBUG_DUMP("info", (uchar*) theAccessKeyBuf.data,
909  4 * theAccessTable->m_keyLenInWords);
910  const Uint32* data = (const Uint32*)theAccessKeyBuf.data;
911  const unsigned columns = theAccessTable->m_columns.size();
912  unsigned pos = 0;
913  for (unsigned i = 0; i < columns; i++) {
914  NdbColumnImpl* c = theAccessTable->m_columns[i];
915  assert(c != NULL);
916  if (c->m_pk) {
917  unsigned len = c->m_attrSize * c->m_arraySize;
918  if (anOp->equal_impl(c, (const char*)&data[pos]) == -1) {
919  setErrorCode(anOp);
920  DBUG_RETURN(-1);
921  }
922  pos += (len + 3) / 4;
923  }
924  }
925  assert(pos == theAccessKeyBuf.size / 4);
926  DBUG_RETURN(0);
927 }
928 
929 int
930 NdbBlob::setDistKeyValue(NdbOperation* anOp, Uint32 part)
931 {
932  DBUG_ENTER("NdbBlob::setDistKeyValue");
933  if (theStripeSize != 0) {
934  Uint32 dist = getDistKey(part);
935  DBUG_PRINT("info", ("dist=%u", dist));
936  if (anOp->equal(theBtColumnNo[BtColumnDist], dist) == -1)
937  DBUG_RETURN(-1);
938  }
939  DBUG_RETURN(0);
940 }
941 
942 int
943 NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
944 {
945  DBUG_ENTER("NdbBlob::setPartKeyValue");
946  DBUG_PRINT("info", ("part=%u packkey=", part));
947  DBUG_DUMP("info", (uchar*) thePackKeyBuf.data, thePackKeyBuf.size);
948  // TODO use attr ids after compatibility with 4.1.7 not needed
949  if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
950  // keep using names
951  if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
952  anOp->equal("DIST", getDistKey(part)) == -1 ||
953  anOp->equal("PART", part) == -1) {
954  setErrorCode(anOp);
955  DBUG_RETURN(-1);
956  }
957  } else {
958  if (setTableKeyValue(anOp) == -1 ||
959  setDistKeyValue(anOp, part) == -1 ||
960  anOp->equal(theBtColumnNo[BtColumnPart], part) == -1) {
961  setErrorCode(anOp);
962  DBUG_RETURN(-1);
963  }
964  }
965  setPartPartitionId(anOp);
966  DBUG_RETURN(0);
967 }
968 
969 int
970 NdbBlob::setPartPkidValue(NdbOperation* anOp, Uint32 pkid)
971 {
972  DBUG_ENTER("NdbBlob::setPartPkidValue");
973  DBUG_PRINT("info", ("pkid=%u", pkid));
974  if (unlikely(theBlobVersion == NDB_BLOB_V1))
975  ;
976  else {
977  if (anOp->setValue(theBtColumnNo[BtColumnPkid], pkid) == -1) {
978  setErrorCode(anOp);
979  DBUG_RETURN(-1);
980  }
981  }
982  DBUG_RETURN(0);
983 }
984 
985 int
986 NdbBlob::getPartDataValue(NdbOperation* anOp, char* buf, Uint16* aLenLoc)
987 {
988  DBUG_ENTER("NdbBlob::getPartDataValue");
989  assert(aLenLoc != NULL);
990  Uint32 bcNo = theBtColumnNo[BtColumnData];
991  if (theFixedDataFlag) {
992  if (anOp->getValue(bcNo, buf) == NULL) {
993  setErrorCode(anOp);
994  DBUG_RETURN(-1);
995  }
996  // length is full size and is not returned via NDB API
997  *aLenLoc = thePartSize;
998  } else {
999  const NdbColumnImpl* bc = theBlobTable->getColumn(bcNo);
1000  assert(bc != NULL);
1001  if (anOp->getVarValue(bc, buf, aLenLoc) == NULL) {
1002  setErrorCode(anOp);
1003  DBUG_RETURN(-1);
1004  }
1005  // in V2 length is set when next execute returns
1006  }
1007  DBUG_RETURN(0);
1008 }
1009 
1010 int
1011 NdbBlob::setPartDataValue(NdbOperation* anOp, const char* buf, const Uint16& aLen)
1012 {
1013  DBUG_ENTER("NdbBlob::setPartDataValue");
1014  assert(aLen != 0);
1015  Uint32 bcNo = theBtColumnNo[BtColumnData];
1016  if (theFixedDataFlag) {
1017  if (anOp->setValue(bcNo, buf) == -1) {
1018  setErrorCode(anOp);
1019  DBUG_RETURN(-1);
1020  }
1021  } else {
1022  const NdbColumnImpl* bc = theBlobTable->getColumn(bcNo);
1023  assert(bc != NULL);
1024  if (anOp->setVarValue(bc, buf, aLen) == -1) {
1025  setErrorCode(anOp);
1026  DBUG_RETURN(-1);
1027  }
1028  }
1029  DBUG_RETURN(0);
1030 }
1031 
1032 int
1033 NdbBlob::getHeadInlineValue(NdbOperation* anOp)
1034 {
1035  DBUG_ENTER("NdbBlob::getHeadInlineValue");
1036 
1037  /* Get values using implementation of getValue to avoid NdbRecord
1038  * specific checks
1039  */
1040  theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
1041  if (theHeadInlineRecAttr == NULL) {
1042  setErrorCode(anOp);
1043  DBUG_RETURN(-1);
1044  }
1045  if (userDefinedPartitioning)
1046  {
1047  /* For UserDefined partitioned tables, we ask for the partition
1048  * id of the main table row to use for the parts
1049  * Not technically needed for main table access via PK, which must
1050  * have partition id set for access, but we do it anyway and check
1051  * it's as expected.
1052  */
1053  thePartitionIdRecAttr =
1054  anOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
1055 
1056  if (thePartitionIdRecAttr == NULL) {
1057  setErrorCode(anOp);
1058  DBUG_RETURN(-1);
1059  }
1060  }
1061  /*
1062  * If we get no data from this op then the operation is aborted
1063  * one way or other. Following hack in 5.0 makes sure we don't read
1064  * garbage. The proper fix exists only in version >= 5.1.
1065  */
1066  // 5.0 theHead->length = 0;
1067  memset(&theHead, 0, sizeof(theHead));
1068  packBlobHead();
1069  DBUG_RETURN(0);
1070 }
1071 
1072 void
1073 NdbBlob::getHeadFromRecAttr()
1074 {
1075  DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
1076  assert(theHeadInlineRecAttr != NULL);
1077  theNullFlag = theHeadInlineRecAttr->isNULL();
1078  assert(theEventBlobVersion >= 0 || theNullFlag != -1);
1079  if (theNullFlag == 0) {
1080  unpackBlobHead();
1081  theLength = theHead.length;
1082  } else {
1083  theLength = 0;
1084  }
1085  if (theEventBlobVersion == -1) {
1086  if (userDefinedPartitioning)
1087  {
1088  /* Use main table fragment id as partition id
1089  * for blob parts table
1090  */
1091  Uint32 id = thePartitionIdRecAttr->u_32_value();
1092  DBUG_PRINT("info", ("table partition id: %u", id));
1093  if (thePartitionId == noPartitionId()) {
1094  DBUG_PRINT("info", ("discovered here"));
1095  thePartitionId = id;
1096  } else {
1097  assert(thePartitionId == id);
1098  }
1099  }
1100  else
1101  {
1102  assert(thePartitionIdRecAttr == NULL);
1103  }
1104  }
1105 
1106  DBUG_PRINT("info", ("theNullFlag=%d theLength=%llu",
1107  theNullFlag, theLength));
1108  DBUG_VOID_RETURN;
1109 }
1110 
1111 void
1112 NdbBlob::prepareSetHeadInlineValue()
1113 {
1114  theHead.length = theLength;
1115  if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
1116  if (theLength < theInlineSize)
1117  memset(theInlineData + theLength, 0, size_t(theInlineSize - theLength));
1118  } else {
1119  // the 2 length bytes are not counted in length
1120  if (theLength < theInlineSize)
1121  theHead.varsize = (theHeadSize - 2) + Uint32(theLength);
1122  else
1123  theHead.varsize = (theHeadSize - 2) + theInlineSize;
1124  theHead.pkid = 0; // wl3717_todo not yet
1125  }
1126  packBlobHead();
1127  theHeadInlineUpdateFlag = false;
1128  assert(theNullFlag != -1);
1129 }
1130 
1131 int
1132 NdbBlob::setHeadInlineValue(NdbOperation* anOp)
1133 {
1134  DBUG_ENTER("NdbBlob::setHeadInlineValue");
1135  prepareSetHeadInlineValue();
1136  const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data;
1137  if (anOp->setValue(theColumn, aValue) == -1) {
1138  setErrorCode(anOp);
1139  DBUG_RETURN(-1);
1140  }
1141  DBUG_RETURN(0);
1142 }
1143 
1144 // getValue/setValue
1145 
1146 int
1147 NdbBlob::getValue(void* data, Uint32 bytes)
1148 {
1149  DBUG_ENTER("NdbBlob::getValue");
1150  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
1151  if (! isReadOp() && ! isScanOp()) {
1152  setErrorCode(NdbBlobImpl::ErrCompat);
1153  DBUG_RETURN(-1);
1154  }
1155  if (theGetFlag || theState != Prepared) {
1156  setErrorCode(NdbBlobImpl::ErrState);
1157  DBUG_RETURN(-1);
1158  }
1159  if (data == NULL && bytes != 0) {
1160  setErrorCode(NdbBlobImpl::ErrUsage);
1161  DBUG_RETURN(-1);
1162  }
1163  theGetFlag = true;
1164  theGetBuf = static_cast<char*>(data);
1165  theGetSetBytes = bytes;
1166  DBUG_RETURN(0);
1167 }
1168 
1169 int
1170 NdbBlob::setValue(const void* data, Uint32 bytes)
1171 {
1172  DBUG_ENTER("NdbBlob::setValue");
1173  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
1174  if (isReadOnlyOp()) {
1175  setErrorCode(NdbBlobImpl::ErrCompat);
1176  DBUG_RETURN(-1);
1177  }
1178  if (theSetFlag || theState != Prepared) {
1179  setErrorCode(NdbBlobImpl::ErrState);
1180  DBUG_RETURN(-1);
1181  }
1182  if (data == NULL && bytes != 0) {
1183  setErrorCode(NdbBlobImpl::ErrUsage);
1184  DBUG_RETURN(-1);
1185  }
1186  theSetFlag = true;
1187  theSetBuf = static_cast<const char*>(data);
1188  theGetSetBytes = bytes;
1189  if (isInsertOp()) {
1190  // write inline part now
1191  if (theSetBuf != NULL) {
1192  Uint32 n = theGetSetBytes;
1193  if (n > theInlineSize)
1194  n = theInlineSize;
1195  assert(thePos == 0);
1196  if (writeDataPrivate(theSetBuf, n) == -1)
1197  DBUG_RETURN(-1);
1198  } else {
1199  theNullFlag = true;
1200  theLength = 0;
1201  }
1202  /*
1203  * In NdbRecAttr case, we set the value of the blob head here with
1204  * an extra setValue()
1205  * In NdbRecord case, this is done by adding a separate operation in
1206  * preExecute() as we cannot modify the head-table NdbOperation.
1207  */
1208  if (!theNdbRecordFlag)
1209  {
1210  if (setHeadInlineValue(theNdbOp) == -1)
1211  DBUG_RETURN(-1);
1212  }
1213  }
1214  DBUG_RETURN(0);
1215 }
1216 
1217 // activation hook
1218 
1219 int
1220 NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
1221 {
1222  DBUG_ENTER("NdbBlob::setActiveHook");
1223  DBUG_PRINT("info", ("hook=%p arg=%p", (void*)&activeHook, arg));
1224  if (theState != Prepared) {
1225  setErrorCode(NdbBlobImpl::ErrState);
1226  DBUG_RETURN(-1);
1227  }
1228  theActiveHook = activeHook;
1229  theActiveHookArg = arg;
1230  DBUG_RETURN(0);
1231 }
1232 
1233 // misc operations
1234 
1235 int
1236 NdbBlob::getDefined(int& isNull) // deprecated
1237 {
1238  DBUG_ENTER("NdbBlob::getDefined");
1239  if (theState == Prepared && theSetFlag) {
1240  isNull = (theSetBuf == NULL);
1241  DBUG_RETURN(0);
1242  }
1243  isNull = theNullFlag;
1244  DBUG_RETURN(0);
1245 }
1246 
1247 int
1248 NdbBlob::getNull(bool& isNull) // deprecated
1249 {
1250  DBUG_ENTER("NdbBlob::getNull");
1251  if (theState == Prepared && theSetFlag) {
1252  isNull = (theSetBuf == NULL);
1253  DBUG_RETURN(0);
1254  }
1255  if (theNullFlag == -1) {
1256  setErrorCode(NdbBlobImpl::ErrState);
1257  DBUG_RETURN(-1);
1258  }
1259  isNull = theNullFlag;
1260  DBUG_RETURN(0);
1261 }
1262 
1263 int
1264 NdbBlob::getNull(int& isNull)
1265 {
1266  DBUG_ENTER("NdbBlob::getNull");
1267  if (theState == Prepared && theSetFlag) {
1268  isNull = (theSetBuf == NULL);
1269  DBUG_RETURN(0);
1270  }
1271  isNull = theNullFlag;
1272  if (isNull == -1 && theEventBlobVersion == -1) {
1273  setErrorCode(NdbBlobImpl::ErrState);
1274  DBUG_RETURN(-1);
1275  }
1276  DBUG_PRINT("info", ("isNull=%d", isNull));
1277  DBUG_RETURN(0);
1278 }
1279 
1280 int
1282 {
1283  DBUG_ENTER("NdbBlob::setNull");
1284  if (isReadOnlyOp()) {
1285  setErrorCode(NdbBlobImpl::ErrCompat);
1286  DBUG_RETURN(-1);
1287  }
1288  if (theNullFlag == -1) {
1289  if (theState == Prepared) {
1290  DBUG_RETURN(setValue(0, 0));
1291  }
1292  setErrorCode(NdbBlobImpl::ErrState);
1293  DBUG_RETURN(-1);
1294  }
1295  if (theNullFlag)
1296  DBUG_RETURN(0);
1297  if (deletePartsThrottled(0, getPartCount()) == -1)
1298  DBUG_RETURN(-1);
1299  theNullFlag = true;
1300  theLength = 0;
1301  theHeadInlineUpdateFlag = true;
1302  DBUG_RETURN(0);
1303 }
1304 
1305 int
1307 {
1308  DBUG_ENTER("NdbBlob::getLength");
1309  if (theState == Prepared && theSetFlag) {
1310  len = theGetSetBytes;
1311  DBUG_RETURN(0);
1312  }
1313  if (theNullFlag == -1) {
1314  setErrorCode(NdbBlobImpl::ErrState);
1315  DBUG_RETURN(-1);
1316  }
1317  len = theLength;
1318  DBUG_RETURN(0);
1319 }
1320 
1321 int
1322 NdbBlob::truncate(Uint64 length)
1323 {
1324  DBUG_ENTER("NdbBlob::truncate");
1325  DBUG_PRINT("info", ("length old=%llu new=%llu", theLength, length));
1326  if (isReadOnlyOp()) {
1327  setErrorCode(NdbBlobImpl::ErrCompat);
1328  DBUG_RETURN(-1);
1329  }
1330  if (theNullFlag == -1) {
1331  setErrorCode(NdbBlobImpl::ErrState);
1332  DBUG_RETURN(-1);
1333  }
1334  if (theLength > length) {
1335  if (length > theInlineSize) {
1336  Uint32 part1 = getPartNumber(length - 1);
1337  Uint32 part2 = getPartNumber(theLength - 1);
1338  assert(part2 >= part1);
1339  if (part2 > part1 && deletePartsThrottled(part1 + 1, part2 - part1) == -1)
1340  DBUG_RETURN(-1);
1341  Uint32 off = getPartOffset(length);
1342  if (off != 0) {
1343  assert(off < thePartSize);
1344  /* Ensure all previous writes to this blob are flushed so
1345  * that we can read their updates
1346  */
1347  if (executePendingBlobWrites() == -1)
1348  DBUG_RETURN(-1);
1349  Uint16 len = 0;
1350  if (readPart(thePartBuf.data, part1, len) == -1)
1351  DBUG_RETURN(-1);
1352  if (executePendingBlobReads() == -1)
1353  DBUG_RETURN(-1);
1354  assert(len != 0);
1355  DBUG_PRINT("info", ("part %u length old=%u new=%u",
1356  part1, (Uint32)len, off));
1357  if (theFixedDataFlag)
1358  memset(thePartBuf.data + off, theFillChar, thePartSize - off);
1359  if (updatePart(thePartBuf.data, part1, off) == -1)
1360  DBUG_RETURN(-1);
1361  }
1362  } else {
1363  if (deletePartsThrottled(0, getPartCount()) == -1)
1364  DBUG_RETURN(-1);
1365  }
1366  theLength = length;
1367  theHeadInlineUpdateFlag = true;
1368  if (thePos > length)
1369  thePos = length;
1370  }
1371  DBUG_RETURN(0);
1372 }
1373 
1374 int
1375 NdbBlob::getPos(Uint64& pos)
1376 {
1377  DBUG_ENTER("NdbBlob::getPos");
1378  if (theNullFlag == -1) {
1379  setErrorCode(NdbBlobImpl::ErrState);
1380  DBUG_RETURN(-1);
1381  }
1382  pos = thePos;
1383  DBUG_RETURN(0);
1384 }
1385 
1386 int
1387 NdbBlob::setPos(Uint64 pos)
1388 {
1389  DBUG_ENTER("NdbBlob::setPos");
1390  DBUG_PRINT("info", ("this=%p pos=%llu", this, pos));
1391  if (theNullFlag == -1) {
1392  setErrorCode(NdbBlobImpl::ErrState);
1393  DBUG_RETURN(-1);
1394  }
1395  if (pos > theLength) {
1396  setErrorCode(NdbBlobImpl::ErrSeek);
1397  DBUG_RETURN(-1);
1398  }
1399  thePos = pos;
1400  DBUG_RETURN(0);
1401 }
1402 
1403 // read/write
1404 
1405 int
1406 NdbBlob::readData(void* data, Uint32& bytes)
1407 {
1408  DBUG_ENTER("NdbBlob::readData");
1409  if (unlikely(theState != Active)) {
1410  setErrorCode(NdbBlobImpl::ErrState);
1411  DBUG_RETURN(-1);
1412  }
1413  char* buf = static_cast<char*>(data);
1414  int ret = readDataPrivate(buf, bytes);
1415  DBUG_RETURN(ret);
1416 }
1417 
1418 int
1419 NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
1420 {
1421  DBUG_ENTER("NdbBlob::readDataPrivate");
1422  DBUG_PRINT("info", ("bytes=%u thePos=%u theLength=%u",
1423  bytes, (Uint32)thePos, (Uint32)theLength));
1424  assert(thePos <= theLength);
1425  Uint64 pos = thePos;
1426  if (bytes > theLength - pos)
1427  bytes = Uint32(theLength - pos);
1428  Uint32 len = bytes;
1429  if (len > 0) {
1430  // inline part
1431  if (pos < theInlineSize) {
1432  Uint32 n = theInlineSize - Uint32(pos);
1433  if (n > len)
1434  n = len;
1435  memcpy(buf, theInlineData + pos, n);
1436  pos += n;
1437  buf += n;
1438  len -= n;
1439  }
1440  }
1441  if (unlikely(len > 0 && thePartSize == 0)) {
1442  setErrorCode(NdbBlobImpl::ErrSeek);
1443  DBUG_RETURN(-1);
1444  }
1445  if (len > 0) {
1446  assert(pos >= theInlineSize);
1447  Uint32 off = (pos - theInlineSize) % thePartSize;
1448  // partial first block
1449  if (off != 0) {
1450  DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
1451  Uint32 part = getPartNumber(pos);
1452  Uint16 sz = 0;
1453  if (readPart(thePartBuf.data, part, sz) == -1)
1454  DBUG_RETURN(-1);
1455  // need result now
1456  if (executePendingBlobReads() == -1)
1457  DBUG_RETURN(-1);
1458  assert(sz >= off);
1459  Uint32 n = sz - off;
1460  if (n > len)
1461  n = len;
1462  memcpy(buf, thePartBuf.data + off, n);
1463  pos += n;
1464  buf += n;
1465  len -= n;
1466  }
1467  }
1468  if (len > 0) {
1469  assert((pos - theInlineSize) % thePartSize == 0);
1470  // complete blocks in the middle
1471  if (len >= thePartSize) {
1472  Uint32 part = getPartNumber(pos);
1473  Uint32 count = len / thePartSize;
1474  do
1475  {
1476  /* How much quota left, avoiding underflow? */
1477  Uint32 partsThisTrip = count;
1478  if (theEventBlobVersion == -1)
1479  {
1480  /* Table read as opposed to event buffer read */
1481  const Uint32 remainingQuota =
1482  theNdbCon->maxPendingBlobReadBytes -
1483  MIN(theNdbCon->maxPendingBlobReadBytes, theNdbCon->pendingBlobReadBytes);
1484  const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always read one part
1485  partsThisTrip= MIN(count, maxPartsThisTrip);
1486  }
1487 
1488  if (readParts(buf, part, partsThisTrip) == -1)
1489  DBUG_RETURN(-1);
1490  Uint32 n = thePartSize * partsThisTrip;
1491 
1492  pos += n;
1493  buf += n;
1494  len -= n;
1495  part += partsThisTrip;
1496  count -= partsThisTrip;
1497  if (count != 0)
1498  {
1499  /* Execute this batch before defining next */
1500  if (executePendingBlobReads() == -1)
1501  DBUG_RETURN(-1);
1502  }
1503  } while (count != 0);
1504  }
1505  }
1506  if (len > 0) {
1507  // partial last block
1508  DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
1509  assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
1510  Uint32 part = getPartNumber(pos);
1511  Uint16 sz = 0;
1512  if (readPart(thePartBuf.data, part, sz) == -1)
1513  DBUG_RETURN(-1);
1514  // need result now
1515  if (executePendingBlobReads() == -1)
1516  DBUG_RETURN(-1);
1517  assert(len <= sz);
1518  memcpy(buf, thePartBuf.data, len);
1519  Uint32 n = len;
1520  pos += n;
1521  buf += n;
1522  len -= n;
1523  }
1524  assert(len == 0);
1525  thePos = pos;
1526  assert(thePos <= theLength);
1527  DBUG_RETURN(0);
1528 }
1529 
1530 int
1531 NdbBlob::writeData(const void* data, Uint32 bytes)
1532 {
1533  DBUG_ENTER("NdbBlob::writeData");
1534  if (unlikely(isReadOnlyOp())) {
1535  setErrorCode(NdbBlobImpl::ErrCompat);
1536  DBUG_RETURN(-1);
1537  }
1538  if (unlikely(theState != Active)) {
1539  setErrorCode(NdbBlobImpl::ErrState);
1540  DBUG_RETURN(-1);
1541  }
1542  const char* buf = static_cast<const char*>(data);
1543  int ret = writeDataPrivate(buf, bytes);
1544  DBUG_RETURN(ret);
1545 }
1546 
1547 int
1548 NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
1549 {
1550  DBUG_ENTER("NdbBlob::writeDataPrivate");
1551  DBUG_PRINT("info", ("pos=%llu bytes=%u", thePos, bytes));
1552  assert(thePos <= theLength);
1553  Uint64 pos = thePos;
1554  Uint32 len = bytes;
1555  // any write makes blob not NULL
1556  if (theNullFlag) {
1557  theNullFlag = false;
1558  theHeadInlineUpdateFlag = true;
1559  }
1560  if (len > 0) {
1561  // inline part
1562  if (pos < theInlineSize) {
1563  Uint32 n = theInlineSize - Uint32(pos);
1564  if (n > len)
1565  n = len;
1566  memcpy(theInlineData + pos, buf, n);
1567  theHeadInlineUpdateFlag = true;
1568  pos += n;
1569  buf += n;
1570  len -= n;
1571  }
1572  }
1573  if (unlikely(len > 0 && thePartSize == 0)) {
1574  setErrorCode(NdbBlobImpl::ErrSeek);
1575  DBUG_RETURN(-1);
1576  }
1577  if (len > 0) {
1578  assert(pos >= theInlineSize);
1579  Uint32 off = (pos - theInlineSize) % thePartSize;
1580  // partial first block
1581  if (off != 0) {
1582  DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len));
1583  // flush writes to guarantee correct read
1584  if (executePendingBlobWrites() == -1)
1585  DBUG_RETURN(-1);
1586  Uint32 part = getPartNumber(pos);
1587  Uint16 sz = 0;
1588  if (readPart(thePartBuf.data, part, sz) == -1)
1589  DBUG_RETURN(-1);
1590  // need result now
1591  if (executePendingBlobReads() == -1)
1592  DBUG_RETURN(-1);
1593  DBUG_PRINT("info", ("part len=%u", (Uint32)sz));
1594  assert(sz >= off);
1595  Uint32 n = thePartSize - off;
1596  if (n > len)
1597  n = len;
1598  Uint16 newsz = sz;
1599  if (pos + n > theLength) {
1600  // this is last part and we are extending it
1601  newsz = off + n;
1602  }
1603  memcpy(thePartBuf.data + off, buf, n);
1604  if (updatePart(thePartBuf.data, part, newsz) == -1)
1605  DBUG_RETURN(-1);
1606  pos += n;
1607  buf += n;
1608  len -= n;
1609  }
1610  }
1611  if (len > 0) {
1612  assert((pos - theInlineSize) % thePartSize == 0);
1613  // complete blocks in the middle
1614  if (len >= thePartSize) {
1615  Uint32 part = getPartNumber(pos);
1616  Uint32 count = len / thePartSize;
1617  for (unsigned i = 0; i < count; i++) {
1618  if (part + i < getPartCount()) {
1619  if (updateParts(buf, part + i, 1) == -1)
1620  DBUG_RETURN(-1);
1621  } else {
1622  if (insertParts(buf, part + i, 1) == -1)
1623  DBUG_RETURN(-1);
1624  }
1625  Uint32 n = thePartSize;
1626  pos += n;
1627  buf += n;
1628  len -= n;
1629  if (theNdbCon->pendingBlobWriteBytes >
1630  theNdbCon->maxPendingBlobWriteBytes)
1631  {
1632  /* Flush defined part ops */
1633  if (executePendingBlobWrites() == -1)
1634  {
1635  DBUG_RETURN(-1);
1636  }
1637  }
1638  }
1639  }
1640  }
1641  if (len > 0) {
1642  // partial last block
1643  DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len));
1644  assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
1645  Uint32 part = getPartNumber(pos);
1646  if (theLength > pos + len) {
1647  // flush writes to guarantee correct read
1648  if (executePendingBlobWrites() == -1)
1649  DBUG_RETURN(-1);
1650  Uint16 sz = 0;
1651  if (readPart(thePartBuf.data, part, sz) == -1)
1652  DBUG_RETURN(-1);
1653  // need result now
1654  if (executePendingBlobReads() == -1)
1655  DBUG_RETURN(-1);
1656  memcpy(thePartBuf.data, buf, len);
1657  // no length change
1658  if (updatePart(thePartBuf.data, part, sz) == -1)
1659  DBUG_RETURN(-1);
1660  } else {
1661  memcpy(thePartBuf.data, buf, len);
1662  if (theFixedDataFlag) {
1663  memset(thePartBuf.data + len, theFillChar, thePartSize - len);
1664  }
1665  Uint16 sz = len;
1666  if (part < getPartCount()) {
1667  if (updatePart(thePartBuf.data, part, sz) == -1)
1668  DBUG_RETURN(-1);
1669  } else {
1670  if (insertPart(thePartBuf.data, part, sz) == -1)
1671  DBUG_RETURN(-1);
1672  }
1673  }
1674  Uint32 n = len;
1675  pos += n;
1676  buf += n;
1677  len -= n;
1678  }
1679  assert(len == 0);
1680  if (theLength < pos) {
1681  theLength = pos;
1682  theHeadInlineUpdateFlag = true;
1683  }
1684  thePos = pos;
1685  assert(thePos <= theLength);
1686  DBUG_RETURN(0);
1687 }
1688 
1689 /*
1690  * Operations on parts.
1691  *
1692  * - multi-part read/write operates only on full parts
1693  * - single-part read/write uses length
1694  * - single-part read requires caller to exec pending ops
1695  *
1696  * In V1 parts are striped. In V2 they are either striped
1697  * or use table row partition. The latter case applies both
1698  * to default and user-defined partitioning.
1699  */
1700 
1701 int
1702 NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
1703 {
1704  DBUG_ENTER("NdbBlob::readParts");
1705  DBUG_PRINT("info", ("part=%u count=%u", part, count));
1706  if (theEventBlobVersion == -1) {
1707  if (readTableParts(buf, part, count) == -1)
1708  DBUG_RETURN(-1);
1709  } else {
1710  if (readEventParts(buf, part, count) == -1)
1711  DBUG_RETURN(-1);
1712  }
1713  DBUG_RETURN(0);
1714 }
1715 
1716 int
1717 NdbBlob::readPart(char* buf, Uint32 part, Uint16& len)
1718 {
1719  DBUG_ENTER("NdbBlob::readPart");
1720  DBUG_PRINT("info", ("part=%u", part));
1721  if (theEventBlobVersion == -1) {
1722  if (readTablePart(buf, part, len) == -1)
1723  DBUG_RETURN(-1);
1724  } else {
1725  if (readEventPart(buf, part, len) == -1)
1726  DBUG_RETURN(-1);
1727  }
1728  DBUG_PRINT("info", ("part=%u len=%u", part, (Uint32)len));
1729  DBUG_RETURN(0);
1730 }
1731 
1732 int
1733 NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
1734 {
1735  DBUG_ENTER("NdbBlob::readTableParts");
1736  Uint32 n = 0;
1737  while (n < count) {
1738  // length is not checked but a non-stack buffer is needed
1739  if (readTablePart(buf + n * thePartSize, part + n, thePartLen) == -1)
1740  DBUG_RETURN(-1);
1741  n++;
1742  }
1743  DBUG_RETURN(0);
1744 }
1745 
1746 int
1747 NdbBlob::readTablePart(char* buf, Uint32 part, Uint16& len)
1748 {
1749  DBUG_ENTER("NdbBlob::readTablePart");
1750  NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
1751  if (tOp == NULL ||
1752  /*
1753  * This was committedRead() before. However lock on main
1754  * table tuple does not fully protect blob parts since DBTUP
1755  * commits each tuple separately.
1756  */
1757  tOp->readTuple(NdbOperation::LM_SimpleRead) == -1 ||
1758  setPartKeyValue(tOp, part) == -1 ||
1759  getPartDataValue(tOp, buf, &len) == -1) {
1760  setErrorCode(tOp);
1761  DBUG_RETURN(-1);
1762  }
1763 
1764  tOp->m_abortOption = NdbOperation::AbortOnError;
1765  thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
1766  theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
1767  theNdbCon->pendingBlobReadBytes += len;
1768  DBUG_RETURN(0);
1769 }
1770 
1771 int
1772 NdbBlob::readEventParts(char* buf, Uint32 part, Uint32 count)
1773 {
1774  DBUG_ENTER("NdbBlob::readEventParts");
1775  // length not asked for - event code checks each part is full
1776  if (theEventOp->readBlobParts(buf, this, part, count, (Uint16*)0) == -1) {
1777  setErrorCode(theEventOp);
1778  DBUG_RETURN(-1);
1779  }
1780  DBUG_RETURN(0);
1781 }
1782 
1783 int
1784 NdbBlob::readEventPart(char* buf, Uint32 part, Uint16& len)
1785 {
1786  DBUG_ENTER("NdbBlob::readEventPart");
1787  if (theEventOp->readBlobParts(buf, this, part, 1, &len) == -1) {
1788  setErrorCode(theEventOp);
1789  DBUG_RETURN(-1);
1790  }
1791  DBUG_RETURN(0);
1792 }
1793 
1794 int
1795 NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
1796 {
1797  DBUG_ENTER("NdbBlob::insertParts");
1798  DBUG_PRINT("info", ("part=%u count=%u", part, count));
1799  Uint32 n = 0;
1800  while (n < count) {
1801  // use non-stack variable for safety
1802  thePartLen = thePartSize;
1803  if (insertPart(buf + n * thePartSize, part + n, thePartLen) == -1)
1804  DBUG_RETURN(-1);
1805  n++;
1806  }
1807  DBUG_RETURN(0);
1808 }
1809 
1810 int
1811 NdbBlob::insertPart(const char* buf, Uint32 part, const Uint16& len)
1812 {
1813  DBUG_ENTER("NdbBlob::insertPart");
1814  DBUG_PRINT("info", ("part=%u len=%u", part, (Uint32)len));
1815  NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
1816  if (tOp == NULL ||
1817  tOp->insertTuple() == -1 ||
1818  setPartKeyValue(tOp, part) == -1 ||
1819  setPartPkidValue(tOp, theHead.pkid) == -1 ||
1820  setPartDataValue(tOp, buf, len) == -1) {
1821  setErrorCode(tOp);
1822  DBUG_RETURN(-1);
1823  }
1824 
1825  tOp->m_abortOption = NdbOperation::AbortOnError;
1826  thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
1827  theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
1828  theNdbCon->pendingBlobWriteBytes += len;
1829  DBUG_RETURN(0);
1830 }
1831 
1832 int
1833 NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
1834 {
1835  DBUG_ENTER("NdbBlob::updateParts");
1836  DBUG_PRINT("info", ("part=%u count=%u", part, count));
1837  Uint32 n = 0;
1838  while (n < count) {
1839  // use non-stack variable for safety
1840  thePartLen = thePartSize;
1841  if (updatePart(buf + n * thePartSize, part + n, thePartLen) == -1)
1842  DBUG_RETURN(-1);
1843  n++;
1844  }
1845  DBUG_RETURN(0);
1846 }
1847 
1848 int
1849 NdbBlob::updatePart(const char* buf, Uint32 part, const Uint16& len)
1850 {
1851  DBUG_ENTER("NdbBlob::updatePart");
1852  DBUG_PRINT("info", ("part=%u len=%u", part, (Uint32)len));
1853  NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
1854  if (tOp == NULL ||
1855  tOp->updateTuple() == -1 ||
1856  setPartKeyValue(tOp, part) == -1 ||
1857  setPartPkidValue(tOp, theHead.pkid) == -1 ||
1858  setPartDataValue(tOp, buf, len) == -1) {
1859  setErrorCode(tOp);
1860  DBUG_RETURN(-1);
1861  }
1862 
1863  tOp->m_abortOption = NdbOperation::AbortOnError;
1864  thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
1865  theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
1866  theNdbCon->pendingBlobWriteBytes += len;
1867  DBUG_RETURN(0);
1868 }
1869 
1870 int
1871 NdbBlob::deletePartsThrottled(Uint32 part, Uint32 count)
1872 {
1873  DBUG_ENTER("NdbBlob::deletePartsThrottled");
1874  DBUG_PRINT("info", ("part=%u count=%u maxPendingBlobWriteBytes=%u",
1875  part, count, theNdbCon->maxPendingBlobWriteBytes));
1876 
1877  if (thePartSize)
1878  {
1879  do
1880  {
1881  /* How much quota left, avoiding underflow? */
1882  const Uint32 remainingQuota =
1883  theNdbCon->maxPendingBlobWriteBytes -
1884  MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
1885  const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always read one part
1886  const Uint32 partsThisTrip= MIN(count, maxPartsThisTrip);
1887 
1888  int rc = deleteParts(part, partsThisTrip);
1889  if (rc != 0)
1890  DBUG_RETURN(rc);
1891 
1892  part+= partsThisTrip;
1893  count-= partsThisTrip;
1894 
1895  if (count != 0)
1896  {
1897  /* Execute this batch before defining next */
1898  if (executePendingBlobWrites() == -1)
1899  DBUG_RETURN(-1);
1900  }
1901  } while (count != 0);
1902  }
1903 
1904  DBUG_RETURN(0);
1905 }
1906 
1907 int
1908 NdbBlob::deleteParts(Uint32 part, Uint32 count)
1909 {
1910  DBUG_ENTER("NdbBlob::deleteParts");
1911  DBUG_PRINT("info", ("part=%u count=%u", part, count));
1912  Uint32 n = 0;
1913  while (n < count) {
1914  NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
1915  if (tOp == NULL ||
1916  tOp->deleteTuple() == -1 ||
1917  setPartKeyValue(tOp, part + n) == -1) {
1918  setErrorCode(tOp);
1919  DBUG_RETURN(-1);
1920  }
1921 
1922  tOp->m_abortOption = NdbOperation::AbortOnError;
1923  n++;
1924  thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
1925  theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
1926  theNdbCon->pendingBlobWriteBytes += thePartSize; /* Assume full part */
1927  }
1928  DBUG_RETURN(0);
1929 }
1930 
1931 /*
1932  * Number of blob parts not known. Used to check for race condition
1933  * when writeTuple is used for insert. Deletes all parts found.
1934  */
1935 int
1936 NdbBlob::deletePartsUnknown(Uint32 part)
1937 {
1938  DBUG_ENTER("NdbBlob::deletePartsUnknown");
1939  DBUG_PRINT("info", ("part=%u count=all", part));
1940  if (thePartSize == 0) // tinyblob
1941  DBUG_RETURN(0);
1942  static const unsigned maxbat = 256;
1943  static const unsigned minbat = 1;
1944  unsigned bat = minbat;
1945  NdbOperation* tOpList[maxbat];
1946  Uint32 count = 0;
1947  while (true) {
1948  Uint32 n;
1949  n = 0;
1950  /* How much quota left, avoiding underflow? */
1951  Uint32 remainingQuota = theNdbCon->maxPendingBlobWriteBytes -
1952  MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
1953  Uint32 deleteQuota = MAX(remainingQuota / thePartSize, 1);
1954  bat = MIN(deleteQuota, bat);
1955  while (n < bat) {
1956  NdbOperation*& tOp = tOpList[n]; // ref
1957  tOp = theNdbCon->getNdbOperation(theBlobTable);
1958  if (tOp == NULL ||
1959  tOp->deleteTuple() == -1 ||
1960  setPartKeyValue(tOp, part + count + n) == -1) {
1961  setErrorCode(tOp);
1962  DBUG_RETURN(-1);
1963  }
1964  tOp->m_abortOption= NdbOperation::AO_IgnoreError;
1965  tOp->m_noErrorPropagation = true;
1966  theNdbCon->pendingBlobWriteBytes += thePartSize;
1967  n++;
1968  }
1969  DBUG_PRINT("info", ("bat=%u", bat));
1970  if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
1971  DBUG_RETURN(-1);
1972  n = 0;
1973  while (n < bat) {
1974  NdbOperation* tOp = tOpList[n];
1975  if (tOp->theError.code != 0) {
1976  if (tOp->theError.code != 626) {
1977  setErrorCode(tOp);
1978  DBUG_RETURN(-1);
1979  }
1980  // first non-existent part
1981  DBUG_PRINT("info", ("count=%u", count));
1982  DBUG_RETURN(0);
1983  }
1984  n++;
1985  count++;
1986  }
1987  bat *= 4;
1988  if (bat > maxbat)
1989  bat = maxbat;
1990  }
1991 }
1992 
1993 // pending ops
1994 
1995 int
1996 NdbBlob::executePendingBlobReads()
1997 {
1998  DBUG_ENTER("NdbBlob::executePendingBlobReads");
1999  Uint8 flags = (1 << NdbOperation::ReadRequest);
2000  if (thePendingBlobOps & flags) {
2001  if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
2002  DBUG_RETURN(-1);
2003  thePendingBlobOps = 0;
2004  theNdbCon->thePendingBlobOps = 0;
2005  }
2006  DBUG_RETURN(0);
2007 }
2008 
2009 int
2010 NdbBlob::executePendingBlobWrites()
2011 {
2012  DBUG_ENTER("NdbBlob::executePendingBlobWrites");
2013  Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
2014  if (thePendingBlobOps & flags) {
2015  if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
2016  DBUG_RETURN(-1);
2017  thePendingBlobOps = 0;
2018  theNdbCon->thePendingBlobOps = 0;
2019  }
2020  DBUG_RETURN(0);
2021 }
2022 
2023 // callbacks
2024 
2025 int
2026 NdbBlob::invokeActiveHook()
2027 {
2028  DBUG_ENTER("NdbBlob::invokeActiveHook");
2029  assert(theState == Active && theActiveHook != NULL);
2030  int ret = (*theActiveHook)(this, theActiveHookArg);
2031  if (ret != 0) {
2032  // no error is set on blob level
2033  DBUG_RETURN(-1);
2034  }
2035  DBUG_RETURN(0);
2036 }
2037 
2038 // blob handle maintenance
2039 
2040 /*
2041  * Prepare blob handle linked to an operation.
2042  * This one for NdbRecAttr-based operation.
2043  *
2044  * For key operation fetches key data from signal data.
2045  */
2046 int
2047 NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn)
2048 {
2049  DBUG_ENTER("NdbBlob::atPrepare");
2050  DBUG_PRINT("info", ("this=%p op=%p con=%p version=%d fixed data=%d",
2051  this, theNdbOp, theNdbCon,
2052  theBlobVersion, theFixedDataFlag));
2053  if (atPrepareCommon(aCon, anOp, aColumn) == -1)
2054  DBUG_RETURN(-1);
2055 
2056  /* For scans using the old RecAttr API, we internally use an
2057  * NdbRecord.
2058  * For PK and Index ops, we do not
2059  */
2060  theNdbRecordFlag= isScanOp();
2061 
2062  // handle different operation types
2063  bool supportedOp = false;
2064  if (isKeyOp()) {
2065  if (isTableOp()) {
2066  // get table key
2067  Uint32* data = (Uint32*)thePackKeyBuf.data;
2068  Uint32 size = theTable->m_keyLenInWords; // in-out
2069  if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
2070  setErrorCode(NdbBlobImpl::ErrUsage);
2071  DBUG_RETURN(-1);
2072  }
2073  thePackKeyBuf.size = 4 * size;
2074  thePackKeyBuf.zerorest();
2075  if (unpackKeyValue(theTable, theKeyBuf) == -1)
2076  DBUG_RETURN(-1);
2077  }
2078  if (isIndexOp()) {
2079  // get index key
2080  Uint32* data = (Uint32*)thePackKeyBuf.data;
2081  Uint32 size = theAccessTable->m_keyLenInWords; // in-out
2082  if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
2083  setErrorCode(NdbBlobImpl::ErrUsage);
2084  DBUG_RETURN(-1);
2085  }
2086  thePackKeyBuf.size = 4 * size;
2087  thePackKeyBuf.zerorest();
2088  if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
2089  DBUG_RETURN(-1);
2090  }
2091  supportedOp = true;
2092  }
2093  if (isScanOp())
2094  supportedOp = true;
2095 
2096  if (! supportedOp) {
2097  setErrorCode(NdbBlobImpl::ErrUsage);
2098  DBUG_RETURN(-1);
2099  }
2100  DBUG_RETURN(0);
2101 }
2102 
2103 /*
2104  * Common prepare code for NdbRecAttr and NdbRecord operations.
2105  * Checks blob table. Allocates buffers.
2106  * For read operation adds read of head+inline.
2107  */
2108 int
2109 NdbBlob::atPrepareCommon(NdbTransaction* aCon, NdbOperation* anOp,
2110  const NdbColumnImpl* aColumn)
2111 {
2112  assert(theState == Idle);
2113  init();
2114  // ndb api stuff
2115  theNdb = anOp->theNdb;
2116  theNdbCon = aCon; // for scan, this is the real transaction (m_transConnection)
2117  theNdbOp = anOp;
2118  theTable = anOp->m_currentTable;
2119  theAccessTable = anOp->m_accessTable;
2120  theColumn = aColumn;
2121  // prepare blob column and table
2122  if (prepareColumn() == -1)
2123  return -1;
2124  userDefinedPartitioning= (theTable->getFragmentType() ==
2125  NdbDictionary::Object::UserDefined);
2126  /* UserDefined Partitioning
2127  * If user has set partitionId specifically, take it for
2128  * Blob head and part operations
2129  */
2130  if (userDefinedPartitioning &&
2131  theNdbOp->theDistrKeyIndicator_) {
2132  thePartitionId = theNdbOp->getPartitionId();
2133  DBUG_PRINT("info", ("op partition id: %u", thePartitionId));
2134  }
2135  // extra buffers
2136  theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
2137  theHeadInlineCopyBuf.alloc(getHeadInlineSize());
2138 
2139  if (isKeyOp()) {
2140  if (isReadOp()) {
2141  // upgrade lock mode
2142  if ((theNdbOp->theLockMode == NdbOperation::LM_CommittedRead) ||
2143  (theNdbOp->theLockMode == NdbOperation::LM_SimpleRead))
2144  {
2145  assert(! theNdbOp->m_blob_lock_upgraded);
2146  theNdbOp->setReadLockMode(NdbOperation::LM_Read);
2147  theNdbOp->m_blob_lock_upgraded = true;
2148 
2149  if (!isIndexOp())
2150  {
2151  assert(theNdbOp->theLockHandle == NULL);
2152  /* If the kernel supports it we'll ask for a lockhandle
2153  * to allow us to unlock the main table row when the
2154  * Blob handle is closed
2155  */
2156  if (likely(theNdb->getMinDbNodeVersion() >=
2157  NDBD_UNLOCK_OP_SUPPORTED))
2158  {
2159  /* We've upgraded the lock from Committed/Simple to LM_Read
2160  * Now modify the read operation to request an NdbLockHandle
2161  * so that we can unlock the main table op on close()
2162  */
2163  if (theNdbOp->m_attribute_record)
2164  {
2165  /* NdbRecord op, need to set-up NdbLockHandle */
2166  int rc = theNdbOp->prepareGetLockHandleNdbRecord();
2167  if (rc != 0)
2168  {
2169  setErrorCode(rc, true);
2170  return -1;
2171  }
2172  }
2173  else
2174  {
2175  /* NdbRecAttr op, request lock handle read */
2176  int rc = theNdbOp->getLockHandleImpl();
2177  if (rc != 0)
2178  {
2179  setErrorCode(rc, true);
2180  return -1;
2181  }
2182  }
2183  }
2184  }
2185  }
2186  // add read of head+inline in this op
2187  if (getHeadInlineValue(theNdbOp) == -1)
2188  return -1;
2189  }
2190  if (isInsertOp()) {
2191  // becomes NULL unless set before execute
2192  theNullFlag = true;
2193  theLength = 0;
2194  }
2195  if (isWriteOp()) {
2196  // becomes NULL unless set before execute
2197  theNullFlag = true;
2198  theLength = 0;
2199  theHeadInlineUpdateFlag = true;
2200  }
2201  }
2202  if (isScanOp()) {
2203  /* Upgrade lock mode
2204  * Unfortunately, this is a bit messy, depending on which
2205  * type of underlying scan we have
2206  */
2207  NdbScanOperation *sop= reinterpret_cast<NdbScanOperation*> (theNdbOp);
2208 
2209  if (sop->m_scanUsingOldApi)
2210  {
2211  /* Old Api scans only have saved lockmode state at this pre-finalisation
2212  * point, so it's easy to change the mode
2213  */
2214  if ((sop->m_savedLockModeOldApi == NdbOperation::LM_CommittedRead) ||
2215  (sop->m_savedLockModeOldApi == NdbOperation::LM_SimpleRead))
2216  {
2217  assert(! theNdbOp->m_blob_lock_upgraded);
2218  sop->m_savedLockModeOldApi= NdbOperation::LM_Read;
2219  theNdbOp->m_blob_lock_upgraded = true;
2220  }
2221  }
2222  else
2223  {
2224  /* NdbRecord defined scans have had most signals built etc, so we need
2225  * to call the setReadLockMode method to do the right thing to change
2226  * the lockmode
2227  */
2228  if ((sop->theLockMode == NdbOperation::LM_CommittedRead) ||
2229  (sop->theLockMode == NdbOperation::LM_SimpleRead))
2230  {
2231  assert(! theNdbOp->m_blob_lock_upgraded);
2232  sop->setReadLockMode(NdbOperation::LM_Read);
2233  theNdbOp->m_blob_lock_upgraded = true;
2234  }
2235  }
2236 
2237  // add read of head+inline in this op
2238  if (getHeadInlineValue(sop) == -1)
2239  return -1;
2240  }
2241  setState(Prepared);
2242  return 0;
2243 }
2244 
2245 /* Prepare blob handle for key operation, NdbRecord version. */
2246 int
2247 NdbBlob::atPrepareNdbRecord(NdbTransaction* aCon, NdbOperation* anOp,
2248  const NdbColumnImpl* aColumn,
2249  const NdbRecord *key_record, const char *key_row)
2250 {
2251  int res;
2252  DBUG_ENTER("NdbBlob::atPrepareNdbRecord");
2253  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon));
2254 
2255  theNdbRecordFlag= true;
2256  if (atPrepareCommon(aCon, anOp, aColumn) == -1)
2257  DBUG_RETURN(-1);
2258 
2259  assert(isKeyOp());
2260 
2261  if (isTableOp())
2262  {
2263  res= copyKeyFromRow(key_record, key_row, thePackKeyBuf, theKeyBuf);
2264 
2265  if (theNdbOp->theLockHandle)
2266  {
2267  /* Record in the lock handle that we have another
2268  * open Blob which must be closed before the
2269  * main table operation can be unlocked.
2270  */
2271  theNdbOp->theLockHandle->m_openBlobCount++;
2272  }
2273  }
2274  else if (isIndexOp())
2275  res= copyKeyFromRow(key_record, key_row, thePackKeyBuf, theAccessKeyBuf);
2276  if (res == -1)
2277  DBUG_RETURN(-1);
2278 
2279  DBUG_RETURN(0);
2280 }
2281 
2282 int
2283 NdbBlob::atPrepareNdbRecordTakeover(NdbTransaction* aCon, NdbOperation* anOp,
2284  const NdbColumnImpl* aColumn,
2285  const char *keyinfo, Uint32 keyinfo_bytes)
2286 {
2287  DBUG_ENTER("NdbBlob::atPrepareNdbRecordTakeover");
2288  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon));
2289 
2290  theNdbRecordFlag= true;
2291  if (atPrepareCommon(aCon, anOp, aColumn) == -1)
2292  DBUG_RETURN(-1);
2293 
2294  assert(isKeyOp());
2295 
2296  /* Get primary key. */
2297  if (keyinfo_bytes > thePackKeyBuf.maxsize)
2298  {
2299  assert(false);
2300  DBUG_RETURN(-1);
2301  }
2302  memcpy(thePackKeyBuf.data, keyinfo, keyinfo_bytes);
2303  thePackKeyBuf.size= keyinfo_bytes;
2304  thePackKeyBuf.zerorest();
2305  if (unpackKeyValue(theTable, theKeyBuf) == -1)
2306  DBUG_RETURN(-1);
2307 
2308  if (theNdbOp->theLockHandle)
2309  {
2310  /* Record in the lock handle that we have another
2311  * open Blob which must be closed before the
2312  * main table operation can be unlocked.
2313  */
2314  theNdbOp->theLockHandle->m_openBlobCount++;
2315  }
2316 
2317  DBUG_RETURN(0);
2318 }
2319 
2320 /* Prepare blob handle for scan operation, NdbRecord version. */
2321 int
2322 NdbBlob::atPrepareNdbRecordScan(NdbTransaction* aCon, NdbOperation* anOp,
2323  const NdbColumnImpl* aColumn)
2324 {
2325  DBUG_ENTER("NdbBlob::atPrepareNdbRecordScan");
2326  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon));
2327 
2328  theNdbRecordFlag= true;
2329  if (atPrepareCommon(aCon, anOp, aColumn) == -1)
2330  DBUG_RETURN(-1);
2331 
2332  assert(isScanOp());
2333 
2334  DBUG_RETURN(0);
2335 }
2336 
2337 int
2338 NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version)
2339 {
2340  DBUG_ENTER("NdbBlob::atPrepare [event]");
2341  assert(theState == Idle);
2342  init();
2343  assert(version == 0 || version == 1);
2344  theEventBlobVersion = version;
2345  // ndb api stuff
2346  theNdb = anOp->m_ndb;
2347  theEventOp = anOp;
2348  theBlobEventOp = aBlobOp;
2349  theTable = anOp->m_eventImpl->m_tableImpl;
2350  theAccessTable = theTable;
2351  theColumn = aColumn;
2352  // prepare blob column and table
2353  if (prepareColumn() == -1)
2354  DBUG_RETURN(-1);
2355  DBUG_PRINT("info", ("this=%p main op=%p blob op=%p version=%d fixed data=%d",
2356  this, anOp, aBlobOp,
2357  theBlobVersion, theFixedDataFlag));
2358  // tinyblob sanity
2359  assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
2360  // extra buffers
2361  theBlobEventDataBuf.alloc(theVarsizeBytes + thePartSize);
2362  // prepare receive of head+inline
2363  theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
2364  if (theHeadInlineRecAttr == NULL) {
2365  setErrorCode(theEventOp);
2366  DBUG_RETURN(-1);
2367  }
2368  // prepare receive of blob part
2369  if (theBlobEventOp != NULL) {
2370  const NdbColumnImpl* bc;
2371  char* buf;
2372  // one must subscribe to all primary keys
2373  if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
2374  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPk]);
2375  buf = thePackKeyBuf.data;
2376  theBlobEventPkRecAttr = theBlobEventOp->getValue(bc, buf, version);
2377  //
2378  assert(theStripeSize != 0);
2379  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnDist]);
2380  buf = (char*)&theBlobEventDistValue;
2381  theBlobEventDistRecAttr = theBlobEventOp->getValue(bc, buf, version);
2382  //
2383  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPart]);
2384  buf = (char*)&theBlobEventPartValue;
2385  theBlobEventPartRecAttr = theBlobEventOp->getValue(bc, buf, version);
2386  //
2387  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnData]);
2388  buf = theBlobEventDataBuf.data;
2389  theBlobEventDataRecAttr = theBlobEventOp->getValue(bc, buf, version);
2390  if (unlikely(
2391  theBlobEventPkRecAttr == NULL ||
2392  theBlobEventDistRecAttr == NULL ||
2393  theBlobEventPartRecAttr == NULL ||
2394  theBlobEventDataRecAttr == NULL
2395  )) {
2396  setErrorCode(theBlobEventOp);
2397  DBUG_RETURN(-1);
2398  }
2399  } else {
2400  const uint columns = theTable->m_columns.size();
2401  const uint noOfKeys = theTable->m_noOfKeys;
2402  uint n = 0;
2403  uint i;
2404  for (i = 0; n < noOfKeys; i++) {
2405  assert(i < columns);
2406  const NdbColumnImpl* c = theTable->m_columns[i];
2407  assert(c != NULL);
2408  if (c->m_pk) {
2409  bc = theBlobTable->m_columns[n];
2410  assert(bc != NULL && bc->m_pk);
2411  NdbRecAttr* ra;
2412  ra = theBlobEventOp->getValue(bc, (char*)0, version);
2413  if (unlikely(ra == NULL)) {
2414  setErrorCode(theBlobEventOp);
2415  DBUG_RETURN(-1);
2416  }
2417  n++;
2418  }
2419  }
2420  if (theStripeSize != 0) {
2421  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnDist]);
2422  buf = (char*)&theBlobEventDistValue;
2423  theBlobEventDistRecAttr = theBlobEventOp->getValue(bc, buf, version);
2424  }
2425  //
2426  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPart]);
2427  buf = (char*)&theBlobEventPartValue;
2428  theBlobEventPartRecAttr = theBlobEventOp->getValue(bc, buf, version);
2429  //
2430  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPkid]);
2431  buf = (char*)&theBlobEventPkidValue;
2432  theBlobEventPkidRecAttr = theBlobEventOp->getValue(bc, buf, version);
2433  //
2434  bc = theBlobTable->getColumn(theBtColumnNo[BtColumnData]);
2435  buf = theBlobEventDataBuf.data;
2436  theBlobEventDataRecAttr = theBlobEventOp->getValue(bc, buf, version);
2437  if (unlikely(
2438  (theStripeSize != 0 && theBlobEventDistRecAttr == NULL) ||
2439  theBlobEventPartRecAttr == NULL ||
2440  theBlobEventPkidRecAttr == NULL ||
2441  theBlobEventDataRecAttr == NULL
2442  )) {
2443  setErrorCode(theBlobEventOp);
2444  DBUG_RETURN(-1);
2445  }
2446  }
2447  }
2448  setState(Prepared);
2449  DBUG_RETURN(0);
2450 }
2451 
2452 int
2453 NdbBlob::prepareColumn()
2454 {
2455  DBUG_ENTER("prepareColumn");
2457  //
2458  theBlobVersion = theColumn->getBlobVersion();
2459  theInlineSize = theColumn->getInlineSize();
2460  thePartSize = theColumn->getPartSize();
2461  theStripeSize = theColumn->getStripeSize();
2462  //
2463  if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
2464  theFixedDataFlag = true;
2465  theHeadSize = (NDB_BLOB_V1_HEAD_SIZE << 2);
2466  theVarsizeBytes = 0;
2467  switch (theColumn->getType()) {
2469  partType = NdbDictionary::Column::Binary;
2470  theFillChar = 0x0;
2471  break;
2473  partType = NdbDictionary::Column::Char;
2474  theFillChar = 0x20;
2475  break;
2476  default:
2477  setErrorCode(NdbBlobImpl::ErrUsage);
2478  DBUG_RETURN(-1);
2479  }
2480  // in V1 stripe size is != 0 (except tinyblob)
2481  assert(!(thePartSize != 0 && theStripeSize == 0));
2482  theBtColumnNo[BtColumnPk] = 0;
2483  theBtColumnNo[BtColumnDist] = 1;
2484  theBtColumnNo[BtColumnPart] = 2;
2485  theBtColumnNo[BtColumnData] = 3;
2486  } else if (theBlobVersion == NDB_BLOB_V2) {
2487  const Uint32 storageType = (Uint32)theColumn->getStorageType();
2488  theFixedDataFlag = (storageType != NDB_STORAGETYPE_MEMORY);
2489  theHeadSize = (NDB_BLOB_V2_HEAD_SIZE << 2);
2490  theVarsizeBytes = 2;
2491  switch (theColumn->getType()) {
2493  if (theFixedDataFlag) {
2494  partType = NdbDictionary::Column::Binary;
2495  theFillChar = 0x0;
2496  } else
2498  break;
2500  if (theFixedDataFlag) {
2501  partType = NdbDictionary::Column::Char;
2502  theFillChar = 0x20;
2503  } else
2505  break;
2506  default:
2507  setErrorCode(NdbBlobImpl::ErrUsage);
2508  DBUG_RETURN(-1);
2509  }
2510  uint off = theTable->m_noOfKeys;
2511  if (theStripeSize != 0) {
2512  theBtColumnNo[BtColumnDist] = off;
2513  off += 1;
2514  }
2515  theBtColumnNo[BtColumnPart] = off + 0;
2516  theBtColumnNo[BtColumnPkid] = off + 1;
2517  theBtColumnNo[BtColumnData] = off + 2;
2518  } else {
2519  setErrorCode(NdbBlobImpl::ErrUsage);
2520  DBUG_RETURN(-1);
2521  }
2522  // sanity check
2523  assert(theColumn->m_attrSize * theColumn->m_arraySize == getHeadInlineSize());
2524  if (thePartSize > 0) {
2525  const NdbTableImpl* bt = NULL;
2526  const NdbColumnImpl* bc = NULL;
2527  if ((bt = theColumn->m_blobTable) == NULL ||
2528  (bc = bt->getColumn(theBtColumnNo[BtColumnData])) == NULL ||
2529  bc->getType() != partType ||
2530  bc->getLength() != (int)thePartSize) {
2531  setErrorCode(NdbBlobImpl::ErrTable);
2532  DBUG_RETURN(-1);
2533  }
2534  // blob table
2535  theBlobTable = &NdbTableImpl::getImpl(*bt);
2536  }
2537  // these buffers are always used
2538  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
2539  thePackKeyBuf.alloc(MAX(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
2540  theHeadInlineBuf.alloc(getHeadInlineSize());
2541  theInlineData = theHeadInlineBuf.data + theHeadSize;
2542  // no length bytes
2543  thePartBuf.alloc(thePartSize);
2544  DBUG_RETURN(0);
2545 }
2546 
2547 /*
2548  * Before execute of prepared operation.
2549  *
2550  * This method adds any extra operations required to perform the
2551  * requested Blob operations.
2552  * This can include :
2553  * Extra read operations added before the 'main table' operation
2554  * Read Blob head + inline bytes
2555  * Read original table key via access index
2556  * Extra operations added after the 'main table' operation
2557  * Update Blob head + inline bytes
2558  * Insert Blob parts
2559  *
2560  * Generally, operations are performed in preExecute() if possible,
2561  * and postExecute if not.
2562  *
2563  * If this method sets the batch parameter to true, then
2564  * - any remaining Blobs in the current user defined operation
2565  * will have their preExecute() method called.
2566  * - all operations up to the last one added will be executed with
2567  * NoCommit BEFORE the next user-defined operation is executed.
2568  * - NdbBlob::postExecute() will be called for all Blobs in the
2569  * executed batch.
2570  * - Processing will continue with the next user-defined operation
2571  * (if any)
2572  * This control flow can be seen in NdbTransaction::execute().
2573  */
2574 int
2575 NdbBlob::preExecute(NdbTransaction::ExecType anExecType,
2576  bool& batch)
2577 {
2578  DBUG_ENTER("NdbBlob::preExecute");
2579  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
2580  DBUG_PRINT("info", ("optype=%d theGetSetBytes=%d theSetFlag=%d",
2581  theNdbOp->theOperationType,
2582  theGetSetBytes,
2583  theSetFlag));
2584  if (theState == Invalid)
2585  DBUG_RETURN(-1);
2586  assert(theState == Prepared);
2587  // handle different operation types
2588  assert(isKeyOp());
2589 
2590  /* Check that a non-nullable blob handle has had a value set
2591  * before proceeding
2592  */
2593  if (!theColumn->m_nullable &&
2594  (isInsertOp() || isWriteOp()) &&
2595  !theSetFlag)
2596  {
2597  /* Illegal null attribute */
2598  setErrorCode(839);
2599  DBUG_RETURN(-1);
2600  }
2601 
2602  if (isReadOp()) {
2603  if (theGetFlag && theGetSetBytes > theInlineSize) {
2604  /* Need blob head before proceeding
2605  * Not safe to do a speculative read of parts, as we do not
2606  * yet hold a lock on the blob head+inline
2607  */
2608  batch = true;
2609  }
2610  }
2611  if (isInsertOp() && theSetFlag) {
2612  /* If the main operation uses AbortOnError then
2613  * we can add operations to insert parts and update
2614  * the Blob head+inline here.
2615  * If the main operation uses IgnoreError then
2616  * we have to wait until we are sure that the main
2617  * insert succeeded before performing any other
2618  * operations (Otherwise we may perform duplicate insert,
2619  * and the transaction can fail on the AbortOnError
2620  * part operations or corrupt the head with the
2621  * post-update operation)
2622  *
2623  * Additionally, if the insert is large, we'll defer to
2624  * postExecute, where we can perform the writes at a more
2625  * leisurely pace.
2626  * We defer if we are writing more part data than we have
2627  * remaining quota for.
2628  */
2629  theSetValueInPreExecFlag =
2630  ((theNdbOp->m_abortOption == NdbOperation::AbortOnError) &&
2631  ((theGetSetBytes <= theInlineSize) || // Parts being written
2632  ((theGetSetBytes - theInlineSize) <= // Total part size <=
2633  (theNdbCon->maxPendingBlobWriteBytes - // (Quota -
2634  MIN(theNdbCon->maxPendingBlobWriteBytes,
2635  theNdbCon->pendingBlobWriteBytes)) // bytes_written)
2636  )));
2637 
2638  if (theSetValueInPreExecFlag)
2639  {
2640  DBUG_PRINT("info",
2641  ("Insert extra ops added in preExecute"));
2642  /* Add operations to insert parts and update the
2643  * Blob head+inline in the main tables
2644  */
2645  if (theGetSetBytes > theInlineSize) {
2646  // add ops to write rest of a setValue
2647  assert(theSetBuf != NULL);
2648  const char* buf = theSetBuf + theInlineSize;
2649  Uint32 bytes = theGetSetBytes - theInlineSize;
2650  assert(thePos == theInlineSize);
2651  Uint32 savePendingBlobWriteBytes = theNdbCon->pendingBlobWriteBytes;
2652  if (writeDataPrivate(buf, bytes) == -1)
2653  DBUG_RETURN(-1);
2654  /* Assert that we didn't execute inline there */
2655  assert(theNdbCon->pendingBlobWriteBytes >
2656  savePendingBlobWriteBytes);
2657  }
2658 
2659  if (theHeadInlineUpdateFlag)
2660  {
2661  NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
2662  if (tOp == NULL ||
2663  tOp->updateTuple() == -1 ||
2664  setTableKeyValue(tOp) == -1 ||
2665  setHeadInlineValue(tOp) == -1) {
2666  setErrorCode(NdbBlobImpl::ErrAbort);
2667  DBUG_RETURN(-1);
2668  }
2669  setHeadPartitionId(tOp);
2670 
2671  DBUG_PRINT("info", ("Insert : added op to update head+inline in preExecute"));
2672  }
2673  }
2674  else
2675  {
2676  DBUG_PRINT("info",
2677  ("Insert waiting for Blob head insert"));
2678  /* Require that this insert op is completed
2679  * before beginning more user ops - avoid interleave
2680  * with delete etc.
2681  */
2682  batch= true;
2683  }
2684  }
2685 
2686  if (isTableOp()) {
2687  if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
2688  // add operation before main table op to read head+inline
2689  NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
2690  /*
2691  * If main op is from take over scan lock, the added read is done
2692  * as committed read:
2693  *
2694  * In normal transactional case, the row is locked by us and
2695  * committed read returns same as normal read.
2696  *
2697  * In current TRUNCATE TABLE, the deleting trans is committed in
2698  * batches and then restarted with new trans id. A normal read
2699  * would hang on the scan delete lock and then fail.
2700  */
2701  NdbOperation::LockMode lockMode =
2702  ! isTakeOverOp() ?
2704  if (tOp == NULL ||
2705  tOp->readTuple(lockMode) == -1 ||
2706  setTableKeyValue(tOp) == -1 ||
2707  getHeadInlineValue(tOp) == -1) {
2708  setErrorCode(tOp);
2709  DBUG_RETURN(-1);
2710  }
2711  setHeadPartitionId(tOp);
2712 
2713  if (isWriteOp()) {
2714  /* There may be no data currently, so ignore tuple not found etc. */
2715  tOp->m_abortOption = NdbOperation::AO_IgnoreError;
2716  tOp->m_noErrorPropagation = true;
2717  }
2718  theHeadInlineReadOp = tOp;
2719  // TODO : Could reuse this op for fetching other blob heads in
2720  // the request?
2721  // Add their getHeadInlineValue() calls to this, rather
2722  // than having separate ops? (Similar to Index read below)
2723  // execute immediately
2724  // TODO : Why can't we continue with pre-execute of other user ops?
2725  // Rationales that occur:
2726  // - We're trying to keep user's op order consistent -
2727  // 1 op completes before another starts.
2728  // - They probably shouldn't rely on this
2729  // - Maybe it makes failure more atomic w.r.t. separate
2730  // operations on Blobs
2731  // - Or perhaps error handling is easier?
2732  batch = true;
2733  DBUG_PRINT("info", ("added op before to read head+inline"));
2734  }
2735  }
2736  if (isIndexOp()) {
2737  // add op before this one to read table key
2738  NdbBlob* tFirstBlob = theNdbOp->theBlobList;
2739  if (this == tFirstBlob) {
2740  // first blob does it for all
2741  if (g_ndb_blob_ok_to_read_index_table) {
2742  /* Cannot work for userDefinedPartitioning + write() op as
2743  * we need to read the 'main' partition Id
2744  * Maybe this branch should be removed?
2745  */
2746  assert(!userDefinedPartitioning);
2747  Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1;
2748  NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp);
2749  if (tOp == NULL ||
2750  tOp->readTuple() == -1 ||
2751  setAccessKeyValue(tOp) == -1 ||
2752  tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
2753  setErrorCode(tOp);
2754  DBUG_RETURN(-1);
2755  }
2756  } else {
2757  NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
2758  if (tOp == NULL ||
2759  tOp->readTuple() == -1 ||
2760  setAccessKeyValue(tOp) == -1 ||
2761  getTableKeyValue(tOp) == -1) {
2762  setErrorCode(tOp);
2763  DBUG_RETURN(-1);
2764  }
2765  if (userDefinedPartitioning && isWriteOp())
2766  {
2767  /* Index Write op does not perform head read before deleting parts
2768  * as it cannot safely IgnoreErrors.
2769  * To get partitioning right we read partition id for main row
2770  * here.
2771  */
2772  thePartitionIdRecAttr = tOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
2773 
2774  if (thePartitionIdRecAttr == NULL) {
2775  setErrorCode(tOp);
2776  DBUG_RETURN(-1);
2777  }
2778  }
2779  }
2780  DBUG_PRINT("info", ("Index op : added op before to read table key"));
2781  }
2782  if (isUpdateOp() || isDeleteOp()) {
2783  // add op before this one to read head+inline via index
2784  NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
2785  if (tOp == NULL ||
2786  tOp->readTuple() == -1 ||
2787  setAccessKeyValue(tOp) == -1 ||
2788  getHeadInlineValue(tOp) == -1) {
2789  setErrorCode(tOp);
2790  DBUG_RETURN(-1);
2791  }
2792  theHeadInlineReadOp = tOp;
2793  // execute immediately
2794  // TODO : Why execute immediately? We could continue with other blobs
2795  // etc. here
2796  batch = true;
2797  DBUG_PRINT("info", ("added index op before to read head+inline"));
2798  }
2799  if (isWriteOp()) {
2800  // XXX until IgnoreError fixed for index op
2801  batch = true;
2802  }
2803  }
2804  if (isWriteOp()) {
2805  if (theSetFlag) {
2806  // write head+inline now
2807  theNullFlag = true;
2808  theLength = 0;
2809  /* Copy data into the headinline buffer */
2810  if (theSetBuf != NULL) {
2811  Uint32 n = theGetSetBytes;
2812  if (n > theInlineSize)
2813  n = theInlineSize;
2814  assert(thePos == 0);
2815  if (writeDataPrivate(theSetBuf, n) == -1)
2816  DBUG_RETURN(-1);
2817  }
2818  /*
2819  * We set the value of the blob head and inline data here if possible.
2820  * Note that the length is being set to max theInlineSize. This will
2821  * be written with the correct length later if necessary.
2822  */
2823  if (!theNdbRecordFlag)
2824  {
2825  if (setHeadInlineValue(theNdbOp) == -1)
2826  DBUG_RETURN(-1);
2827  }
2828  else
2829  {
2830  /* For table based NdbRecord writes we can set the head+inline
2831  * bytes here. For index based writes, we need to wait until
2832  * after the execute for the table key data to be available.
2833  * TODO : Is it worth doing this at all?
2834  */
2835  if (isTableOp())
2836  {
2837  /* NdbRecord - add an update operation after the main op */
2838  NdbOperation* tOp =
2839  theNdbCon->getNdbOperation(theTable);
2840  if (tOp == NULL ||
2841  tOp->updateTuple() == -1 ||
2842  setTableKeyValue(tOp) == -1 ||
2843  setHeadInlineValue(tOp) == -1) {
2844  setErrorCode(NdbBlobImpl::ErrAbort);
2845  DBUG_RETURN(-1);
2846  }
2847  setHeadPartitionId(tOp);
2848 
2849  DBUG_PRINT("info", ("NdbRecord table write : added op to update head+inline"));
2850  }
2851  }
2852  /* Save the contents of the head inline buf for postExecute
2853  * It may get overwritten by the read operation injected
2854  * above
2855  */
2856  theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
2857  }
2858  }
2859  if (theActiveHook != NULL) {
2860  // need blob head for callback
2861  batch = true;
2862  }
2863  DBUG_PRINT("info", ("batch=%u", batch));
2864  DBUG_RETURN(0);
2865 }
2866 
2867 /*
2868  * After execute, for each Blob in an operation. If already Active,
2869  * this routine has been done previously and is not rerun.
2870  * Operations which requested a no-commit batch can add new operations
2871  * after this one. They are added before any remaining prepared user
2872  * operations (See NdbTransaction::execute())
2873  *
2874  * This method has the following duties :
2875  * - Operation specific duties :
2876  * - Index based ops : Store main table key retrieved in preExecute
2877  * - Read ops : Store read head+inline and read parts (inline execute)
2878  * - Update ops : Store read head+inline and update parts (inline execute)
2879  * - Table based write : Either store read head+inline and delete then
2880  * insert parts and head+inline (inline execute) OR
2881  * Perform deletePartsUnknown() to avoid lockless
2882  * race with another transaction, then update head
2883  * and insert parts (inline execute)
2884  * - Index based write : Always perform deletePartsUnknown based on
2885  * fetched main table key then update head+inline
2886  * and insert parts (inline execute)
2887  * Rationale: Couldn't read head+inline safely as
2888  * Index ops don't support IgnoreError so could
2889  * cause Txn fail for write()?
2890  * - Delete op : Store read head+inline info and use to delete parts
2891  * (inline execute)
2892  * - Change Blob handle state to Active
2893  * - Execute user's activeHook function if set
2894  * - Add an operation to update the Blob's head+inline bytes if
2895  * necesary
2896  */
2897 int
2898 NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
2899 {
2900  DBUG_ENTER("NdbBlob::postExecute");
2901  DBUG_PRINT("info", ("this=%p op=%p con=%p anExecType=%u", this, theNdbOp, theNdbCon, anExecType));
2902  if (theState == Closed)
2903  DBUG_RETURN(0); // Nothing to do here
2904  if (theState == Invalid)
2905  DBUG_RETURN(-1);
2906  if (theState == Active) {
2907  setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
2908  DBUG_PRINT("info", ("skip active"));
2909  DBUG_RETURN(0);
2910  }
2911  assert(theState == Prepared);
2912  setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
2913  assert(isKeyOp());
2914  if (isIndexOp()) {
2915  NdbBlob* tFirstBlob = theNdbOp->theBlobList;
2916  if (this == tFirstBlob) {
2917  packKeyValue(theTable, theKeyBuf);
2918  } else {
2919  // copy key from first blob
2920  theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
2921  thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
2922  thePackKeyBuf.zerorest();
2923  }
2924  }
2925  if (isReadOp()) {
2926  /*
2927  We injected a read of blob head into the operation, and need to
2928  set theLength and theNullFlag from it.
2929  */
2930  getHeadFromRecAttr();
2931 
2932  if (setPos(0) == -1)
2933  DBUG_RETURN(-1);
2934  if (theGetFlag) {
2935  assert(theGetSetBytes == 0 || theGetBuf != 0);
2936  assert(theGetSetBytes <= theInlineSize ||
2937  anExecType == NdbTransaction::NoCommit);
2938  Uint32 bytes = theGetSetBytes;
2939  if (readDataPrivate(theGetBuf, bytes) == -1)
2940  DBUG_RETURN(-1);
2941  }
2942  }
2943  if (isInsertOp() && theSetFlag) {
2944  /* For Inserts where the main table operation is IgnoreError,
2945  * we perform extra operations on the head and inline parts
2946  * now, as we know that the main table row was inserted
2947  * successfully.
2948  *
2949  * Additionally, if the insert was large, we deferred writing
2950  * until now to better control the flow of part operations.
2951  * See preExecute()
2952  */
2953  if (! theSetValueInPreExecFlag)
2954  {
2955  DBUG_PRINT("info", ("Insert adding extra ops"));
2956  /* Check the main table op for an error (don't proceed if
2957  * it failed)
2958  */
2959  if (theNdbOp->theError.code == 0)
2960  {
2961  /* Add operations to insert parts and update the
2962  * Blob head+inline in the main table
2963  */
2964  if (theGetSetBytes > theInlineSize) {
2965  // add ops to write rest of a setValue
2966  assert(theSetBuf != NULL);
2967  const char* buf = theSetBuf + theInlineSize;
2968  Uint32 bytes = theGetSetBytes - theInlineSize;
2969  assert(thePos == theInlineSize);
2970  if (writeDataPrivate(buf, bytes) == -1)
2971  DBUG_RETURN(-1);
2972  }
2973 
2974  if (theHeadInlineUpdateFlag)
2975  {
2976  NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
2977  if (tOp == NULL ||
2978  tOp->updateTuple() == -1 ||
2979  setTableKeyValue(tOp) == -1 ||
2980  setHeadInlineValue(tOp) == -1) {
2981  setErrorCode(NdbBlobImpl::ErrAbort);
2982  DBUG_RETURN(-1);
2983  }
2984  setHeadPartitionId(tOp);
2985 
2986  DBUG_PRINT("info", ("Insert : added op to update head+inline"));
2987  }
2988  }
2989  // NOTE : Could map IgnoreError insert error onto Blob here
2990  }
2991  }
2992 
2993  if (isUpdateOp()) {
2994  assert(anExecType == NdbTransaction::NoCommit);
2995  getHeadFromRecAttr();
2996  if (theSetFlag) {
2997  // setValue overwrites everything
2998  if (theSetBuf != NULL) {
2999  if (truncate(0) == -1)
3000  DBUG_RETURN(-1);
3001  assert(thePos == 0);
3002  if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
3003  DBUG_RETURN(-1);
3004  } else {
3005  if (setNull() == -1)
3006  DBUG_RETURN(-1);
3007  }
3008  }
3009  }
3010  if (isWriteOp() && isTableOp()) {
3011  assert(anExecType == NdbTransaction::NoCommit);
3012  if (theHeadInlineReadOp->theError.code == 0) {
3013  int tNullFlag = theNullFlag;
3014  Uint64 tLength = theLength;
3015  Uint64 tPos = thePos;
3016  getHeadFromRecAttr();
3017  DBUG_PRINT("info", ("tuple found"));
3018  if (truncate(0) == -1)
3019  DBUG_RETURN(-1);
3020  // restore previous head+inline
3021  theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
3022  theNullFlag = tNullFlag;
3023  theLength = tLength;
3024  thePos = tPos;
3025  } else {
3026  if (theHeadInlineReadOp->theError.code != 626) {
3027  setErrorCode(theHeadInlineReadOp);
3028  DBUG_RETURN(-1);
3029  }
3030  DBUG_PRINT("info", ("tuple not found"));
3031  /*
3032  * Read found no tuple but it is possible that a tuple was
3033  * created after the read by another transaction. Delete all
3034  * blob parts which may exist.
3035  */
3036  if (deletePartsUnknown(0) == -1)
3037  DBUG_RETURN(-1);
3038  }
3039  if (theSetFlag && theGetSetBytes > theInlineSize) {
3040  assert(theSetBuf != NULL);
3041  const char* buf = theSetBuf + theInlineSize;
3042  Uint32 bytes = theGetSetBytes - theInlineSize;
3043  assert(thePos == theInlineSize);
3044  if (writeDataPrivate(buf, bytes) == -1)
3045  DBUG_RETURN(-1);
3046  }
3047  }
3048  if (isWriteOp() && isIndexOp()) {
3049  // XXX until IgnoreError fixed for index op
3050  if (userDefinedPartitioning)
3051  {
3052  /* For Index Write with UserDefined partitioning, we get the
3053  * partition id from the main table key read created in
3054  * preExecute().
3055  * Extra complexity as only the first Blob does the read, other
3056  * Blobs grab result from first.
3057  */
3058  if (thePartitionIdRecAttr != NULL)
3059  {
3060  assert( this == theNdbOp->theBlobList );
3061  Uint32 id= thePartitionIdRecAttr->u_32_value();
3062  assert( id != noPartitionId() );
3063  DBUG_PRINT("info", ("Index write, setting partition id to %d", id));
3064  thePartitionId= id;
3065  }
3066  else
3067  {
3068  /* First Blob (not us) in this op got the partition Id */
3069  assert( theNdbOp->theBlobList );
3070  assert( this != theNdbOp->theBlobList );
3071 
3072  thePartitionId= theNdbOp->theBlobList->thePartitionId;
3073 
3074  assert(thePartitionId != noPartitionId());
3075  }
3076  }
3077  if (deletePartsUnknown(0) == -1)
3078  DBUG_RETURN(-1);
3079  if (theSetFlag && theGetSetBytes > theInlineSize) {
3080  assert(theSetBuf != NULL);
3081  const char* buf = theSetBuf + theInlineSize;
3082  Uint32 bytes = theGetSetBytes - theInlineSize;
3083  assert(thePos == theInlineSize);
3084  if (writeDataPrivate(buf, bytes) == -1)
3085  DBUG_RETURN(-1);
3086  }
3087  }
3088  if (isDeleteOp()) {
3089  assert(anExecType == NdbTransaction::NoCommit);
3090  getHeadFromRecAttr();
3091  if (deletePartsThrottled(0, getPartCount()) == -1)
3092  DBUG_RETURN(-1);
3093  }
3094  setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
3095  // activation callback
3096  if (theActiveHook != NULL) {
3097  if (invokeActiveHook() == -1)
3098  DBUG_RETURN(-1);
3099  }
3100  /* Cope with any changes to the head */
3101  if (anExecType == NdbTransaction::NoCommit && theHeadInlineUpdateFlag) {
3102  NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
3103  if (tOp == NULL ||
3104  tOp->updateTuple() == -1 ||
3105  setTableKeyValue(tOp) == -1 ||
3106  setHeadInlineValue(tOp) == -1) {
3107  setErrorCode(NdbBlobImpl::ErrAbort);
3108  DBUG_RETURN(-1);
3109  }
3110  setHeadPartitionId(tOp);
3111 
3112  tOp->m_abortOption = NdbOperation::AbortOnError;
3113  DBUG_PRINT("info", ("added op to update head+inline"));
3114  }
3115  DBUG_RETURN(0);
3116 }
3117 
3118 /*
3119  * Before commit of completed operation. For write add operation to
3120  * update head+inline if necessary. This code is the same as the
3121  * last part of postExecute()
3122  */
3123 int
3124 NdbBlob::preCommit()
3125 {
3126  DBUG_ENTER("NdbBlob::preCommit");
3127  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
3128  if (theState == Closed)
3129  DBUG_RETURN(0); // Nothing to do here
3130  if (theState == Invalid)
3131  DBUG_RETURN(-1);
3132  if (unlikely((theState == Prepared) &&
3133  (theNdbCon->commitStatus() == NdbTransaction::Aborted)))
3134  {
3135  /* execute(Commit) called after transaction aborted from kernel
3136  * Do nothing here - the call will fail later.
3137  */
3138  DBUG_RETURN(0);
3139  }
3140  assert(theState == Active);
3141  assert(isKeyOp());
3142  if (isInsertOp() || isUpdateOp() || isWriteOp()) {
3143  if (theHeadInlineUpdateFlag) {
3144  // add an operation to update head+inline
3145  NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
3146  if (tOp == NULL ||
3147  tOp->updateTuple() == -1 ||
3148  setTableKeyValue(tOp) == -1 ||
3149  setHeadInlineValue(tOp) == -1) {
3150  setErrorCode(NdbBlobImpl::ErrAbort);
3151  DBUG_RETURN(-1);
3152  }
3153  setHeadPartitionId(tOp);
3154 
3155  tOp->m_abortOption = NdbOperation::AbortOnError;
3156  DBUG_PRINT("info", ("added op to update head+inline"));
3157  }
3158  }
3159  DBUG_RETURN(0);
3160 }
3161 
3162 /*
3163  After next scan result. Handle like read op above. NdbRecAttr version.
3164  Obtain the primary key from KEYINFO20.
3165  */
3166 int
3167 NdbBlob::atNextResult()
3168 {
3169  DBUG_ENTER("NdbBlob::atNextResult");
3170  DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
3171  if (theState == Invalid)
3172  DBUG_RETURN(-1);
3173  assert(isScanOp());
3174  // get primary key
3175  { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
3176  Uint32* data = (Uint32*)thePackKeyBuf.data;
3177  unsigned size = theTable->m_keyLenInWords; // in-out
3178  if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
3179  setErrorCode(NdbBlobImpl::ErrUsage);
3180  DBUG_RETURN(-1);
3181  }
3182  thePackKeyBuf.size = 4 * size;
3183  thePackKeyBuf.zerorest();
3184  if (unpackKeyValue(theTable, theKeyBuf) == -1)
3185  DBUG_RETURN(-1);
3186  }
3187 
3188  DBUG_RETURN(atNextResultCommon());
3189 }
3190 
3191 /*
3192  After next scan result, NdbRecord version.
3193  For NdbRecord, the keyinfo is given as parameter.
3194 */
3195 int
3196 NdbBlob::atNextResultNdbRecord(const char *keyinfo, Uint32 keyinfo_bytes)
3197 {
3198  DBUG_ENTER("NdbBlob::atNextResultNdbRecord");
3199  DBUG_PRINT("info", ("this=%p op=%p con=%p keyinfo_bytes=%lu",
3200  this, theNdbOp, theNdbCon,
3201  (unsigned long)keyinfo_bytes));
3202  if (theState == Invalid)
3203  DBUG_RETURN(-1);
3204  assert(isScanOp());
3205  /* Get primary key. */
3206  memcpy(thePackKeyBuf.data, keyinfo, keyinfo_bytes);
3207  thePackKeyBuf.size= keyinfo_bytes;
3208  thePackKeyBuf.zerorest();
3209  if (unpackKeyValue(theTable, theKeyBuf) == -1)
3210  DBUG_RETURN(-1);
3211 
3212  DBUG_RETURN(atNextResultCommon());
3213 }
3214 
3215 /* After next scan result. Stuff common to NdbRecAttr and NdbRecord case. */
3216 int
3217 NdbBlob::atNextResultCommon()
3218 {
3219  DBUG_ENTER("NdbBlob::atNextResultCommon");
3220  // discard previous partition id before reading new one
3221  thePartitionId = noPartitionId();
3222  getHeadFromRecAttr();
3223  if (setPos(0) == -1)
3224  DBUG_RETURN(-1);
3225  if (theGetFlag) {
3226  assert(theGetSetBytes == 0 || theGetBuf != 0);
3227  Uint32 bytes = theGetSetBytes;
3228  if (readDataPrivate(theGetBuf, bytes) == -1)
3229  DBUG_RETURN(-1);
3230  }
3231  setState(Active);
3232  // activation callback
3233  if (theActiveHook != NULL) {
3234  if (invokeActiveHook() == -1)
3235  DBUG_RETURN(-1);
3236  }
3237  DBUG_RETURN(0);
3238 }
3239 
3240 /*
3241  * After next event on main table.
3242  */
3243 int
3244 NdbBlob::atNextEvent()
3245 {
3246  DBUG_ENTER("NdbBlob::atNextEvent");
3247  Uint32 optype =
3248  SubTableData::getOperation(theEventOp->m_data_item->sdata->requestInfo);
3249  DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d optype=%u", this, theEventOp, theBlobEventOp, theEventBlobVersion, optype));
3250  if (theState == Invalid)
3251  DBUG_RETURN(-1);
3252  assert(theEventBlobVersion >= 0);
3253  if (optype >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)
3254  DBUG_RETURN(0);
3255  getHeadFromRecAttr();
3256  if (theNullFlag == -1) // value not defined
3257  DBUG_RETURN(0);
3258  if (setPos(0) == -1)
3259  DBUG_RETURN(-1);
3260  setState(Active);
3261  DBUG_RETURN(0);
3262 }
3263 
3264 // misc
3265 
3266 const NdbDictionary::Column*
3268 {
3269  return theColumn;
3270 }
3271 
3272 // errors
3273 
3274 void
3275 NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag)
3276 {
3277  DBUG_ENTER("NdbBlob::setErrorCode");
3278  DBUG_PRINT("info", ("this=%p code=%u", this, anErrorCode));
3279  theError.code = anErrorCode;
3280  // conditionally copy error to operation level
3281  if (theNdbOp != NULL && theNdbOp->theError.code == 0)
3282  theNdbOp->setErrorCode(theError.code);
3283  if (invalidFlag)
3284  setState(Invalid);
3285 #ifdef VM_TRACE
3286  if (NdbEnv_GetEnv("NDB_BLOB_ABORT_ON_ERROR", (char*)0, 0)) {
3287  abort();
3288  }
3289 #endif
3290  DBUG_VOID_RETURN;
3291 }
3292 
3293 void
3294 NdbBlob::setErrorCode(NdbOperation* anOp, bool invalidFlag)
3295 {
3296  int code = 0;
3297  if (anOp != NULL && (code = anOp->theError.code) != 0)
3298  ;
3299  else if ((code = theNdbCon->theError.code) != 0)
3300  ;
3301  else if ((code = theNdb->theError.code) != 0)
3302  ;
3303  else
3304  code = NdbBlobImpl::ErrUnknown;
3305  setErrorCode(code, invalidFlag);
3306 }
3307 
3308 void
3309 NdbBlob::setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag)
3310 {
3311  int code = 0;
3312  if ((code = anOp->m_error.code) != 0)
3313  ;
3314  else
3315  code = NdbBlobImpl::ErrUnknown;
3316  setErrorCode(code, invalidFlag);
3317 }
3318 
3319 // info about all blobs in this operation
3320 
3321 NdbBlob*
3323 {
3324  return theNdbOp->theBlobList;
3325 }
3326 
3327 NdbBlob*
3329 {
3330  return theNext;
3331 }
3332 
3333 const NdbOperation*
3335 {
3336  return theNdbOp;
3337 }
3338 
3339 int
3340 NdbBlob::close(bool execPendingBlobOps)
3341 {
3342  DBUG_ENTER("NdbBlob::close");
3343  DBUG_PRINT("info", ("this=%p state=%u", this, theState));
3344 
3345  /* A Blob can only be closed if it is in the Active state
3346  * with no pending operations
3347  */
3348  if (theState != Active)
3349  {
3350  /* NdbBlob can only be closed from Active state */
3351  setErrorCode(4554);
3352  DBUG_RETURN(-1);
3353  }
3354 
3355  if (execPendingBlobOps)
3356  {
3357  if (thePendingBlobOps != 0)
3358  {
3359  if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1)
3360  DBUG_RETURN(-1);
3361  thePendingBlobOps = 0;
3362  theNdbCon->thePendingBlobOps = 0;
3363  }
3364  }
3365  else if (thePendingBlobOps != 0)
3366  {
3367  /* NdbBlob cannot be closed with pending operations */
3368  setErrorCode(4555);
3369  DBUG_RETURN(-1);
3370  }
3371 
3372  setState(Closed);
3373 
3374  if (theNdbOp->theLockHandle)
3375  {
3376  DBUG_PRINT("info",
3377  ("Decrementing lockhandle Blob ref count to %d",
3378  theNdbOp->theLockHandle->m_openBlobCount -1));
3379 
3380  /* Reduce open blob ref count in main table
3381  * operation's lock handle
3382  * The main table operation can only be unlocked when
3383  * the LockHandle's open blob refcount is zero.
3384  */
3385  assert(theNdbOp->theLockHandle->m_openBlobCount > 0);
3386 
3387  theNdbOp->theLockHandle->m_openBlobCount --;
3388  }
3389 
3390  if (theNdbOp->m_blob_lock_upgraded)
3391  {
3392  assert( theNdbOp->theLockMode == NdbOperation::LM_Read );
3393 
3394  /* In some upgrade scenarios, kernel may not support
3395  * unlock, so there will be no LockHandle
3396  * In that case we revert to the old behaviour -
3397  * do nothing and the main table row stays locked until
3398  * commit / abort.
3399  */
3400  if (likely(theNdbOp->theLockHandle != NULL))
3401  {
3402  if (theNdbOp->theLockHandle->m_openBlobCount == 0)
3403  {
3404  DBUG_PRINT("info",
3405  ("Upgraded -> LM_Read lock "
3406  "now no longer required. Issuing unlock "
3407  " operation"));
3408  /* We can now issue an unlock operation for the main
3409  * table row - it was supposed to be LM_CommittedRead / LM_SimpleRead
3410  */
3411  const NdbOperation* op = theNdbCon->unlock(theNdbOp->theLockHandle,
3413 
3414  if (unlikely(op == NULL))
3415  {
3416  /* setErrorCode will extract the error from the transaction... */
3417  setErrorCode((NdbOperation*) NULL, true); // Set Blob to invalid state
3418  DBUG_RETURN(-1);
3419  }
3420 
3421  thePendingBlobOps |= (1 << NdbOperation::UnlockRequest);
3422  theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UnlockRequest);
3423 
3424  if (unlikely(theNdbCon->releaseLockHandle(theNdbOp->theLockHandle) != 0))
3425  {
3426  setErrorCode(theNdbCon->theError.code, true); // Set Blob to invalid state
3427  DBUG_RETURN(-1);
3428  }
3429  }
3430  }
3431  }
3432 
3433  /*
3434  * TODO : Release some other resources in the close() call to make it
3435  * worthwhile for more than unlocking.
3436  */
3437 
3438  DBUG_RETURN(0);
3439 }