MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DbtupExecQuery.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #define DBTUP_C
20 #include <dblqh/Dblqh.hpp>
21 #include "Dbtup.hpp"
22 #include <RefConvert.hpp>
23 #include <ndb_limits.h>
24 #include <pc.hpp>
25 #include <AttributeDescriptor.hpp>
26 #include "AttributeOffset.hpp"
27 #include <AttributeHeader.hpp>
28 #include <Interpreter.hpp>
29 #include <signaldata/TupKey.hpp>
30 #include <signaldata/AttrInfo.hpp>
31 #include <NdbSqlUtil.hpp>
32 
33 // #define TRACE_INTERPRETER
34 
35 /* For debugging */
36 static void
37 dump_hex(const Uint32 *p, Uint32 len)
38 {
39  if(len > 2560)
40  len= 160;
41  if(len==0)
42  return;
43  for(;;)
44  {
45  if(len>=4)
46  ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
47  else if(len>=3)
48  ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
49  else if(len>=2)
50  ndbout_c("%8p %08X %08X", p, p[0], p[1]);
51  else
52  ndbout_c("%8p %08X", p, p[0]);
53  if(len <= 4)
54  break;
55  len-= 4;
56  p+= 4;
57  }
58 }
59 
/**
 * Look up a previously defined stored procedure (scan or copy program)
 * and attach its AttrInfo section to the current request.
 *
 * @param storedId      index of the stored procedure record in c_storedProcPool
 * @param req_struct    request context; attrinfo_len is set on success
 * @param attrInfoIVal  [out] i-value of the stored AttrInfo section
 * @return ZOK on success, else ZSTORED_PROC_ID_ERROR (also set in terrorCode)
 */
int Dbtup::getStoredProcAttrInfo(Uint32 storedId,
                                 KeyReqStruct* req_struct,
                                 Uint32& attrInfoIVal)
{
  jam();
  StoredProcPtr storedPtr;
  c_storedProcPool.getPtr(storedPtr, storedId);
  if (storedPtr.i != RNIL) {
    if ((storedPtr.p->storedCode == ZSCAN_PROCEDURE) ||
        (storedPtr.p->storedCode == ZCOPY_PROCEDURE)) {
      /* Setup OperationRec with stored procedure AttrInfo section */
      SegmentedSectionPtr sectionPtr;
      getSection(sectionPtr, storedPtr.p->storedProcIVal);
      Uint32 storedProcLen= sectionPtr.sz;

      /* Caller must not already have an AttrInfo section attached */
      ndbassert( attrInfoIVal == RNIL );
      attrInfoIVal= storedPtr.p->storedProcIVal;
      req_struct->attrinfo_len= storedProcLen;
      return ZOK;
    }
  }
  /* Unknown id, or the record is not a scan/copy procedure */
  terrorCode= ZSTORED_PROC_ID_ERROR;
  return terrorCode;
}
91 
/**
 * Copy the operation's AttrInfo out of a segmented section into the
 * linear buffer supplied by the caller, and reset m_any_value.
 *
 * @param regOperPtr    operation record; its m_any_value is cleared here
 * @param inBuffer      destination linear buffer (normally cinBuffer)
 * @param expectedLen   expected AttrInfo length in words (0 => no section)
 * @param attrInfoIVal  i-value of the section; RNIL iff expectedLen == 0
 */
void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
                         Uint32* inBuffer,
                         Uint32 expectedLen,
                         Uint32 attrInfoIVal)
{
  ndbassert( expectedLen > 0 || attrInfoIVal == RNIL );

  if (expectedLen > 0)
  {
    ndbassert( attrInfoIVal != RNIL );

    /* Check length in section is as we expect */
    SegmentedSectionPtr sectionPtr;
    getSection(sectionPtr, attrInfoIVal);

    ndbrequire(sectionPtr.sz == expectedLen);
    ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);

    /* Copy attrInfo data into linear buffer */
    // TODO : Consider operating TUP out of first segment where
    // appropriate
    copy(inBuffer, attrInfoIVal);
  }

  regOperPtr->m_any_value= 0;

  return;
}
120 
/**
 * Recompute and store the tuple checksum.  The stored checksum word is
 * zeroed first so that it does not contribute to the XOR sum itself.
 */
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
                   Tablerec* regTabPtr)
{
  tuple_ptr->m_checksum= 0;
  tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
}
128 
/**
 * XOR checksum over the fixed-size main-memory words of a tuple,
 * starting at m_data (i.e. excluding the Tuple_header words but
 * including tupVersion).  For a tuple whose stored checksum word is
 * valid this returns 0; callers use a non-zero result to detect
 * corruption.
 */
Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
                         Tablerec* regTabPtr)
{
  Uint32 checksum;
  Uint32 i, rec_size, *tuple_header;
  rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
  tuple_header= tuple_ptr->m_data;
  checksum= 0;
  // includes tupVersion
  //printf("%p - ", tuple_ptr);

  for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
    checksum ^= tuple_header[i];
    //printf("%.8x ", tuple_header[i]);
  }

  //printf("-> %.8x\n", checksum);

#if 0
  /* Disabled: would also fold the variable-size part into the sum */
  if (var_sized) {
    /*
    if (! req_struct->fix_var_together) {
      jam();
      checksum ^= tuple_header[rec_size];
    }
    */
    jam();
    var_data_part= req_struct->var_data_start;
    vsize_words= calculate_total_var_size(req_struct->var_len_array,
                                          regTabPtr->no_var_attr);
    ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
    for (i= 0; i < vsize_words; i++) {
      checksum ^= var_data_part[i];
    }
  }
#endif
  return checksum;
}
168 
169 /* ----------------------------------------------------------------- */
170 /* ----------- INSERT_ACTIVE_OP_LIST -------------- */
171 /* ----------------------------------------------------------------- */
/**
 * Link a new operation first on the tuple's list of active
 * (uncommitted) operations, inheriting commit-time bookkeeping from
 * the previous list head, and validate that the operation order is
 * legal (e.g. INSERT only after DELETE, nothing after REFRESH).
 *
 * @return true if the operation may proceed; false with terrorCode set
 *         when the combination of operations is illegal.
 */
bool
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
                          KeyReqStruct* req_struct)
{
  OperationrecPtr prevOpPtr;
  ndbrequire(!regOperPtr.p->op_struct.in_active_list);
  regOperPtr.p->op_struct.in_active_list= true;
  // New operation becomes head of the tuple's operation list
  req_struct->prevOpPtr.i=
    prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  regOperPtr.p->prevActiveOp= prevOpPtr.i;
  regOperPtr.p->nextActiveOp= RNIL;
  regOperPtr.p->m_undo_buffer_space= 0;
  req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
  if (prevOpPtr.i == RNIL) {
    // First operation on this tuple - nothing to validate
    return true;
  } else {
    req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
    prevOpPtr.p->nextActiveOp= regOperPtr.i;

    // Inherit commit-time state from the previous head operation
    regOperPtr.p->op_struct.m_wait_log_buffer=
      prevOpPtr.p->op_struct.m_wait_log_buffer;
    regOperPtr.p->op_struct.m_load_diskpage_on_commit=
      prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
    regOperPtr.p->op_struct.m_gci_written=
      prevOpPtr.p->op_struct.m_gci_written;
    regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
    // start with prev mask (matters only for UPD o UPD)

    regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;

    // Only the newest operation keeps these commit-time duties
    prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
    prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;

    if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
    {
      Uint32 op= regOperPtr.p->op_struct.op_type;
      Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
      if (prevOp == ZDELETE)
      {
        if(op == ZINSERT)
        {
          // mark both
          prevOpPtr.p->op_struct.delete_insert_flag= true;
          regOperPtr.p->op_struct.delete_insert_flag= true;
          return true;
        }
        else if (op == ZREFRESH)
        {
          /* ZREFRESH after Delete - ok */
          return true;
        }
        else
        {
          // Only INSERT or REFRESH may follow a DELETE
          terrorCode= ZTUPLE_DELETED_ERROR;
          return false;
        }
      }
      else if(op == ZINSERT && prevOp != ZDELETE)
      {
        // INSERT on a tuple that still exists is illegal
        terrorCode= ZINSERT_ERROR;
        return false;
      }
      else if (prevOp == ZREFRESH)
      {
        /* No operation after a ZREFRESH */
        terrorCode= ZOP_AFTER_REFRESH_ERROR;
        return false;
      }
      return true;
    }
    else
    {
      // Previous operation not in PREPARED state - must abort
      terrorCode= ZMUST_BE_ABORTED_ERROR;
      return false;
    }
  }
}
249 
/**
 * Position req_struct->m_tuple_ptr at the version of the tuple that
 * this read operation should see: the committed row, or the copy
 * tuple of an uncommitted operation found via savepoint search.
 *
 * @param req_struct  request context (dirty_op, trans ids, m_reorg ...)
 * @param regOperPtr  the reading operation (supplies savepointId)
 * @param regFragPtr  fragment record (unused here beyond signature)
 * @param regTabPtr   table record, used for expand/read preparation
 * @param disk        true when the tuple has a disk part to consider
 * @return true when a visible version exists; false with terrorCode set
 *         (ZTUPLE_DELETED_ERROR) when the tuple is not visible.
 */
bool
Dbtup::setup_read(KeyReqStruct *req_struct,
                  Operationrec* regOperPtr,
                  Fragrecord* regFragPtr,
                  Tablerec* regTabPtr,
                  bool disk)
{
  OperationrecPtr currOpPtr;
  currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  Uint32 bits = req_struct->m_tuple_ptr->m_header_bits;

  if (unlikely(req_struct->m_reorg))
  {
    /* Reorg scans only accept rows whose REORG_MOVE bit matches
     * the requested phase (1 => not moved, 2 => moved).
     */
    Uint32 moved = bits & Tuple_header::REORG_MOVE;
    if (! ((req_struct->m_reorg == 1 && moved == 0) ||
           (req_struct->m_reorg == 2 && moved != 0)))
    {
      terrorCode= ZTUPLE_DELETED_ERROR;
      return false;
    }
  }
  if (currOpPtr.i == RNIL)
  {
    /* No uncommitted operations - read the committed tuple directly */
    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);
    return true;
  }

  do {
    Uint32 savepointId= regOperPtr->savepointId;
    bool dirty= req_struct->dirty_op;

    c_operation_pool.getPtr(currOpPtr);
    bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
                                         req_struct->trans_id1,
                                         req_struct->trans_id2);
    if(dirty && !sameTrans)
    {
      /* Dirty read from another transaction: ignore savepoints */
      savepointId= 0;
    }
    else if(sameTrans)
    {
      // Use savepoint even in read committed mode
      dirty= false;
    }

    /* found == true indicates that savepoint is some state
     * within tuple's current transaction's uncommitted operations
     */
    bool found= find_savepoint(currOpPtr, savepointId);

    Uint32 currOp= currOpPtr.p->op_struct.op_type;

    /* is_insert==true if tuple did not exist before its current
     * transaction
     */
    bool is_insert = (bits & Tuple_header::ALLOC);

    /* If savepoint is in transaction, and post-delete-op
     *   OR
     * Tuple didn't exist before
     *   AND
     *     Read is dirty
     *       OR
     *     Savepoint is before-transaction
     *
     * Tuple does not exist in read's view
     */
    if((found && currOp == ZDELETE) ||
       ((dirty || !found) && is_insert))
    {
      /* Tuple not visible to this read operation */
      terrorCode= ZTUPLE_DELETED_ERROR;
      break;
    }

    if(dirty || !found)
    {
      /* Read existing committed tuple */
    }
    else
    {
      /* Read the uncommitted version from the copy tuple */
      req_struct->m_tuple_ptr=
        get_copy_tuple(&currOpPtr.p->m_copy_tuple_location);
    }

    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);

#if 0
    ndbout_c("reading copy");
    Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
    req_struct->m_tuple_ptr= fixed_ptr;
    req_struct->fix_var_together= true;
    req_struct->var_len_array= (Uint16*)var_ptr;
    req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
    Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
                                        req_struct->var_pos_array,
                                        regTabPtr->no_var_attr);
    req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
#endif
    return true;
  } while(0);

  return false;
}
359 
360 int
361 Dbtup::load_diskpage(Signal* signal,
362  Uint32 opRec, Uint32 fragPtrI,
363  Uint32 lkey1, Uint32 lkey2, Uint32 flags)
364 {
365  Ptr<Tablerec> tabptr;
366  Ptr<Fragrecord> fragptr;
367  Ptr<Operationrec> operPtr;
368 
369  c_operation_pool.getPtr(operPtr, opRec);
370  fragptr.i= fragPtrI;
371  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
372 
373  Operationrec * regOperPtr= operPtr.p;
374  Fragrecord * regFragPtr= fragptr.p;
375 
376  tabptr.i = regFragPtr->fragTableId;
377  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
378  Tablerec* regTabPtr = tabptr.p;
379 
380  if (Local_key::ref(lkey1, lkey2) == ~(Uint32)0)
381  {
382  jam();
383  regOperPtr->op_struct.m_wait_log_buffer= 1;
384  regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
385  if (unlikely((flags & 7) == ZREFRESH))
386  {
387  jam();
388  /* Refresh of previously nonexistant DD tuple.
389  * No diskpage to load at commit time
390  */
391  regOperPtr->op_struct.m_wait_log_buffer= 0;
392  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
393  }
394 
395  /* In either case return 1 for 'proceed' */
396  return 1;
397  }
398 
399  jam();
400  Uint32 page_idx= lkey2;
401  Uint32 frag_page_id= lkey1;
402  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
403  frag_page_id);
404  regOperPtr->m_tuple_location.m_page_idx= page_idx;
405 
406  PagePtr page_ptr;
407  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
408  Tuple_header* ptr= (Tuple_header*)tmp;
409 
410  int res= 1;
411  if(ptr->m_header_bits & Tuple_header::DISK_PART)
412  {
414  memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
415  req.m_callback.m_callbackData= opRec;
416  req.m_callback.m_callbackFunction=
417  safe_cast(&Dbtup::disk_page_load_callback);
418 
419 #ifdef ERROR_INSERT
420  if (ERROR_INSERTED(4022))
421  {
422  flags |= Page_cache_client::DELAY_REQ;
423  req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
424  }
425 #endif
426 
427  Page_cache_client pgman(this, c_pgman);
428  res= pgman.get_page(signal, req, flags);
429  m_pgman_ptr = pgman.m_ptr;
430  if(res > 0)
431  {
432  //ndbout_c("in cache");
433  // In cache
434  }
435  else if(res == 0)
436  {
437  //ndbout_c("waiting for callback");
438  // set state
439  }
440  else
441  {
442  // Error
443  }
444  }
445 
446  switch(flags & 7)
447  {
448  case ZREAD:
449  case ZREAD_EX:
450  break;
451  case ZDELETE:
452  case ZUPDATE:
453  case ZINSERT:
454  case ZWRITE:
455  case ZREFRESH:
456  regOperPtr->op_struct.m_wait_log_buffer= 1;
457  regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
458  }
459  return res;
460 }
461 
/**
 * PGMAN callback: the disk page requested by load_diskpage() is now in
 * the page cache; resume key processing in DBLQH.
 */
void
Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
{
  Ptr<Operationrec> operPtr;
  c_operation_pool.getPtr(operPtr, opRec);
  c_lqh->acckeyconf_load_diskpage_callback(signal,
                                           operPtr.p->userpointer, page_id);
}
470 
471 int
472 Dbtup::load_diskpage_scan(Signal* signal,
473  Uint32 opRec, Uint32 fragPtrI,
474  Uint32 lkey1, Uint32 lkey2, Uint32 flags)
475 {
476  Ptr<Tablerec> tabptr;
477  Ptr<Fragrecord> fragptr;
478  Ptr<Operationrec> operPtr;
479 
480  c_operation_pool.getPtr(operPtr, opRec);
481  fragptr.i= fragPtrI;
482  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
483 
484  Operationrec * regOperPtr= operPtr.p;
485  Fragrecord * regFragPtr= fragptr.p;
486 
487  tabptr.i = regFragPtr->fragTableId;
488  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
489  Tablerec* regTabPtr = tabptr.p;
490 
491  jam();
492  Uint32 page_idx= lkey2;
493  Uint32 frag_page_id= lkey1;
494  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
495  frag_page_id);
496  regOperPtr->m_tuple_location.m_page_idx= page_idx;
497  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
498 
499  PagePtr page_ptr;
500  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
501  Tuple_header* ptr= (Tuple_header*)tmp;
502 
503  int res= 1;
504  if(ptr->m_header_bits & Tuple_header::DISK_PART)
505  {
507  memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
508  req.m_callback.m_callbackData= opRec;
509  req.m_callback.m_callbackFunction=
510  safe_cast(&Dbtup::disk_page_load_scan_callback);
511 
512  Page_cache_client pgman(this, c_pgman);
513  res= pgman.get_page(signal, req, flags);
514  m_pgman_ptr = pgman.m_ptr;
515  if(res > 0)
516  {
517  // ndbout_c("in cache");
518  // In cache
519  }
520  else if(res == 0)
521  {
522  //ndbout_c("waiting for callback");
523  // set state
524  }
525  else
526  {
527  // Error
528  }
529  }
530  return res;
531 }
532 
/**
 * PGMAN callback for the scan case: the requested disk page is now in
 * the page cache; resume the scan in DBLQH.
 */
void
Dbtup::disk_page_load_scan_callback(Signal* signal,
                                    Uint32 opRec, Uint32 page_id)
{
  Ptr<Operationrec> operPtr;
  c_operation_pool.getPtr(operPtr, opRec);
  c_lqh->next_scanconf_load_diskpage_callback(signal,
                                              operPtr.p->userpointer, page_id);
}
542 
543 void Dbtup::execTUPKEYREQ(Signal* signal)
544 {
545  TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
546  Ptr<Tablerec> tabptr;
547  Ptr<Fragrecord> fragptr;
548  Ptr<Operationrec> operPtr;
549  KeyReqStruct req_struct(this);
550  Uint32 sig1, sig2, sig3, sig4;
551 
552  Uint32 RoperPtr= tupKeyReq->connectPtr;
553  Uint32 Rfragptr= tupKeyReq->fragPtr;
554 
555  Uint32 RnoOfFragrec= cnoOfFragrec;
556  Uint32 RnoOfTablerec= cnoOfTablerec;
557 
558  jamEntry();
559  fragptr.i= Rfragptr;
560 
561  ndbrequire(Rfragptr < RnoOfFragrec);
562 
563  c_operation_pool.getPtr(operPtr, RoperPtr);
564  ptrAss(fragptr, fragrecord);
565 
566  Uint32 TrequestInfo= tupKeyReq->request;
567 
568  Operationrec * regOperPtr= operPtr.p;
569  Fragrecord * regFragPtr= fragptr.p;
570 
571  tabptr.i = regFragPtr->fragTableId;
572  ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
573  Tablerec* regTabPtr = tabptr.p;
574 
575  req_struct.tablePtrP = tabptr.p;
576  req_struct.fragPtrP = fragptr.p;
577  req_struct.operPtrP = operPtr.p;
578  req_struct.signal= signal;
579  req_struct.dirty_op= TrequestInfo & 1;
580  req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
581  req_struct.no_fired_triggers= 0;
582  req_struct.read_length= 0;
583  req_struct.last_row= false;
584  req_struct.changeMask.clear();
585  req_struct.m_is_lcp = false;
586 
587  if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
588  {
589  TUPKEY_abort(&req_struct, 39);
590  return;
591  }
592 
593  /* ----------------------------------------------------------------- */
594  // Operation is ZREAD when we arrive here so no need to worry about the
595  // abort process.
596  /* ----------------------------------------------------------------- */
597  /* ----------- INITIATE THE OPERATION RECORD -------------- */
598  /* ----------------------------------------------------------------- */
599  Uint32 Rstoredid= tupKeyReq->storedProcedure;
600 
601  regOperPtr->fragmentPtr= Rfragptr;
602  regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0x7;
603  regOperPtr->op_struct.delete_insert_flag = false;
604  regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3;
605 
606  regOperPtr->m_copy_tuple_location.setNull();
607  regOperPtr->tupVersion= ZNIL;
608 
609  sig1= tupKeyReq->savePointId;
610  sig2= tupKeyReq->primaryReplica;
611  sig3= tupKeyReq->keyRef2;
612 
613  regOperPtr->savepointId= sig1;
614  regOperPtr->op_struct.primary_replica= sig2;
615  Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
616 
617  sig1= tupKeyReq->opRef;
618  sig2= tupKeyReq->tcOpIndex;
619  sig3= tupKeyReq->coordinatorTC;
620  sig4= tupKeyReq->keyRef1;
621 
622  req_struct.tc_operation_ptr= sig1;
623  req_struct.TC_index= sig2;
624  req_struct.TC_ref= sig3;
625  Uint32 pageid = req_struct.frag_page_id= sig4;
626  req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
627  req_struct.m_reorg = (TrequestInfo >> 12) & 3;
628 
629  sig1= tupKeyReq->attrBufLen;
630  sig2= tupKeyReq->applRef;
631  sig3= tupKeyReq->transId1;
632  sig4= tupKeyReq->transId2;
633 
634  Uint32 disk_page= tupKeyReq->disk_page;
635 
636  req_struct.log_size= sig1;
637  req_struct.attrinfo_len= sig1;
638  req_struct.rec_blockref= sig2;
639  req_struct.trans_id1= sig3;
640  req_struct.trans_id2= sig4;
641  req_struct.m_disk_page_ptr.i= disk_page;
642 
643  sig1 = tupKeyReq->m_row_id_page_no;
644  sig2 = tupKeyReq->m_row_id_page_idx;
645  sig3 = tupKeyReq->deferred_constraints;
646 
647  req_struct.m_row_id.m_page_no = sig1;
648  req_struct.m_row_id.m_page_idx = sig2;
649  req_struct.m_deferred_constraints = sig3;
650 
651  /* Get AttrInfo section if this is a long TUPKEYREQ */
652  Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
653 
654  /* If we have AttrInfo, check we expected it, and
655  * that we don't have AttrInfo by another means
656  */
657  ndbassert( (attrInfoIVal == RNIL) ||
658  (tupKeyReq->attrBufLen > 0));
659 
660  Uint32 Roptype = regOperPtr->op_struct.op_type;
661 
662  if (Rstoredid != ZNIL) {
663  /* This is part of a scan, get attrInfoIVal for
664  * given stored procedure
665  */
666  ndbrequire(getStoredProcAttrInfo(Rstoredid,
667  &req_struct,
668  attrInfoIVal) == ZOK);
669  }
670 
671  /* Copy AttrInfo from section into linear in-buffer */
672  copyAttrinfo(regOperPtr,
673  &cinBuffer[0],
674  req_struct.attrinfo_len,
675  attrInfoIVal);
676 
677  regOperPtr->op_struct.m_gci_written = 0;
678 
679  if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
680  {
681  // No tuple allocated yet
682  goto do_insert;
683  }
684 
685  if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
686  {
687  // No tuple allocated yet
688  goto do_refresh;
689  }
690 
691  if (unlikely(isCopyTuple(pageid, pageidx)))
692  {
696  ndbassert(Roptype == ZREAD);
697  ndbassert(disk_page == RNIL);
698  setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
699  goto do_read;
700  }
701 
705  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
706  req_struct.frag_page_id);
707 
708  setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
709 
713  if (Roptype == ZREAD) {
714  jam();
715 
716  if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
717  disk_page != RNIL))
718  {
719  do_read:
720  if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
721  {
722  req_struct.log_size= 0;
723  sendTUPKEYCONF(signal, &req_struct, regOperPtr);
724  /* ---------------------------------------------------------------- */
725  // Read Operations need not to be taken out of any lists.
726  // We also do not need to wait for commit since there is no changes
727  // to commit. Thus we
728  // prepare the operation record already now for the next operation.
729  // Write operations have set the state to STARTED above indicating
730  // that they are waiting for the Commit or Abort decision.
731  /* ---------------------------------------------------------------- */
732  set_trans_state(regOperPtr, TRANS_IDLE);
733  }
734  return;
735  }
736  tupkeyErrorLab(&req_struct);
737  return;
738  }
739 
740  if(insertActiveOpList(operPtr, &req_struct))
741  {
742  if(Roptype == ZINSERT)
743  {
744  jam();
745  do_insert:
746  Local_key accminupdate;
747  Local_key * accminupdateptr = &accminupdate;
748  if (unlikely(handleInsertReq(signal, operPtr,
749  fragptr, regTabPtr, &req_struct,
750  &accminupdateptr) == -1))
751  {
752  return;
753  }
754 
755  terrorCode = 0;
756  checkImmediateTriggersAfterInsert(&req_struct,
757  regOperPtr,
758  regTabPtr,
759  disk_page != RNIL);
760 
761  if (unlikely(terrorCode != 0))
762  {
763  tupkeyErrorLab(&req_struct);
764  return;
765  }
766 
767  if (!regTabPtr->tuxCustomTriggers.isEmpty())
768  {
769  jam();
770  if (unlikely(executeTuxInsertTriggers(signal,
771  regOperPtr,
772  regFragPtr,
773  regTabPtr) != 0))
774  {
775  jam();
776  /*
777  * TUP insert succeeded but add of TUX entries failed. All
778  * TUX changes have been rolled back at this point.
779  *
780  * We will abort via tupkeyErrorLab() as usual. This routine
781  * however resets the operation to ZREAD. The TUP_ABORTREQ
782  * arriving later cannot then undo the insert.
783  *
784  * Therefore we call TUP_ABORTREQ already now. Diskdata etc
785  * should be in memory and timeslicing cannot occur. We must
786  * skip TUX abort triggers since TUX is already aborted.
787  */
788  signal->theData[0] = operPtr.i;
789  do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
790  tupkeyErrorLab(&req_struct);
791  return;
792  }
793  }
794 
795  if (accminupdateptr)
796  {
800  c_lqh->accminupdate(signal,
801  regOperPtr->userpointer,
802  accminupdateptr);
803  }
804 
805  sendTUPKEYCONF(signal, &req_struct, regOperPtr);
806  return;
807  }
808 
809  if (Roptype == ZUPDATE) {
810  jam();
811  if (unlikely(handleUpdateReq(signal, regOperPtr,
812  regFragPtr, regTabPtr,
813  &req_struct, disk_page != RNIL) == -1))
814  {
815  return;
816  }
817 
818  terrorCode = 0;
819  checkImmediateTriggersAfterUpdate(&req_struct,
820  regOperPtr,
821  regTabPtr,
822  disk_page != RNIL);
823 
824  if (unlikely(terrorCode != 0))
825  {
826  tupkeyErrorLab(&req_struct);
827  return;
828  }
829 
830  if (!regTabPtr->tuxCustomTriggers.isEmpty())
831  {
832  jam();
833  if (unlikely(executeTuxUpdateTriggers(signal,
834  regOperPtr,
835  regFragPtr,
836  regTabPtr) != 0))
837  {
838  jam();
839  /*
840  * See insert case.
841  */
842  signal->theData[0] = operPtr.i;
843  do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
844  tupkeyErrorLab(&req_struct);
845  return;
846  }
847  }
848 
849  sendTUPKEYCONF(signal, &req_struct, regOperPtr);
850  return;
851  }
852  else if(Roptype == ZDELETE)
853  {
854  jam();
855  req_struct.log_size= 0;
856  if (unlikely(handleDeleteReq(signal, regOperPtr,
857  regFragPtr, regTabPtr,
858  &req_struct,
859  disk_page != RNIL) == -1))
860  {
861  return;
862  }
863 
864  terrorCode = 0;
865  checkImmediateTriggersAfterDelete(&req_struct,
866  regOperPtr,
867  regTabPtr,
868  disk_page != RNIL);
869 
870  if (unlikely(terrorCode != 0))
871  {
872  tupkeyErrorLab(&req_struct);
873  return;
874  }
875 
876  /*
877  * TUX doesn't need to check for triggers at delete since entries in
878  * the index are kept until commit time.
879  */
880 
881  sendTUPKEYCONF(signal, &req_struct, regOperPtr);
882  return;
883  }
884  else if (Roptype == ZREFRESH)
885  {
889  do_refresh:
890  if (unlikely(handleRefreshReq(signal, operPtr,
891  fragptr, regTabPtr,
892  &req_struct, disk_page != RNIL) == -1))
893  {
894  return;
895  }
896 
897  sendTUPKEYCONF(signal, &req_struct, regOperPtr);
898  return;
899 
900  }
901  else
902  {
903  ndbrequire(false); // Invalid op type
904  }
905  }
906 
907  tupkeyErrorLab(&req_struct);
908 }
909 
/**
 * Point req_struct at the fixed-size part of the tuple identified by
 * the operation's m_tuple_location and cache check offsets and the
 * attribute descriptor array for the table.
 */
void
Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
                        Operationrec* regOperPtr,
                        Tablerec* regTabPtr)
{
  PagePtr page_ptr;
  Uint32* ptr= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
  req_struct->m_page_ptr = page_ptr;
  req_struct->m_tuple_ptr = (Tuple_header*)ptr;

  /* Only an INSERT may address a slot still marked FREE */
  ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));

  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);

  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  /* Descriptor range must lie within the global descriptor array */
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;
}
931 
/**
 * Set up for reading an LCP copy tuple: decode the copy-tuple location
 * from the incoming (flagged) rowid, restore the real rowid into the
 * request, and point m_tuple_ptr at the copy tuple itself.
 */
void
Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
                                 Operationrec* regOperPtr,
                                 Fragrecord* regFragPtr,
                                 Tablerec* regTabPtr)
{
  Local_key tmp;
  tmp.m_page_no = req_struct->frag_page_id;
  tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
  /* Strip the copy-tuple flag bits from the key */
  clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);

  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
  Local_key rowid;
  /* The real rowid is stored first in the copy tuple */
  memcpy(&rowid, copytuple+0, sizeof(Local_key));

  req_struct->frag_page_id = rowid.m_page_no;
  regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;

  Tuple_header * th = get_copy_tuple(copytuple);
  req_struct->m_page_ptr.setNull();
  req_struct->m_tuple_ptr = (Tuple_header*)th;
  th->m_operation_ptr_i = RNIL;
  ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);

  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;

  bool disk = false;
  if (regTabPtr->need_expand(disk))
  {
    jam();
    prepare_read(req_struct, regTabPtr, disk);
  }
}
969 
970  /* ---------------------------------------------------------------- */
971  /* ------------------------ CONFIRM REQUEST ----------------------- */
972  /* ---------------------------------------------------------------- */
973  void Dbtup::sendTUPKEYCONF(Signal* signal,
974  KeyReqStruct *req_struct,
975  Operationrec * regOperPtr)
976 {
977  TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
978 
979  Uint32 Rcreate_rowid = req_struct->m_use_rowid;
980  Uint32 RuserPointer= regOperPtr->userpointer;
981  Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
982  Uint32 log_size= req_struct->log_size;
983  Uint32 read_length= req_struct->read_length;
984  Uint32 last_row= req_struct->last_row;
985 
986  set_trans_state(regOperPtr, TRANS_STARTED);
987  set_tuple_state(regOperPtr, TUPLE_PREPARED);
988  tupKeyConf->userPtr= RuserPointer;
989  tupKeyConf->readLength= read_length;
990  tupKeyConf->writeLength= log_size;
991  tupKeyConf->noFiredTriggers= RnoFiredTriggers;
992  tupKeyConf->lastRow= last_row;
993  tupKeyConf->rowid = Rcreate_rowid;
994 
995  EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
996  TupKeyConf::SignalLength);
997 
998 }
999 
1000 
1001 #define MAX_READ (MIN(sizeof(signal->theData), MAX_SEND_MESSAGE_BYTESIZE))
1002 
1003 /* ---------------------------------------------------------------- */
1004 /* ----------------------------- READ ---------------------------- */
1005 /* ---------------------------------------------------------------- */
/**
 * Execute a ZREAD operation: verify the tuple checksum (when the table
 * uses checksums), then either read the requested attributes directly
 * or start the interpreted program attached to the request.
 *
 * @return 0 on success, -1 on error (tupkeyErrorLab already called).
 */
int Dbtup::handleReadReq(Signal* signal,
                         Operationrec* regOperPtr,
                         Tablerec* regTabPtr,
                         KeyReqStruct* req_struct)
{
  Uint32 *dst;
  Uint32 dstLen, start_index;
  const BlockReference sendBref= req_struct->rec_blockref;
  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
    jam();
    ndbout_c("here2");
    terrorCode= ZTUPLE_CORRUPTED_ERROR;
    tupkeyErrorLab(req_struct);
    return -1;
  }

  const Uint32 node = refToNode(sendBref);
  if(node != 0 && node != getOwnNodeId()) {
    /* Receiver is on another node: start the data later in the signal */
    start_index= 25;
  } else {
    jam();
    /* Local receiver: only a small header prefix is needed */
    start_index= 3;
  }
  dst= &signal->theData[start_index];
  dstLen= (MAX_READ / 4) - start_index;
  if (!req_struct->interpreted_exec) {
    jam();
    int ret = readAttributes(req_struct,
                             &cinBuffer[0],
                             req_struct->attrinfo_len,
                             dst,
                             dstLen,
                             false);
    if (likely(ret >= 0)) {
/* ------------------------------------------------------------------------- */
// We have read all data into coutBuffer. Now send it to the API.
/* ------------------------------------------------------------------------- */
      jam();
      Uint32 TnoOfDataRead= (Uint32) ret;
      req_struct->read_length += TnoOfDataRead;
      sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
      return 0;
    }
    else
    {
      /* readAttributes() returns the negated error code on failure */
      terrorCode = Uint32(-ret);
    }
  } else {
    jam();
    if (likely(interpreterStartLab(signal, req_struct) != -1)) {
      return 0;
    }
    return -1;
  }

  jam();
  tupkeyErrorLab(req_struct);
  return -1;
}
1069 
1070 static
1071 void
1072 handle_reorg(Dbtup::KeyReqStruct * req_struct,
1073  Dbtup::Fragrecord::FragState state)
1074 {
1075  Uint32 reorg = req_struct->m_reorg;
1076  switch(state){
1077  case Dbtup::Fragrecord::FS_FREE:
1078  case Dbtup::Fragrecord::FS_REORG_NEW:
1079  case Dbtup::Fragrecord::FS_REORG_COMMIT_NEW:
1080  case Dbtup::Fragrecord::FS_REORG_COMPLETE_NEW:
1081  return;
1082  case Dbtup::Fragrecord::FS_REORG_COMMIT:
1083  case Dbtup::Fragrecord::FS_REORG_COMPLETE:
1084  if (reorg != 1)
1085  return;
1086  break;
1087  case Dbtup::Fragrecord::FS_ONLINE:
1088  if (reorg != 2)
1089  return;
1090  break;
1091  default:
1092  return;
1093  }
1094  req_struct->m_tuple_ptr->m_header_bits |= Dbtup::Tuple_header::REORG_MOVE;
1095 }
1096 
1097 /* ---------------------------------------------------------------- */
1098 /* ---------------------------- UPDATE ---------------------------- */
1099 /* ---------------------------------------------------------------- */
/**
 * Prepare an UPDATE of a single tuple.
 *
 * The update is not applied in place: a copy tuple is allocated
 * (m_copy_tuple_location) and all attribute changes are applied to the
 * copy; the base tuple is left untouched here.
 *
 * Returns 0 on success (operation left in state TUPLE_PREPARED),
 * -1 on failure (terrorCode set, tupkeyErrorLab() invoked).
 */
int Dbtup::handleUpdateReq(Signal* signal,
                           Operationrec* operPtrP,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct* req_struct,
                           bool disk)
{
  Tuple_header *dst;
  Tuple_header *base= req_struct->m_tuple_ptr, *org;
  ChangeMask * change_mask_ptr;
  if ((dst= alloc_copy_tuple(regTabPtr, &operPtrP->m_copy_tuple_location))== 0)
  {
    terrorCode= ZMEM_NOMEM_ERROR;
    goto error;
  }

  /* Pick the source row: the base tuple for the first operation in the
   * transaction, otherwise the previous operation's copy tuple (whose
   * change mask is inherited rather than cleared). */
  Uint32 tup_version;
  change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
  if(operPtrP->is_first_operation())
  {
    org= req_struct->m_tuple_ptr;
    tup_version= org->get_tuple_version();
    clear_change_mask_info(regTabPtr, change_mask_ptr);
  }
  else
  {
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    tup_version= prevOp->tupVersion;
    Uint32 * rawptr = get_copy_tuple_raw(&prevOp->m_copy_tuple_location);
    org= get_copy_tuple(rawptr);
    copy_change_mask_info(regTabPtr,
                          change_mask_ptr,
                          get_change_mask_ptr(rawptr));
  }

  /* Verify the checksum of the source row (when the table has one)
   * before building the new version from it. */
  req_struct->m_tuple_ptr= org;
  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
  {
    terrorCode= ZTUPLE_CORRUPTED_ERROR;
    goto error;
  }

  /* From here on all work is done on the copy tuple. */
  req_struct->m_tuple_ptr= dst;

  /* sizes[0..1] receive the expanded MM/DD sizes from expand_tuple();
   * sizes[2..3] receive the post-shrink sizes from shrink_tuple().
   * The overlaid cmp[] words allow the before/after pairs to be
   * compared with a single 64-bit compare each (cmp[0] != cmp[1]). */
  union {
    Uint32 sizes[4];
    Uint64 cmp[2];
  };

  disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
  if (regTabPtr->need_expand(disk))
  {
    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
    if(disk && operPtrP->m_undo_buffer_space == 0)
    {
      /* First disk touch in this operation: reserve undo-log space now
       * and arrange for the disk page to be loaded at commit. */
      operPtrP->op_struct.m_wait_log_buffer = 1;
      operPtrP->op_struct.m_load_diskpage_on_commit = 1;
      Uint32 sz= operPtrP->m_undo_buffer_space=
        (sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;

      D("Logfile_client - handleUpdateReq");
      Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
      terrorCode= lgman.alloc_log_space(sz);
      if(unlikely(terrorCode))
      {
        operPtrP->m_undo_buffer_space= 0;
        goto error;
      }
    }
  }
  else
  {
    /* Fixed-size only: a plain copy of the fixed header suffices. */
    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
    req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
  }

  tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
  operPtrP->tupVersion= tup_version;

  req_struct->optimize_options = 0;

  if (!req_struct->interpreted_exec) {
    jam();

    if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
    {
      jam();
      Uint32 attrId =
        regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

      store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
    }
    /* Apply the client-supplied attribute values directly. */
    int retValue = updateAttributes(req_struct,
                                    &cinBuffer[0],
                                    req_struct->attrinfo_len);
    if (unlikely(retValue < 0))
    {
      terrorCode = Uint32(-retValue);
      goto error;
    }
  } else {
    jam();
    /* Interpreted program: interpreterStartLab() applies the changes;
     * on -1 it has presumably reported the error itself (no goto error). */
    if (unlikely(interpreterStartLab(signal, req_struct) == -1))
      return -1;
  }

  update_change_mask_info(regTabPtr,
                          change_mask_ptr,
                          req_struct->changeMask.rep.data);

  /* Honour any OPTIMIZE TABLE request flagged during attribute update. */
  switch (req_struct->optimize_options) {
  case AttributeHeader::OPTIMIZE_MOVE_VARPART:
    if(base->m_header_bits & Tuple_header::VAR_PART)
      optimize_var_part(req_struct, base, operPtrP,
                        regFragPtr, regTabPtr);
    break;
  case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
    //TODO: move fix part of tuple
    break;
  default:
    break;
  }

  if (regTabPtr->need_shrink())
  {
    /* Shrink back to stored form; if the MM/DD sizes changed the
     * var-part allocation of the base row must be adjusted. */
    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
    if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
                                                            base,
                                                            operPtrP,
                                                            regFragPtr,
                                                            regTabPtr,
                                                            sizes)) {
      goto error;
    }
  }

  if (req_struct->m_reorg)
  {
    handle_reorg(req_struct, regFragPtr->fragStatus);
  }

  req_struct->m_tuple_ptr->set_tuple_version(tup_version);
  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
    jam();
    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
  }

  set_tuple_state(operPtrP, TUPLE_PREPARED);

  return 0;

error:
  tupkeyErrorLab(req_struct);
  return -1;
}
1263 
1264 /*
1265  expand_dyn_part - copy dynamic attributes to fully expanded size.
1266 
1267  Both variable-sized and fixed-size attributes are stored in the same way
1268  in the expanded form as variable-sized attributes (in expand_var_part()).
1269 
1270  This method is used for both mem and disk dynamic data.
1271 
1272  dst Destination for expanded data
1273  tabPtrP Table descriptor
1274  src Pointer to the start of dynamic bitmap in source row
1275  row_len Total number of 32-bit words in dynamic part of row
1276  tabDesc Array of table descriptors
1277  order Array of indexes into tabDesc, dynfix followed by dynvar
1278 */
static
Uint32*
expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
                const Uint32* src,
                Uint32 row_len,
                const Uint32 * tabDesc,
                const Uint16* order,
                Uint32 dynvar,
                Uint32 dynfix,
                Uint32 max_bmlen)
{
  /* Copy the bitmap, zeroing out any words not stored in the row. */
  Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
  /* An empty dynamic part (row_len == 0) behaves as bitmap length 0:
   * all attributes NULL. */
  Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;

  assert(bm_len <= max_bmlen);

  if(bm_len > 0)
    memcpy(dst_bm_ptr, src, 4*bm_len);
  if(bm_len < max_bmlen)
    bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));

  /* The expanded bitmap always occupies max_bmlen words; rewrite the
   * length field (low bits of the first bitmap word) accordingly. */
  Uint32 tmp = (* dst_bm_ptr);
  * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;

  /* The 16-bit offset array of the stored form follows the bitmap. */
  char *src_off_start= (char*)(src + bm_len);
  assert((UintPtr(src_off_start)&3) == 0);
  Uint16 *src_off_ptr= (Uint16*)src_off_start;

  /*
    Prepare the variable-sized dynamic attributes, copying out data from the
    source row for any that are not NULL.
  */
  Uint32 no_attr= dst->m_dyn_len_offset;
  Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
  Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
  /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
  Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
  char *dst_ptr= (char*)dst_bm_ptr + dst_off;
  for(Uint32 i= 0; i<dynvar; i++)
  {
    /* order[] lists dynfix attrs first, then dynvar attrs. */
    Uint16 j= order[dynfix+i];
    Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
    Uint32 len;
    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
    {
      /* Present: length is the delta between consecutive stored offsets. */
      Uint16 next_src_off= *src_off_ptr++;
      len= next_src_off - this_src_off;
      memcpy(dst_ptr, src_off_start+this_src_off, len);
      this_src_off= next_src_off;
    }
    else
    {
      len= 0;
    }
    /* In the expanded form each attribute gets its maximal slot;
     * dst_len_ptr records the end offset of the actual data. */
    dst_off_ptr[i]= dst_off;
    dst_len_ptr[i]= dst_off+len;
    dst_off+= max_len;
    dst_ptr+= max_len;
  }
  /*
    The fixed-size data is stored 32-bit aligned after the variable-sized
    data.
  */
  char *src_ptr= src_off_start+this_src_off;
  src_ptr= (char *)(ALIGN_WORD(src_ptr));

  /*
    Prepare the fixed-size dynamic attributes, copying out data from the
    source row for any that are not NULL.
    Note that the fixed-size data is stored in reverse from the end of the
    dynamic part of the row. This is true both for the stored/shrunken and
    for the expanded form.
  */
  for(Uint32 i= dynfix; i>0; )
  {
    i--;
    Uint16 j= order[i];
    Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
    dst_off_ptr[dynvar+i]= dst_off;
    /* len offset array is not used for fixed size. */
    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
    {
      assert((UintPtr(dst_ptr)&3) == 0);
      memcpy(dst_ptr, src_ptr, fix_size);
      src_ptr+= fix_size;
    }
    dst_off+= fix_size;
    dst_ptr+= fix_size;
  }

  /* Return first word past the expanded dynamic part. */
  return (Uint32 *)dst_ptr;
}
1378 
/*
  shrink_dyn_part - inverse of expand_dyn_part: pack the expanded dynamic
  attributes (described by dst) into stored form at dst_ptr, dropping
  trailing all-zero bitmap words and NULL attributes.

  dst      Var_data describing the expanded dynamic part (source of data)
  dst_ptr  where the shrunken form is written
  tabPtrP  table descriptor record
  tabDesc  array of attribute descriptors
  order    indexes into tabDesc, dynfix entries followed by dynvar
  dynvar   number of variable-sized dynamic attributes
  dynfix   number of fixed-size dynamic attributes
  ind      MM or DD part selector

  Returns first word past the written data (== dst_ptr when the whole
  dynamic part is NULL and nothing is stored).
*/
static
Uint32*
shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
                Uint32 *dst_ptr,
                const Dbtup::Tablerec* tabPtrP,
                const Uint32 * tabDesc,
                const Uint16* order,
                Uint32 dynvar,
                Uint32 dynfix,
                Uint32 ind)
{
  assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
  char *dyn_src_ptr= dst->m_dyn_data_ptr;
  Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words

  /* If no dynamic variables, store nothing. */
  assert(bm_len);
  {
    /* Clear the length field so it does not make the first bitmap word
     * look non-zero, then trim trailing all-zero bitmap words. */
    * ((Uint32 *)dyn_src_ptr) &= ~Uint32(Dbtup::DYN_BM_LEN_MASK);

    Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
    while(*bm_ptr == 0)
    {
      bm_ptr--;
      bm_len--;
      if(bm_len == 0)
        break;
    }
  }

  if (bm_len)
  {
    /* At least one attribute is non-NULL: copy the (trimmed) bitmap and
     * count the variable-sized attributes present, so we know how large
     * the stored offset array must be. */
    Uint32 *dyn_dst_ptr= dst_ptr;
    Uint32 dyn_var_count= 0;
    const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
    Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;

    /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
     * split to separate function. */
    Uint16 dyn_dst_data_offset= 0;
    const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[ind];
    for(Uint16 i= 0; i< bm_len; i++)
    {
      Uint32 v= src_bm_ptr[i];
      dyn_var_count+= BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
      dst_bm_ptr[i]= v;
    }

    /* Store the trimmed bitmap length in the first word. */
    Uint32 tmp = *dyn_dst_ptr;
    assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
    * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
    dyn_dst_ptr+= bm_len;
    /* Data starts after the offset array (dyn_var_count+1 Uint16 slots,
     * i.e. 2*count + 2 bytes). */
    dyn_dst_data_offset= 2*dyn_var_count + 2;

    Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
    Uint16 *dyn_src_lenoff_array=
      dyn_src_off_array + dst->m_dyn_len_offset;
    Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;

    /* Pack the non-NULL variable-sized attributes, recording each start
     * offset; lengths are recoverable as deltas of consecutive offsets. */
    Uint16 off_idx= 0;
    for(Uint32 i= 0; i<dynvar; i++)
    {
      Uint32 attrDesc2 = tabDesc[order[dynfix+i]+1];
      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
      if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
      {
        dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
        Uint32 dyn_src_off= dyn_src_off_array[i];
        Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
        /* memmove: source (expanded) and destination (stored) areas may
         * overlap when shrinking in place. */
        memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
                dyn_src_ptr + dyn_src_off,
                dyn_len);
        dyn_dst_data_offset+= dyn_len;
      }
    }
    /* If all dynamic attributes are NULL, we store nothing. */
    /* Terminating offset makes the last attribute's length computable. */
    dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
    assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);

    char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
    char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));

    /* Zero the alignment padding so stored rows compare deterministically. */
    bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);

    /* *
     * Copy over the fixed-sized not-NULL attributes.
     * Note that attributes are copied in reverse order; this is to avoid
     * overwriting not-yet-copied data, as the data is also stored in
     * reverse order.
     */
    for(Uint32 i= dynfix; i > 0; )
    {
      i--;
      Uint16 j= order[i];
      Uint32 attrDesc2 = tabDesc[j+1];
      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
      if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
      {
        Uint32 fixsize=
          4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
        memmove(dyn_dst_data_ptr,
                dyn_src_ptr + dyn_src_off_array[dynvar+i],
                fixsize);
        dyn_dst_data_ptr += fixsize;
      }
    }
    dst_ptr = (Uint32*)dyn_dst_data_ptr;
    assert((UintPtr(dst_ptr) & 3) == 0);
  }
  return (Uint32 *)dst_ptr;
}
1518 
1519 /* ---------------------------------------------------------------- */
1520 /* ----------------------------- INSERT --------------------------- */
1521 /* ---------------------------------------------------------------- */
/**
 * Initialize an operation record and lay out an empty copy tuple for the
 * first INSERT of a tuple in a transaction.
 *
 * Sets up the in-memory var/dynamic part descriptors in
 * req_struct->m_var_data[MM], positions req_struct->m_disk_ptr after the
 * in-memory part, and sets every NULL bit (all attributes start NULL).
 */
void
Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
                              Operationrec* regOperPtr,
                              Tablerec* regTabPtr)
{
  /* Undo-log words needed for a disk alloc record, 0 for memory-only. */
  Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
    sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
  regOperPtr->nextActiveOp= RNIL;
  regOperPtr->prevActiveOp= RNIL;
  regOperPtr->op_struct.in_active_list= true;
  regOperPtr->m_undo_buffer_space= disk_undo;

  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);

  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  Uint32 order_desc= regTabPtr->m_real_order_descriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;
  Uint16* order= (Uint16*)&tableDescriptor[order_desc];
  /* Skip the fixed-size attrs; order now points at varsize entries. */
  order += regTabPtr->m_attributes[MM].m_no_of_fixsize;

  Uint32 bits = Tuple_header::COPY_TUPLE;
  bits |= disk_undo ? (Tuple_header::DISK_ALLOC|Tuple_header::DISK_INLINE) : 0;

  const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
  const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
  const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
  const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
  Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
  Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);

  if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
  {
    /* No var part allocated yet: mark the reference as empty. */
    ref->m_page_no = RNIL;
    ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
  }

  if(mm_vars || mm_dyns)
  {
    jam();
    /* Init Varpart_copy struct */
    Varpart_copy * cp = (Varpart_copy*)ptr;
    cp->m_len = 0;
    ptr += Varpart_copy::SZ32;

    /* Prepare empty varsize part. */
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];

    if (mm_vars)
    {
      /* Data area starts after the (mm_vars+1)-entry Uint16 length words. */
      dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
      dst->m_offset_array_ptr= req_struct->var_pos_array;
      dst->m_var_len_offset= mm_vars;
      dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;

      /* Pre-fill offset/length arrays so every varsize attribute starts
       * empty (len == offset) at its maximal slot position. */
      Uint32 pos= 0;
      Uint16 *pos_ptr = req_struct->var_pos_array;
      Uint16 *len_ptr = pos_ptr + mm_vars;
      for(Uint32 i= 0; i<mm_vars; i++)
      {
        * pos_ptr++ = pos;
        * len_ptr++ = pos;
        pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
      }

      // Disk/dynamic part is 32-bit aligned
      ptr = ALIGN_WORD(dst->m_data_ptr+pos);
      ndbassert(ptr == ALIGN_WORD(dst->m_data_ptr +
                                  regTabPtr->m_offsets[MM].m_max_var_offset));
    }

    if (mm_dyns)
    {
      jam();
      /* Prepare empty dynamic part. */
      dst->m_dyn_data_ptr= (char *)ptr;
      dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
      dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
      dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;

      /* src == 0 / row_len == 0 expands to an all-NULL dynamic part. */
      ptr = expand_dyn_part(dst, 0, 0,
                            (Uint32*)tab_descr, order,
                            mm_dynvar, mm_dynfix,
                            regTabPtr->m_offsets[MM].m_dyn_null_words);
    }

    ndbassert((UintPtr(ptr)&3) == 0);
  }

  /* The disk part follows the in-memory part in the copy tuple. */
  req_struct->m_disk_ptr= (Tuple_header*)ptr;

  /* Varsize disk attributes are not supported. */
  ndbrequire(dd_vars == 0);

  req_struct->m_tuple_ptr->m_header_bits= bits;

  // Set all null bits
  memset(req_struct->m_tuple_ptr->m_null_bits+
         regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
         4*regTabPtr->m_offsets[MM].m_null_words);
  memset(req_struct->m_disk_ptr->m_null_bits+
         regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
         4*regTabPtr->m_offsets[DD].m_null_words);
}
1629 
/**
 * Prepare an INSERT of one tuple.
 *
 * Builds the new row in a copy tuple, applies table default values and
 * client attribute info, allocates memory (and optionally a disk page
 * and undo-log space), then publishes the local key for ACC via
 * *accminupdateptr.
 *
 * Returns 0 on success (operation in state TUPLE_PREPARED), -1 on
 * failure (terrorCode set, tupkeyErrorLab() invoked; the goto labels at
 * the end unwind exactly the state set up before the failing step).
 */
int Dbtup::handleInsertReq(Signal* signal,
                           Ptr<Operationrec> regOperPtr,
                           Ptr<Fragrecord> fragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct *req_struct,
                           Local_key ** accminupdateptr)
{
  Uint32 tup_version = 1;
  Fragrecord* regFragPtr = fragPtr.p;
  Uint32 *ptr= 0;
  Tuple_header *dst;
  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
  Tuple_header *tuple_ptr;

  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
  /* First op in transaction => fresh memory (and disk) allocation. */
  bool mem_insert = regOperPtr.p->is_first_operation();
  bool disk_insert = mem_insert && disk;
  bool vardynsize = (regTabPtr->m_attributes[MM].m_no_of_varsize ||
                     regTabPtr->m_attributes[MM].m_no_of_dynamic);
  bool varalloc = vardynsize || regTabPtr->m_bits & Tablerec::TR_ForceVarPart;
  bool rowid = req_struct->m_use_rowid;
  bool update_acc = false;
  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
  Uint32 frag_page_id = req_struct->frag_page_id;

  /* sizes[0..1]: expanded MM/DD sizes, sizes[2..3]: shrunken sizes;
   * cmp[] overlays them for fast 64-bit before/after comparison. */
  union {
    Uint32 sizes[4];
    Uint64 cmp[2];
  };
  cmp[0] = cmp[1] = 0;

  /* ERROR_INSERTED(40xx) blocks below are test hooks that force each
   * failure path to be exercised. */
  if (ERROR_INSERTED(4014))
  {
    dst = 0;
    goto undo_buffer_error;
  }

  dst= alloc_copy_tuple(regTabPtr, &regOperPtr.p->m_copy_tuple_location);

  if (unlikely(dst == 0))
  {
    goto undo_buffer_error;
  }
  tuple_ptr= req_struct->m_tuple_ptr= dst;
  set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));

  if(mem_insert)
  {
    jam();
    prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
  }
  else
  {
    /* Insert after a delete within the same transaction: base the new
     * row on the previous operation's copy tuple. */
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    ndbassert(prevOp->op_struct.op_type == ZDELETE);
    tup_version= prevOp->tupVersion + 1;

    if(!prevOp->is_first_operation())
      org= get_copy_tuple(&prevOp->m_copy_tuple_location);
    if (regTabPtr->need_expand())
    {
      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
      memset(req_struct->m_disk_ptr->m_null_bits+
             regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
             4*regTabPtr->m_offsets[DD].m_null_words);

      /* Reset the dynamic-part bitmap to "all NULL" at full length. */
      Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
      if (bm_size_in_bytes)
      {
        Uint32* ptr =
          (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
        bzero(ptr, bm_size_in_bytes);
        * ptr = bm_size_in_bytes >> 2;
      }
    }
    else
    {
      memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
      tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
    }
    /* All in-memory attributes start NULL. */
    memset(tuple_ptr->m_null_bits+
           regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
           4*regTabPtr->m_offsets[MM].m_null_words);
  }

  int res;
  if (disk_insert)
  {
    if (ERROR_INSERTED(4015))
    {
      terrorCode = 1501;
      goto log_space_error;
    }

    D("Logfile_client - handleInsertReq");
    Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
    res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
    if(unlikely(res))
    {
      terrorCode= res;
      goto log_space_error;
    }
  }

  regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
  tuple_ptr->set_tuple_version(tup_version);

  if (ERROR_INSERTED(4016))
  {
    terrorCode = ZAI_INCONSISTENCY_ERROR;
    goto update_error;
  }

  if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
  {
    Uint32 attrId =
      regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

    store_extra_row_bits(attrId, regTabPtr, tuple_ptr, /* default */ 0, false);
  }

  if (!regTabPtr->m_default_value_location.isNull())
  {
    jam();
    Uint32 default_values_len;
    /* Get default values ptr + len for this table */
    Uint32* default_values = get_default_ptr(regTabPtr, default_values_len);
    ndbrequire(default_values_len != 0 && default_values != NULL);
    /*
     * Update default values into row first,
     * next update with data received from the client.
     */
    if(unlikely((res = updateAttributes(req_struct, default_values,
                                        default_values_len)) < 0))
    {
      jam();
      terrorCode = Uint32(-res);
      goto update_error;
    }
  }

  /* Apply the client-supplied attribute values (may override defaults). */
  if(unlikely((res = updateAttributes(req_struct, &cinBuffer[0],
                                      req_struct->attrinfo_len)) < 0))
  {
    terrorCode = Uint32(-res);
    goto update_error;
  }

  if (ERROR_INSERTED(4017))
  {
    goto null_check_error;
  }
  /* Every NOT NULL attribute must have received a value. */
  if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
  {
    goto null_check_error;
  }

  if (req_struct->m_is_lcp)
  {
    jam();
    /* LCP restore supplies the var-part length directly. */
    sizes[2+MM] = req_struct->m_lcp_varpart_len;
  }
  else if (regTabPtr->need_shrink())
  {
    shrink_tuple(req_struct, sizes+2, regTabPtr, true);
  }

  if (ERROR_INSERTED(4025))
  {
    goto mem_error;
  }

  if (ERROR_INSERTED(4026))
  {
    CLEAR_ERROR_INSERT_VALUE;
    goto mem_error;
  }

  if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
  {
    goto mem_error;
  }

  if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
  {
    CLEAR_ERROR_INSERT_VALUE;
    goto mem_error;
  }

  /* Allocate the base row: either at a free position (!rowid) or at a
   * caller-specified row id, fixed-size or var-size as the table needs. */
  if(mem_insert)
  {
    if (!rowid)
    {
      if (ERROR_INSERTED(4018))
      {
        goto mem_error;
      }

      if (!varalloc)
      {
        jam();
        ptr= alloc_fix_rec(&terrorCode,
                           regFragPtr,
                           regTabPtr,
                           &regOperPtr.p->m_tuple_location,
                           &frag_page_id);
      }
      else
      {
        jam();
        /* m_file_no temporarily carries the var-part length. */
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
        ptr= alloc_var_rec(&terrorCode,
                           regFragPtr, regTabPtr,
                           sizes[2+MM],
                           &regOperPtr.p->m_tuple_location,
                           &frag_page_id);
      }
      if (unlikely(ptr == 0))
      {
        goto mem_error;
      }
      req_struct->m_use_rowid = true;
    }
    else
    {
      regOperPtr.p->m_tuple_location = req_struct->m_row_id;
      if (ERROR_INSERTED(4019))
      {
        terrorCode = ZROWID_ALLOCATED;
        goto alloc_rowid_error;
      }

      if (!varalloc)
      {
        jam();
        ptr= alloc_fix_rowid(&terrorCode,
                             regFragPtr,
                             regTabPtr,
                             &regOperPtr.p->m_tuple_location,
                             &frag_page_id);
      }
      else
      {
        jam();
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
        ptr= alloc_var_rowid(&terrorCode,
                             regFragPtr, regTabPtr,
                             sizes[2+MM],
                             &regOperPtr.p->m_tuple_location,
                             &frag_page_id);
      }
      if (unlikely(ptr == 0))
      {
        jam();
        goto alloc_rowid_error;
      }
    }
    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
    update_acc = true; /* Will be updated later once success is known */

    base = (Tuple_header*)ptr;
    base->m_operation_ptr_i= regOperPtr.i;
    base->m_header_bits= Tuple_header::ALLOC |
      (sizes[2+MM] > 0 ? Tuple_header::VAR_PART : 0);
  }
  else
  {
    /* Row memory already exists (insert after delete): possibly resize
     * the var part and clear the FREE marker. */
    if (ERROR_INSERTED(4020))
    {
      goto size_change_error;
    }

    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
        unlikely(handle_size_change_after_update(req_struct,
                                                 base,
                                                 regOperPtr.p,
                                                 regFragPtr,
                                                 regTabPtr,
                                                 sizes) != 0))
    {
      goto size_change_error;
    }
    req_struct->m_use_rowid = false;
    base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
  }

  if (disk_insert)
  {
    Local_key tmp;
    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
      1 : sizes[2+DD];

    if (ERROR_INSERTED(4021))
    {
      terrorCode = 1601;
      goto disk_prealloc_error;
    }

    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
    if (unlikely(ret < 0))
    {
      terrorCode = -ret;
      goto disk_prealloc_error;
    }

    regOperPtr.p->op_struct.m_disk_preallocated= 1;
    tmp.m_page_idx= size;
    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));

    /* Back-reference from disk part to the base row (frag_page_id form). */
    Local_key ref = regOperPtr.p->m_tuple_location;
    ref.m_page_no = frag_page_id;

    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
    disk_ptr->m_header_bits = 0;
    disk_ptr->m_base_record_ref= ref.ref();
  }

  if (req_struct->m_reorg)
  {
    handle_reorg(req_struct, regFragPtr->fragStatus);
  }

  /* Have been successful with disk + mem, update ACC to point to
   * new record if necessary
   * Failures in disk alloc will skip this part
   */
  if (update_acc)
  {
    /* Acc stores the local key with the frag_page_id rather
     * than the real_page_id
     */
    ndbassert(regOperPtr.p->m_tuple_location.m_page_no == real_page_id);

    Local_key accKey = regOperPtr.p->m_tuple_location;
    accKey.m_page_no = frag_page_id;
    ** accminupdateptr = accKey;
  }
  else
  {
    * accminupdateptr = 0; // No accminupdate should be performed
  }

  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
  {
    jam();
    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
  }

  set_tuple_state(regOperPtr.p, TUPLE_PREPARED);

  return 0;

  /* Error unwinding: each label undoes exactly what was set up before
   * the corresponding failure point, then falls through to the next. */
size_change_error:
  jam();
  terrorCode = ZMEM_NOMEM_ERROR;
  goto exit_error;

undo_buffer_error:
  jam();
  terrorCode= ZMEM_NOMEM_ERROR;
  regOperPtr.p->m_undo_buffer_space = 0;
  if (mem_insert)
    regOperPtr.p->m_tuple_location.setNull();
  regOperPtr.p->m_copy_tuple_location.setNull();
  tupkeyErrorLab(req_struct);
  return -1;

null_check_error:
  jam();
  terrorCode= ZNO_ILLEGAL_NULL_ATTR;
  goto update_error;

mem_error:
  jam();
  terrorCode= ZMEM_NOMEM_ERROR;
  goto update_error;

log_space_error:
  jam();
  regOperPtr.p->m_undo_buffer_space = 0;
alloc_rowid_error:
  jam();
update_error:
  jam();
  if (mem_insert)
  {
    regOperPtr.p->op_struct.in_active_list = false;
    regOperPtr.p->m_tuple_location.setNull();
  }
exit_error:
  tupkeyErrorLab(req_struct);
  return -1;

disk_prealloc_error:
  base->m_header_bits |= Tuple_header::FREED;
  goto exit_error;
}
2033 
2034 /* ---------------------------------------------------------------- */
2035 /* ---------------------------- DELETE ---------------------------- */
2036 /* ---------------------------------------------------------------- */
/**
 * Prepare a DELETE of one tuple.
 *
 * Allocates a copy tuple holding the pre-delete image (needed since a
 * previous operation in the transaction may be committed before this
 * one), reserves disk undo-log space when required, and — if attrinfo
 * was supplied — performs a read of the doomed row whose result is
 * forwarded via sendLogAttrinfo().
 *
 * Returns 0 on success, -1 on failure (terrorCode set,
 * tupkeyErrorLab() invoked).
 */
int Dbtup::handleDeleteReq(Signal* signal,
                           Operationrec* regOperPtr,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct *req_struct,
                           bool disk)
{
  Tuple_header* dst = alloc_copy_tuple(regTabPtr,
                                       &regOperPtr->m_copy_tuple_location);
  if (dst == 0) {
    terrorCode = ZMEM_NOMEM_ERROR;
    goto error;
  }

  // delete must set but not increment tupVersion
  if (!regOperPtr->is_first_operation())
  {
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    regOperPtr->tupVersion= prevOp->tupVersion;
    // make copy since previous op is committed before this one
    const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
    /* Copy everything from the tuple header to the end of the copy
     * area (total_rec_size minus the raw prefix before dst). */
    Uint32 len = regTabPtr->total_rec_size -
      Uint32(((Uint32*)dst) -
             get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location));
    memcpy(dst, org, 4 * len);
    req_struct->m_tuple_ptr = dst;
  }
  else
  {
    regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
    if (regTabPtr->m_no_of_disk_attributes)
    {
      /* Preserve header bits and disk reference so the disk part can be
       * located at commit time. */
      dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
      memcpy(dst->get_disk_ref_ptr(regTabPtr),
             req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
             sizeof(Local_key));
    }
  }
  /* A delete touches every attribute. */
  req_struct->changeMask.set();
  set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));

  if(disk && regOperPtr->m_undo_buffer_space == 0)
  {
    /* First disk touch: reserve undo-log space for the free record and
     * arrange for the disk page to be loaded at commit. */
    regOperPtr->op_struct.m_wait_log_buffer = 1;
    regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
    Uint32 sz= regOperPtr->m_undo_buffer_space=
      (sizeof(Dbtup::Disk_undo::Free) >> 2) +
      regTabPtr->m_offsets[DD].m_fix_header_size - 1;

    D("Logfile_client - handleDeleteReq");
    Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
    terrorCode= lgman.alloc_log_space(sz);
    if(unlikely(terrorCode))
    {
      regOperPtr->m_undo_buffer_space= 0;
      goto error;
    }
  }

  set_tuple_state(regOperPtr, TUPLE_PREPARED);

  /* No attrinfo => plain delete, nothing to read back. */
  if (req_struct->attrinfo_len == 0)
  {
    return 0;
  }

  if (regTabPtr->need_expand(disk))
  {
    prepare_read(req_struct, regTabPtr, disk);
  }

  {
    Uint32 RlogSize;
    int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
    if (ret == 0 && (RlogSize= req_struct->log_size))
    {
      jam();
      sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
    }
    return ret;
  }

error:
  tupkeyErrorLab(req_struct);
  return -1;
}
2123 
2124 int
2125 Dbtup::handleRefreshReq(Signal* signal,
2126  Ptr<Operationrec> regOperPtr,
2127  Ptr<Fragrecord> regFragPtr,
2128  Tablerec* regTabPtr,
2129  KeyReqStruct *req_struct,
2130  bool disk)
2131 {
2132  /* Here we setup the tuple so that a transition to its current
2133  * state can be observed by SUMA's detached triggers.
2134  *
2135  * If the tuple does not exist then we fabricate a tuple
2136  * so that it can appear to be 'deleted'.
2137  * The fabricated tuple may have invalid NULL values etc.
2138  * If the tuple does exist then we fabricate a null-change
2139  * update to the tuple.
2140  *
2141  * The logic differs depending on whether there are already
2142  * other operations on the tuple in this transaction.
2143  * No other operations (including Refresh) are allowed after
2144  * a refresh.
2145  */
2146  Uint32 refresh_case;
2147  if (regOperPtr.p->is_first_operation())
2148  {
2149  jam();
2150  if (Local_key::isInvalid(req_struct->frag_page_id,
2151  regOperPtr.p->m_tuple_location.m_page_idx))
2152  {
2153  jam();
2154  refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
2155  //ndbout_c("case 1");
2160  Local_key accminupdate;
2161  Local_key * accminupdateptr = &accminupdate;
2162 
2168  Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
2169  Local_key save_defaults = regTabPtr->m_default_value_location;
2171  regTabPtr->notNullAttributeMask;
2172 
2173  regTabPtr->m_no_of_disk_attributes = 0;
2174  regTabPtr->m_default_value_location.setNull();
2175  regOperPtr.p->op_struct.op_type = ZINSERT;
2176 
2180  regTabPtr->notNullAttributeMask.clear();
2181  const Uint32 * primarykeys =
2182  (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2183  for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2184  regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2185 
2186  int res = handleInsertReq(signal, regOperPtr,
2187  regFragPtr, regTabPtr, req_struct,
2188  &accminupdateptr);
2189 
2190  regTabPtr->m_no_of_disk_attributes = save_disk;
2191  regTabPtr->m_default_value_location = save_defaults;
2192  regTabPtr->notNullAttributeMask = save_mask;
2193 
2194  if (unlikely(res == -1))
2195  {
2196  return -1;
2197  }
2198 
2199  regOperPtr.p->op_struct.op_type = ZREFRESH;
2200 
2201  if (accminupdateptr)
2202  {
2206  c_lqh->accminupdate(signal,
2207  regOperPtr.p->userpointer,
2208  accminupdateptr);
2209  }
2210  }
2211  else
2212  {
2213  refresh_case = Operationrec::RF_SINGLE_EXIST;
2214  //ndbout_c("case 2");
2215  jam();
2216 
2217  Uint32 tup_version_save = req_struct->m_tuple_ptr->get_tuple_version();
2218  Uint32 new_tup_version = decr_tup_version(tup_version_save);
2219  Tuple_header* origTuple = req_struct->m_tuple_ptr;
2220  origTuple->set_tuple_version(new_tup_version);
2221  int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2222  regTabPtr, req_struct, disk);
2223  /* Now we must reset the original tuple header back
2224  * to the original version.
2225  * The copy tuple will have the correct version due to
2226  * the update incrementing it.
2227  * On commit, the tuple becomes the copy tuple.
2228  * On abort, the original tuple remains. If we don't
2229  * reset it here, then aborts cause the version to
2230  * decrease
2231  */
2232  origTuple->set_tuple_version(tup_version_save);
2233  if (res == -1)
2234  return -1;
2235  }
2236  }
2237  else
2238  {
2239  /* Not first operation on tuple in transaction */
2240  jam();
2241 
2242  Uint32 tup_version_save = req_struct->prevOpPtr.p->tupVersion;
2243  Uint32 new_tup_version = decr_tup_version(tup_version_save);
2244  req_struct->prevOpPtr.p->tupVersion = new_tup_version;
2245 
2246  int res;
2247  if (req_struct->prevOpPtr.p->op_struct.op_type == ZDELETE)
2248  {
2249  refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
2250  //ndbout_c("case 3");
2251 
2252  jam();
2259  Local_key save_defaults = regTabPtr->m_default_value_location;
2261  regTabPtr->notNullAttributeMask;
2262 
2263  regTabPtr->m_default_value_location.setNull();
2264  regOperPtr.p->op_struct.op_type = ZINSERT;
2265 
2269  regTabPtr->notNullAttributeMask.clear();
2270  const Uint32 * primarykeys =
2271  (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2272  for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2273  regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2274 
2278  Local_key * accminupdateptr = 0;
2279  res = handleInsertReq(signal, regOperPtr,
2280  regFragPtr, regTabPtr, req_struct,
2281  &accminupdateptr);
2282 
2283  regTabPtr->m_default_value_location = save_defaults;
2284  regTabPtr->notNullAttributeMask = save_mask;
2285 
2286  if (unlikely(res == -1))
2287  {
2288  return -1;
2289  }
2290 
2291  regOperPtr.p->op_struct.op_type = ZREFRESH;
2292  }
2293  else
2294  {
2295  jam();
2296  refresh_case = Operationrec::RF_MULTI_EXIST;
2297  //ndbout_c("case 4");
2301  res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2302  regTabPtr, req_struct, disk);
2303  }
2304  req_struct->prevOpPtr.p->tupVersion = tup_version_save;
2305  if (res == -1)
2306  return -1;
2307  }
2308 
2309  /* Store the refresh scenario in the copy tuple location */
2310  // TODO : Verify this is never used as a copy tuple location!
2311  regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
2312  return 0;
2313 }
2314 
2315 bool
2316 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
2317  Tablerec* regTabPtr)
2318 {
2319 // Implement checking of updating all not null attributes in an insert here.
2320  Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
2321  /*
2322  * The idea here is maybe that changeMask is not-null attributes
2323  * and must contain notNullAttributeMask. But:
2324  *
2325  * 1. changeMask has all bits set on insert
2326  * 2. not-null is checked in each UpdateFunction
2327  * 3. the code below does not work except trivially due to 1.
2328  *
2329  * XXX remove or fix
2330  */
2331  attributeMask.clear();
2332  attributeMask.bitOR(req_struct->changeMask);
2333  attributeMask.bitAND(regTabPtr->notNullAttributeMask);
2334  attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
2335  if (!attributeMask.isclear()) {
2336  return false;
2337  }
2338  return true;
2339 }
2340 
2341 /* ---------------------------------------------------------------- */
2342 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */
2343 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
2344 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */
2345 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/
2346 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
2347 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
2348 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */
2349 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
2350 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */
2351 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
2352 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */
2353 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */
2354 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */
2355 /* TO THE CLIENT APPLICATION. */
2356 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
2357 /* THE INTERPRETER EXECUTION REGION. */
/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVE REGIONS */
2359 /* */
2360 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
2361 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
2362 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */
2363 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */
2364 /* */
2365 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */
2366 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */
2367 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */
2368 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */
2369 /* */
2370 /* */
2371 /* ----------------------------------------- */
2372 /* + INITIAL READ REGION + */
2373 /* ----------------------------------------- */
2374 /* + INTERPRETED EXECUTE REGION + */
2375 /* ----------------------------------------- */
2376 /* + FINAL UPDATE REGION + */
2377 /* ----------------------------------------- */
2378 /* + FINAL READ REGION + */
2379 /* ----------------------------------------- */
2380 /* + SUBROUTINE REGION + */
2381 /* ----------------------------------------- */
2382 /* ---------------------------------------------------------------- */
2383 /* ---------------------------------------------------------------- */
2384 /* ----------------- INTERPRETED EXECUTION ----------------------- */
2385 /* ---------------------------------------------------------------- */
/**
 * Drive one complete pass of interpreted execution for a TUPKEYREQ.
 *
 * The ATTRINFO in cinBuffer starts with five length words describing
 * the five program regions (initial read, interpreted execute, final
 * update, final read, subroutines) laid out as in the diagram above.
 *
 * @param signal      signal object; theData is reused as the read
 *                    result buffer sent back to the client
 * @param req_struct  per-request state (operation, attrinfo, log size)
 * @return 0 on success, -1 on error (TUPKEY_abort/tupkeyErrorLab
 *         has then already prepared the error reply)
 */
int Dbtup::interpreterStartLab(Signal* signal,
                               KeyReqStruct *req_struct)
{
  Operationrec * const regOperPtr = req_struct->operPtrP;
  int TnoDataRW;
  Uint32 RtotalLen, start_index, dstLen;
  Uint32 *dst;

  // The first five ATTRINFO words are the lengths of the five regions
  Uint32 RinitReadLen= cinBuffer[0];
  Uint32 RexecRegionLen= cinBuffer[1];
  Uint32 RfinalUpdateLen= cinBuffer[2];
  Uint32 RfinalRLen= cinBuffer[3];
  Uint32 RsubLen= cinBuffer[4];

  Uint32 RattrinbufLen= req_struct->attrinfo_len;
  const BlockReference sendBref= req_struct->rec_blockref;

  const Uint32 node = refToNode(sendBref);
  if(node != 0 && node != getOwnNodeId()) {
    // Receiver is on another node: reserve a larger signal header
    // area before the read results
    start_index= 25;
  } else {
    jam();
    /**
     * execute direct
     */
    start_index= 3;
  }
  dst= &signal->theData[start_index];
  dstLen= (MAX_READ / 4) - start_index;

  RtotalLen= RinitReadLen;
  RtotalLen += RexecRegionLen;
  RtotalLen += RfinalUpdateLen;
  RtotalLen += RfinalRLen;
  RtotalLen += RsubLen;

  Uint32 RattroutCounter= 0;
  Uint32 RinstructionCounter= 5;

  /* All information to be logged/propagated to replicas
   * is generated from here on so reset the log word count
   */
  Uint32 RlogSize= req_struct->log_size= 0;
  if (((RtotalLen + 5) == RattrinbufLen) &&
      (RattrinbufLen >= 5) &&
      (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
    /* ---------------------------------------------------------------- */
    // We start by checking consistency. We must have the first five
    // words of the ATTRINFO to give us the length of the regions. The
    // size of these regions must be the same as the total ATTRINFO
    // length and finally the total length must be within the limits.
    /* ---------------------------------------------------------------- */

    if (RinitReadLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The first step that can be taken in the interpreter is to read
      // data of the tuple before any updates have been applied.
      /* ---------------------------------------------------------------- */
      TnoDataRW= readAttributes(req_struct,
                                &cinBuffer[5],
                                RinitReadLen,
                                &dst[0],
                                dstLen,
                                false);
      if (TnoDataRW >= 0) {
        // readAttributes returns the number of result words produced
        RattroutCounter= TnoDataRW;
        RinstructionCounter += RinitReadLen;
      } else {
        jam();
        // Negative return encodes the error code
        terrorCode = Uint32(-TnoDataRW);
        tupkeyErrorLab(req_struct);
        return -1;
      }
    }
    if (RexecRegionLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The next step is the actual interpreted execution. This executes
      // a register-based virtual machine which can read and write attributes
      // to and from registers.
      /* ---------------------------------------------------------------- */
      // Subroutine region starts after execute + final update + final read
      Uint32 RsubPC= RinstructionCounter + RexecRegionLen
        + RfinalUpdateLen + RfinalRLen;
      TnoDataRW= interpreterNextLab(signal,
                                    req_struct,
                                    &clogMemBuffer[0],
                                    &cinBuffer[RinstructionCounter],
                                    RexecRegionLen,
                                    &cinBuffer[RsubPC],
                                    RsubLen,
                                    &coutBuffer[0],
                                    sizeof(coutBuffer) / 4);
      if (TnoDataRW != -1) {
        RinstructionCounter += RexecRegionLen;
        RlogSize= TnoDataRW;
      } else {
        jam();
        /**
         * NOTE(review): on failure the error reply is presumably
         * prepared inside interpreterNextLab; no terrorCode set here
         */
        return -1;
      }
    }

    if ((RlogSize > 0) ||
        (RfinalUpdateLen > 0))
    {
      /* Operation updates row,
       * reset author pseudo-col before update takes effect
       * This should probably occur only if the interpreted program
       * did not explicitly write the value, but that requires a bit
       * to record whether the value has been written.
       */
      Tablerec* regTabPtr = req_struct->tablePtrP;
      Tuple_header* dst = req_struct->m_tuple_ptr;

      if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
      {
        Uint32 attrId =
          regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

        store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
      }
    }

    if (RfinalUpdateLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // We can also apply a set of updates without any conditions as part
      // of the interpreted execution.
      /* ---------------------------------------------------------------- */
      if (regOperPtr->op_struct.op_type == ZUPDATE) {
        TnoDataRW= updateAttributes(req_struct,
                                    &cinBuffer[RinstructionCounter],
                                    RfinalUpdateLen);
        if (TnoDataRW >= 0) {
          // Copy the applied updates into the log buffer for replication
          MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
                           &cinBuffer[RinstructionCounter],
                           RfinalUpdateLen);
          RinstructionCounter += RfinalUpdateLen;
          RlogSize += RfinalUpdateLen;
        } else {
          jam();
          terrorCode = Uint32(-TnoDataRW);
          tupkeyErrorLab(req_struct);
          return -1;
        }
      } else {
        // Final update region only valid for ZUPDATE operations
        return TUPKEY_abort(req_struct, 19);
      }
    }
    if (RfinalRLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The final action is that we can also read the tuple after it has
      // been updated.
      /* ---------------------------------------------------------------- */
      TnoDataRW= readAttributes(req_struct,
                                &cinBuffer[RinstructionCounter],
                                RfinalRLen,
                                &dst[RattroutCounter],
                                (dstLen - RattroutCounter),
                                false);
      if (TnoDataRW >= 0) {
        RattroutCounter += TnoDataRW;
      } else {
        jam();
        terrorCode = Uint32(-TnoDataRW);
        tupkeyErrorLab(req_struct);
        return -1;
      }
    }
    /* Add log words explicitly generated here to existing log size
     *  - readAttributes can generate log for ANYVALUE column
     *    It adds the words directly to req_struct->log_size
     *    This is used for ANYVALUE and interpreted delete.
     */
    req_struct->log_size+= RlogSize;
    req_struct->read_length += RattroutCounter;
    sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
    if (RlogSize > 0) {
      return sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
    }
    return 0;
  } else {
    // Region lengths inconsistent with total ATTRINFO length
    return TUPKEY_abort(req_struct, 22);
  }
}
2575 
2576 /* ---------------------------------------------------------------- */
2577 /* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
2578 /* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */
2579 /* NODES. */
2580 /* INPUT: LOG_ATTRINFOPTR WHERE TO FETCH DATA FROM */
2581 /* TLOG_START FIRST INDEX TO LOG */
2582 /* TLOG_END LAST INDEX + 1 TO LOG */
2583 /* ---------------------------------------------------------------- */
2584 int Dbtup::sendLogAttrinfo(Signal* signal,
2585  KeyReqStruct * req_struct,
2586  Uint32 TlogSize,
2587  Operationrec * const regOperPtr)
2588 
2589 {
2590  /* Copy from Log buffer to segmented section,
2591  * then attach to ATTRINFO and execute direct
2592  * to LQH
2593  */
2594  ndbrequire( TlogSize > 0 );
2595  Uint32 longSectionIVal= RNIL;
2596  bool ok= appendToSection(longSectionIVal,
2597  &clogMemBuffer[0],
2598  TlogSize);
2599  if (unlikely(!ok))
2600  {
2601  /* Resource error, abort transaction */
2602  terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
2603  tupkeyErrorLab(req_struct);
2604  return -1;
2605  }
2606 
2607  /* Send a TUP_ATTRINFO signal to LQH, which contains
2608  * the relevant user pointer and the attrinfo section's
2609  * IVAL
2610  */
2611  signal->theData[0]= regOperPtr->userpointer;
2612  signal->theData[1]= TlogSize;
2613  signal->theData[2]= longSectionIVal;
2614 
2615  EXECUTE_DIRECT(DBLQH,
2616  GSN_TUP_ATTRINFO,
2617  signal,
2618  3);
2619  return 0;
2620 }
2621 
2622 inline
2623 Uint32
2624 Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
2625 {
2626  Uint32 TbranchDirection= TheInstruction >> 31;
2627  Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
2628  TprogramCounter--;
2629  if (TbranchDirection == 1) {
2630  jam();
2631  /* ---------------------------------------------------------------- */
2632  /* WE JUMP BACKWARDS. */
2633  /* ---------------------------------------------------------------- */
2634  return (TprogramCounter - TbranchLength);
2635  } else {
2636  jam();
2637  /* ---------------------------------------------------------------- */
2638  /* WE JUMP FORWARD. */
2639  /* ---------------------------------------------------------------- */
2640  return (TprogramCounter + TbranchLength);
2641  }
2642 }
2643 
2644 const Uint32 *
2645 Dbtup::lookupInterpreterParameter(Uint32 paramNo,
2646  const Uint32 * subptr,
2647  Uint32 sublen) const
2648 {
2658  Uint32 pos = 0;
2659  while (paramNo)
2660  {
2661  const Uint32 * head = subptr + pos;
2662  Uint32 len = AttributeHeader::getDataSize(* head);
2663  paramNo --;
2664  pos += 1 + len;
2665  if (unlikely(pos >= sublen))
2666  return 0;
2667  }
2668 
2669  const Uint32 * head = subptr + pos;
2670  Uint32 len = AttributeHeader::getDataSize(* head);
2671  if (unlikely(pos + 1 + len > sublen))
2672  return 0;
2673 
2674  return head;
2675 }
2676 
2677 int Dbtup::interpreterNextLab(Signal* signal,
2678  KeyReqStruct* req_struct,
2679  Uint32* logMemory,
2680  Uint32* mainProgram,
2681  Uint32 TmainProgLen,
2682  Uint32* subroutineProg,
2683  Uint32 TsubroutineLen,
2684  Uint32 * tmpArea,
2685  Uint32 tmpAreaSz)
2686 {
2687  register Uint32* TcurrentProgram= mainProgram;
2688  register Uint32 TcurrentSize= TmainProgLen;
2689  register Uint32 RnoOfInstructions= 0;
2690  register Uint32 TprogramCounter= 0;
2691  register Uint32 theInstruction;
2692  register Uint32 theRegister;
2693  Uint32 TdataWritten= 0;
2694  Uint32 RstackPtr= 0;
2695  union {
2696  Uint32 TregMemBuffer[32];
2697  Uint64 align[16];
2698  };
2699  (void)align; // kill warning
2700  Uint32 TstackMemBuffer[32];
2701 
2702  /* ---------------------------------------------------------------- */
2703  // Initialise all 8 registers to contain the NULL value.
2704  // In this version we can handle 32 and 64 bit unsigned integers.
2705  // They are handled as 64 bit values. Thus the 32 most significant
2706  // bits are zeroed for 32 bit values.
2707  /* ---------------------------------------------------------------- */
2708  TregMemBuffer[0]= 0;
2709  TregMemBuffer[4]= 0;
2710  TregMemBuffer[8]= 0;
2711  TregMemBuffer[12]= 0;
2712  TregMemBuffer[16]= 0;
2713  TregMemBuffer[20]= 0;
2714  TregMemBuffer[24]= 0;
2715  TregMemBuffer[28]= 0;
2716  Uint32 tmpHabitant= ~0;
2717 
2718  while (RnoOfInstructions < 8000) {
2719  /* ---------------------------------------------------------------- */
2720  /* EXECUTE THE NEXT INTERPRETER INSTRUCTION. */
2721  /* ---------------------------------------------------------------- */
2722  RnoOfInstructions++;
2723  theInstruction= TcurrentProgram[TprogramCounter];
2724  theRegister= Interpreter::getReg1(theInstruction) << 2;
2725 #ifdef TRACE_INTERPRETER
2726  ndbout_c("Interpreter : RnoOfInstructions : %u. TprogramCounter : %u. Opcode : %u",
2727  RnoOfInstructions, TprogramCounter, Interpreter::getOpCode(theInstruction));
2728 #endif
2729  if (TprogramCounter < TcurrentSize) {
2730  TprogramCounter++;
2731  switch (Interpreter::getOpCode(theInstruction)) {
2732  case Interpreter::READ_ATTR_INTO_REG:
2733  jam();
2734  /* ---------------------------------------------------------------- */
2735  // Read an attribute from the tuple into a register.
2736  // While reading an attribute we allow the attribute to be an array
2737  // as long as it fits in the 64 bits of the register.
2738  /* ---------------------------------------------------------------- */
2739  {
2740  Uint32 theAttrinfo= theInstruction;
2741  int TnoDataRW= readAttributes(req_struct,
2742  &theAttrinfo,
2743  (Uint32)1,
2744  &TregMemBuffer[theRegister],
2745  (Uint32)3,
2746  false);
2747  if (TnoDataRW == 2) {
2748  /* ------------------------------------------------------------- */
2749  // Two words read means that we get the instruction plus one 32
2750  // word read. Thus we set the register to be a 32 bit register.
2751  /* ------------------------------------------------------------- */
2752  TregMemBuffer[theRegister]= 0x50;
2753  // arithmetic conversion if big-endian
2754  * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
2755  } else if (TnoDataRW == 3) {
2756  /* ------------------------------------------------------------- */
2757  // Three words read means that we get the instruction plus two
2758  // 32 words read. Thus we set the register to be a 64 bit register.
2759  /* ------------------------------------------------------------- */
2760  TregMemBuffer[theRegister]= 0x60;
2761  TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
2762  TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
2763  } else if (TnoDataRW == 1) {
2764  /* ------------------------------------------------------------- */
2765  // One word read means that we must have read a NULL value. We set
2766  // the register to indicate a NULL value.
2767  /* ------------------------------------------------------------- */
2768  TregMemBuffer[theRegister]= 0;
2769  TregMemBuffer[theRegister + 2]= 0;
2770  TregMemBuffer[theRegister + 3]= 0;
2771  } else if (TnoDataRW < 0) {
2772  jam();
2773  terrorCode = Uint32(-TnoDataRW);
2774  tupkeyErrorLab(req_struct);
2775  return -1;
2776  } else {
2777  /* ------------------------------------------------------------- */
2778  // Any other return value from the read attribute here is not
2779  // allowed and will lead to a system crash.
2780  /* ------------------------------------------------------------- */
2781  ndbrequire(false);
2782  }
2783  break;
2784  }
2785 
2786  case Interpreter::WRITE_ATTR_FROM_REG:
2787  jam();
2788  {
2789  Uint32 TattrId= theInstruction >> 16;
2790  Uint32 TattrDescrIndex= req_struct->tablePtrP->tabDescriptor +
2791  (TattrId << ZAD_LOG_SIZE);
2792  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
2793  Uint32 TregType= TregMemBuffer[theRegister];
2794 
2795  /* --------------------------------------------------------------- */
2796  // Calculate the number of words of this attribute.
2797  // We allow writes into arrays as long as they fit into the 64 bit
2798  // register size.
2799  /* --------------------------------------------------------------- */
2800  Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
2801  Uint32 Toptype = req_struct->operPtrP->op_struct.op_type;
2802  Uint32 TdataForUpdate[3];
2803  Uint32 Tlen;
2804 
2805  AttributeHeader ah(TattrId, TattrNoOfWords << 2);
2806  TdataForUpdate[0]= ah.m_value;
2807  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
2808  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
2809  Tlen= TattrNoOfWords + 1;
2810  if (Toptype == ZUPDATE) {
2811  if (TattrNoOfWords <= 2) {
2812  if (TattrNoOfWords == 1) {
2813  // arithmetic conversion if big-endian
2814  Int64 * tmp = new (&TregMemBuffer[theRegister + 2]) Int64;
2815  TdataForUpdate[1] = Uint32(* tmp);
2816  TdataForUpdate[2] = 0;
2817  }
2818  if (TregType == 0) {
2819  /* --------------------------------------------------------- */
2820  // Write a NULL value into the attribute
2821  /* --------------------------------------------------------- */
2822  ah.setNULL();
2823  TdataForUpdate[0]= ah.m_value;
2824  Tlen= 1;
2825  }
2826  int TnoDataRW= updateAttributes(req_struct,
2827  &TdataForUpdate[0],
2828  Tlen);
2829  if (TnoDataRW >= 0) {
2830  /* --------------------------------------------------------- */
2831  // Write the written data also into the log buffer so that it
2832  // will be logged.
2833  /* --------------------------------------------------------- */
2834  logMemory[TdataWritten + 0]= TdataForUpdate[0];
2835  logMemory[TdataWritten + 1]= TdataForUpdate[1];
2836  logMemory[TdataWritten + 2]= TdataForUpdate[2];
2837  TdataWritten += Tlen;
2838  } else {
2839  terrorCode = Uint32(-TnoDataRW);
2840  tupkeyErrorLab(req_struct);
2841  return -1;
2842  }
2843  } else {
2844  return TUPKEY_abort(req_struct, 15);
2845  }
2846  } else {
2847  return TUPKEY_abort(req_struct, 16);
2848  }
2849  break;
2850  }
2851 
2852  case Interpreter::LOAD_CONST_NULL:
2853  jam();
2854  TregMemBuffer[theRegister]= 0; /* NULL INDICATOR */
2855  break;
2856 
2857  case Interpreter::LOAD_CONST16:
2858  jam();
2859  TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2860  * (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
2861  break;
2862 
2863  case Interpreter::LOAD_CONST32:
2864  jam();
2865  TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2866  * (Int64*)(TregMemBuffer+theRegister+2)= *
2867  (TcurrentProgram+TprogramCounter);
2868  TprogramCounter++;
2869  break;
2870 
2871  case Interpreter::LOAD_CONST64:
2872  jam();
2873  TregMemBuffer[theRegister]= 0x60; /* 64 BIT UNSIGNED CONSTANT */
2874  TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
2875  TprogramCounter++);
2876  TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
2877  TprogramCounter++);
2878  break;
2879 
2880  case Interpreter::ADD_REG_REG:
2881  jam();
2882  {
2883  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2884  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2885 
2886  Uint32 TrightType= TregMemBuffer[TrightRegister];
2887  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2888 
2889 
2890  Uint32 TleftType= TregMemBuffer[theRegister];
2891  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2892 
2893  if ((TleftType | TrightType) != 0) {
2894  Uint64 Tdest0= Tleft0 + Tright0;
2895  * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2896  TregMemBuffer[TdestRegister]= 0x60;
2897  } else {
2898  return TUPKEY_abort(req_struct, 20);
2899  }
2900  break;
2901  }
2902 
2903  case Interpreter::SUB_REG_REG:
2904  jam();
2905  {
2906  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2907  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2908 
2909  Uint32 TrightType= TregMemBuffer[TrightRegister];
2910  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2911 
2912  Uint32 TleftType= TregMemBuffer[theRegister];
2913  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2914 
2915  if ((TleftType | TrightType) != 0) {
2916  Int64 Tdest0= Tleft0 - Tright0;
2917  * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2918  TregMemBuffer[TdestRegister]= 0x60;
2919  } else {
2920  return TUPKEY_abort(req_struct, 20);
2921  }
2922  break;
2923  }
2924 
2925  case Interpreter::BRANCH:
2926  TprogramCounter= brancher(theInstruction, TprogramCounter);
2927  break;
2928 
2929  case Interpreter::BRANCH_REG_EQ_NULL:
2930  if (TregMemBuffer[theRegister] != 0) {
2931  jam();
2932  continue;
2933  } else {
2934  jam();
2935  TprogramCounter= brancher(theInstruction, TprogramCounter);
2936  }
2937  break;
2938 
2939  case Interpreter::BRANCH_REG_NE_NULL:
2940  if (TregMemBuffer[theRegister] == 0) {
2941  jam();
2942  continue;
2943  } else {
2944  jam();
2945  TprogramCounter= brancher(theInstruction, TprogramCounter);
2946  }
2947  break;
2948 
2949 
2950  case Interpreter::BRANCH_EQ_REG_REG:
2951  {
2952  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2953 
2954  Uint32 TleftType= TregMemBuffer[theRegister];
2955  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2956  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2957 
2958  Uint32 TrightType= TregMemBuffer[TrightRegister];
2959  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2960  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2961  if ((TrightType | TleftType) != 0) {
2962  jam();
2963  if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
2964  TprogramCounter= brancher(theInstruction, TprogramCounter);
2965  }
2966  } else {
2967  return TUPKEY_abort(req_struct, 23);
2968  }
2969  break;
2970  }
2971 
2972  case Interpreter::BRANCH_NE_REG_REG:
2973  {
2974  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2975 
2976  Uint32 TleftType= TregMemBuffer[theRegister];
2977  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2978  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2979 
2980  Uint32 TrightType= TregMemBuffer[TrightRegister];
2981  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2982  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2983  if ((TrightType | TleftType) != 0) {
2984  jam();
2985  if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
2986  TprogramCounter= brancher(theInstruction, TprogramCounter);
2987  }
2988  } else {
2989  return TUPKEY_abort(req_struct, 24);
2990  }
2991  break;
2992  }
2993 
2994  case Interpreter::BRANCH_LT_REG_REG:
2995  {
2996  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2997 
2998  Uint32 TrightType= TregMemBuffer[TrightRegister];
2999  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3000 
3001  Uint32 TleftType= TregMemBuffer[theRegister];
3002  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3003 
3004 
3005  if ((TrightType | TleftType) != 0) {
3006  jam();
3007  if (Tleft0 < Tright0) {
3008  TprogramCounter= brancher(theInstruction, TprogramCounter);
3009  }
3010  } else {
3011  return TUPKEY_abort(req_struct, 24);
3012  }
3013  break;
3014  }
3015 
3016  case Interpreter::BRANCH_LE_REG_REG:
3017  {
3018  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3019 
3020  Uint32 TrightType= TregMemBuffer[TrightRegister];
3021  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3022 
3023  Uint32 TleftType= TregMemBuffer[theRegister];
3024  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3025 
3026 
3027  if ((TrightType | TleftType) != 0) {
3028  jam();
3029  if (Tleft0 <= Tright0) {
3030  TprogramCounter= brancher(theInstruction, TprogramCounter);
3031  }
3032  } else {
3033  return TUPKEY_abort(req_struct, 26);
3034  }
3035  break;
3036  }
3037 
3038  case Interpreter::BRANCH_GT_REG_REG:
3039  {
3040  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3041 
3042  Uint32 TrightType= TregMemBuffer[TrightRegister];
3043  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3044 
3045  Uint32 TleftType= TregMemBuffer[theRegister];
3046  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3047 
3048 
3049  if ((TrightType | TleftType) != 0) {
3050  jam();
3051  if (Tleft0 > Tright0){
3052  TprogramCounter= brancher(theInstruction, TprogramCounter);
3053  }
3054  } else {
3055  return TUPKEY_abort(req_struct, 27);
3056  }
3057  break;
3058  }
3059 
3060  case Interpreter::BRANCH_GE_REG_REG:
3061  {
3062  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3063 
3064  Uint32 TrightType= TregMemBuffer[TrightRegister];
3065  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3066 
3067  Uint32 TleftType= TregMemBuffer[theRegister];
3068  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3069 
3070 
3071  if ((TrightType | TleftType) != 0) {
3072  jam();
3073  if (Tleft0 >= Tright0){
3074  TprogramCounter= brancher(theInstruction, TprogramCounter);
3075  }
3076  } else {
3077  return TUPKEY_abort(req_struct, 28);
3078  }
3079  break;
3080  }
3081 
3082  case Interpreter::BRANCH_ATTR_OP_ARG_2:
3083  case Interpreter::BRANCH_ATTR_OP_ARG:{
3084  jam();
3085  Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
3086  Uint32 ins2 = TcurrentProgram[TprogramCounter];
3087  Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
3088  Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
3089  Uint32 step = argLen;
3090 
3091  if(tmpHabitant != attrId){
3092  Int32 TnoDataR = readAttributes(req_struct,
3093  &attrId, 1,
3094  tmpArea, tmpAreaSz,
3095  false);
3096 
3097  if (TnoDataR < 0) {
3098  jam();
3099  terrorCode = Uint32(-TnoDataR);
3100  tupkeyErrorLab(req_struct);
3101  return -1;
3102  }
3103  tmpHabitant= attrId;
3104  }
3105 
3106  // get type
3107  attrId >>= 16;
3108  Uint32 TattrDescrIndex = req_struct->tablePtrP->tabDescriptor +
3109  (attrId << ZAD_LOG_SIZE);
3110  Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
3111  Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
3112  Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
3113  void * cs = 0;
3114  if(AttributeOffset::getCharsetFlag(TattrDesc2))
3115  {
3116  Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
3117  cs = req_struct->tablePtrP->charsetArray[pos];
3118  }
3119  const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
3120 
3121  // get data
3122  AttributeHeader ah(tmpArea[0]);
3123  const char* s1 = (char*)&tmpArea[1];
3124  const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
3125  // fixed length in 5.0
3126  Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
3127 
3128  if (Interpreter::getOpCode(theInstruction) ==
3129  Interpreter::BRANCH_ATTR_OP_ARG_2)
3130  {
3131  jam();
3132  Uint32 paramNo = Interpreter::getBranchCol_ParamNo(ins2);
3133  const Uint32 * paramptr = lookupInterpreterParameter(paramNo,
3134  subroutineProg,
3135  TsubroutineLen);
3136  if (unlikely(paramptr == 0))
3137  {
3138  jam();
3139  terrorCode = 99; // TODO
3140  tupkeyErrorLab(req_struct);
3141  return -1;
3142  }
3143 
3144  argLen = AttributeHeader::getByteSize(* paramptr);
3145  step = 0;
3146  s2 = (char*)(paramptr + 1);
3147  }
3148 
3149  if (typeId == NDB_TYPE_BIT)
3150  {
3151  /* Size in bytes for bit fields can be incorrect due to
3152  * rounding down
3153  */
3154  Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
3155  + 7) / 8;
3156  attrLen= bitFieldAttrLen;
3157  }
3158 
3159  bool r1_null = ah.isNULL();
3160  bool r2_null = argLen == 0;
3161  int res1;
3162  if (cond <= Interpreter::GE)
3163  {
3164  /* Inequality - EQ, NE, LT, LE, GT, GE */
3165  if (r1_null || r2_null) {
3166  // NULL==NULL and NULL<not-NULL
3167  res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
3168  } else {
3169  jam();
3170  if (unlikely(sqlType.m_cmp == 0))
3171  {
3172  return TUPKEY_abort(req_struct, 40);
3173  }
3174  res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
3175  }
3176  } else {
3177  if ((cond == Interpreter::LIKE) ||
3178  (cond == Interpreter::NOT_LIKE))
3179  {
3180  if (r1_null || r2_null) {
3181  // NULL like NULL is true (has no practical use)
3182  res1 = r1_null && r2_null ? 0 : -1;
3183  } else {
3184  jam();
3185  if (unlikely(sqlType.m_like == 0))
3186  {
3187  return TUPKEY_abort(req_struct, 40);
3188  }
3189  res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
3190  }
3191  }
3192  else
3193  {
3194  /* AND_XX_MASK condition */
3195  ndbassert(cond <= Interpreter::AND_NE_ZERO);
3196  if (unlikely(sqlType.m_mask == 0))
3197  {
3198  return TUPKEY_abort(req_struct,40);
3199  }
3200  /* If either arg is NULL, we say COL AND MASK
3201  * NE_ZERO and NE_MASK.
3202  */
3203  if (r1_null || r2_null) {
3204  res1= 1;
3205  } else {
3206 
3207  bool cmpZero=
3208  (cond == Interpreter::AND_EQ_ZERO) ||
3209  (cond == Interpreter::AND_NE_ZERO);
3210 
3211  res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
3212  }
3213  }
3214  }
3215 
3216  int res = 0;
3217  switch ((Interpreter::BinaryCondition)cond) {
3218  case Interpreter::EQ:
3219  res = (res1 == 0);
3220  break;
3221  case Interpreter::NE:
3222  res = (res1 != 0);
3223  break;
3224  // note the condition is backwards
3225  case Interpreter::LT:
3226  res = (res1 > 0);
3227  break;
3228  case Interpreter::LE:
3229  res = (res1 >= 0);
3230  break;
3231  case Interpreter::GT:
3232  res = (res1 < 0);
3233  break;
3234  case Interpreter::GE:
3235  res = (res1 <= 0);
3236  break;
3237  case Interpreter::LIKE:
3238  res = (res1 == 0);
3239  break;
3240  case Interpreter::NOT_LIKE:
3241  res = (res1 == 1);
3242  break;
3243  case Interpreter::AND_EQ_MASK:
3244  res = (res1 == 0);
3245  break;
3246  case Interpreter::AND_NE_MASK:
3247  res = (res1 != 0);
3248  break;
3249  case Interpreter::AND_EQ_ZERO:
3250  res = (res1 == 0);
3251  break;
3252  case Interpreter::AND_NE_ZERO:
3253  res = (res1 != 0);
3254  break;
3255  // XXX handle invalid value
3256  }
3257 #ifdef TRACE_INTERPRETER
3258  ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
3259  cond, attrId >> 16,
3260  attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
3261 #endif
3262  if (res)
3263  TprogramCounter = brancher(theInstruction, TprogramCounter);
3264  else
3265  {
3266  Uint32 tmp = ((step + 3) >> 2) + 1;
3267  TprogramCounter += tmp;
3268  }
3269  break;
3270  }
3271 
3272  case Interpreter::BRANCH_ATTR_EQ_NULL:{
3273  jam();
3274  Uint32 ins2= TcurrentProgram[TprogramCounter];
3275  Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3276 
3277  if (tmpHabitant != attrId){
3278  Int32 TnoDataR= readAttributes(req_struct,
3279  &attrId, 1,
3280  tmpArea, tmpAreaSz,
3281  false);
3282 
3283  if (TnoDataR < 0) {
3284  jam();
3285  terrorCode = Uint32(-TnoDataR);
3286  tupkeyErrorLab(req_struct);
3287  return -1;
3288  }
3289  tmpHabitant= attrId;
3290  }
3291 
3292  AttributeHeader ah(tmpArea[0]);
3293  if (ah.isNULL()){
3294  TprogramCounter= brancher(theInstruction, TprogramCounter);
3295  } else {
3296  TprogramCounter ++;
3297  }
3298  break;
3299  }
3300 
3301  case Interpreter::BRANCH_ATTR_NE_NULL:{
3302  jam();
3303  Uint32 ins2= TcurrentProgram[TprogramCounter];
3304  Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3305 
3306  if (tmpHabitant != attrId){
3307  Int32 TnoDataR= readAttributes(req_struct,
3308  &attrId, 1,
3309  tmpArea, tmpAreaSz,
3310  false);
3311 
3312  if (TnoDataR < 0) {
3313  jam();
3314  terrorCode = Uint32(-TnoDataR);
3315  tupkeyErrorLab(req_struct);
3316  return -1;
3317  }
3318  tmpHabitant= attrId;
3319  }
3320 
3321  AttributeHeader ah(tmpArea[0]);
3322  if (ah.isNULL()){
3323  TprogramCounter ++;
3324  } else {
3325  TprogramCounter= brancher(theInstruction, TprogramCounter);
3326  }
3327  break;
3328  }
3329 
3330  case Interpreter::EXIT_OK:
3331  jam();
3332 #ifdef TRACE_INTERPRETER
3333  ndbout_c(" - exit_ok");
3334 #endif
3335  return TdataWritten;
3336 
3337  case Interpreter::EXIT_OK_LAST:
3338  jam();
3339 #ifdef TRACE_INTERPRETER
3340  ndbout_c(" - exit_ok_last");
3341 #endif
3342  req_struct->last_row= true;
3343  return TdataWritten;
3344 
3345  case Interpreter::EXIT_REFUSE:
3346  jam();
3347 #ifdef TRACE_INTERPRETER
3348  ndbout_c(" - exit_nok");
3349 #endif
3350  terrorCode= theInstruction >> 16;
3351  return TUPKEY_abort(req_struct, 29);
3352 
3353  case Interpreter::CALL:
3354  jam();
3355 #ifdef TRACE_INTERPRETER
3356  ndbout_c(" - call addr=%u, subroutine len=%u ret addr=%u",
3357  theInstruction >> 16, TsubroutineLen, TprogramCounter);
3358 #endif
3359  RstackPtr++;
3360  if (RstackPtr < 32) {
3361  TstackMemBuffer[RstackPtr]= TprogramCounter;
3362  TprogramCounter= theInstruction >> 16;
3363  if (TprogramCounter < TsubroutineLen) {
3364  TcurrentProgram= subroutineProg;
3365  TcurrentSize= TsubroutineLen;
3366  } else {
3367  return TUPKEY_abort(req_struct, 30);
3368  }
3369  } else {
3370  return TUPKEY_abort(req_struct, 31);
3371  }
3372  break;
3373 
3374  case Interpreter::RETURN:
3375  jam();
3376 #ifdef TRACE_INTERPRETER
3377  ndbout_c(" - return to %u from stack level %u",
3378  TstackMemBuffer[RstackPtr],
3379  RstackPtr);
3380 #endif
3381  if (RstackPtr > 0) {
3382  TprogramCounter= TstackMemBuffer[RstackPtr];
3383  RstackPtr--;
3384  if (RstackPtr == 0) {
3385  jam();
3386  /* ------------------------------------------------------------- */
3387  // We are back to the main program.
3388  /* ------------------------------------------------------------- */
3389  TcurrentProgram= mainProgram;
3390  TcurrentSize= TmainProgLen;
3391  }
3392  } else {
3393  return TUPKEY_abort(req_struct, 32);
3394  }
3395  break;
3396 
3397  default:
3398  return TUPKEY_abort(req_struct, 33);
3399  }
3400  } else {
3401  return TUPKEY_abort(req_struct, 34);
3402  }
3403  }
3404  return TUPKEY_abort(req_struct, 35);
3405 }
3406 
3416 static
3417 Uint32*
3418 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
3419  const Uint32* src,
3420  const Uint32 * tabDesc,
3421  const Uint16* order)
3422 {
3423  char* dst_ptr= dst->m_data_ptr;
3424  Uint32 no_attr= dst->m_var_len_offset;
3425  Uint16* dst_off_ptr= dst->m_offset_array_ptr;
3426  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
3427  const Uint16* src_off_ptr= (const Uint16*)src;
3428  const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
3429 
3430  Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
3431  for(Uint32 i = 0; i<no_attr; i++)
3432  {
3433  next_pos= *src_off_ptr++;
3434  len= next_pos - tmp;
3435 
3436  *dst_off_ptr++ = dst_off;
3437  *dst_len_ptr++ = dst_off + len;
3438  memcpy(dst_ptr, src_ptr, len);
3439  src_ptr += len;
3440 
3441  max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
3442  dst_ptr += max_len; // Max size
3443  dst_off += max_len;
3444 
3445  tmp= next_pos;
3446  }
3447 
3448  return ALIGN_WORD(dst_ptr);
3449 }
3450 
3451 void
3452 Dbtup::expand_tuple(KeyReqStruct* req_struct,
3453  Uint32 sizes[2],
3454  Tuple_header* src,
3455  const Tablerec* tabPtrP,
3456  bool disk)
3457 {
3458  Uint32 bits= src->m_header_bits;
3459  Uint32 extra_bits = bits;
3460  Tuple_header* ptr= req_struct->m_tuple_ptr;
3461 
3462  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3463  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3464  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3465  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3466  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3467  Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
3468  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3469 
3470  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3471  const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
3472  const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
3473  const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
3474  const Uint32 *desc= (Uint32*)req_struct->attr_descr;
3475  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3476  order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
3477 
3478  // Copy fix part
3479  sizes[MM]= 1;
3480  memcpy(ptr, src, 4*fix_size);
3481  if(mm_vars || mm_dyns)
3482  {
3483  /*
3484  * Reserve place for initial length word and offset array (with one extra
3485  * offset). This will be filled-in in later, in shrink_tuple().
3486  */
3487  dst_ptr += Varpart_copy::SZ32;
3488 
3489  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3490  Uint32 step; // in bytes
3491  Uint32 src_len;
3492  const Uint32 *src_data;
3493  if (bits & Tuple_header::VAR_PART)
3494  {
3495  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3496  if(! (bits & Tuple_header::COPY_TUPLE))
3497  {
3498  /* This is for the initial expansion of a stored row. */
3499  Ptr<Page> var_page;
3500  src_data= get_ptr(&var_page, *var_ref);
3501  src_len= get_len(&var_page, *var_ref);
3502  sizes[MM]= src_len;
3503  step= 0;
3504  req_struct->m_varpart_page_ptr = var_page;
3505 
3506  /* An original tuple cant have grown as we're expanding it...
3507  * else we would be "re-expand"*/
3508  ndbassert(! (bits & Tuple_header::MM_GROWN));
3509  }
3510  else
3511  {
3512  /* This is for the re-expansion of a shrunken row (update2 ...) */
3513 
3514  Varpart_copy* vp = (Varpart_copy*)src_ptr;
3515  src_len = vp->m_len;
3516  src_data= vp->m_data;
3517  step= (Varpart_copy::SZ32 + src_len); // 1+ is for extra word
3518  req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
3519  sizes[MM]= src_len;
3520  }
3521 
3522  if (mm_vars)
3523  {
3524  dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
3525  dst->m_offset_array_ptr= req_struct->var_pos_array;
3526  dst->m_var_len_offset= mm_vars;
3527  dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
3528 
3529  dst_ptr= expand_var_part(dst, src_data, desc, order);
3530  ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
3534  char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3535  Uint32 varlen = ((Uint16*)src_data)[mm_vars];
3536  Uint32 *dynstart = ALIGN_WORD(varstart + varlen);
3537 
3538  ndbassert(src_len >= (dynstart - src_data));
3539  src_len -= Uint32(dynstart - src_data);
3540  src_data = dynstart;
3541  }
3542  }
3543  else
3544  {
3548  ndbassert(mm_vars == 0);
3549  src_len = step = sizes[MM] = 0;
3550  src_data = 0;
3551  }
3552 
3553  if (mm_dyns)
3554  {
3558  dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
3559  dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
3560  dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
3561  dst->m_dyn_data_ptr= (char*)dst_ptr;
3562  dst_ptr= expand_dyn_part(dst, src_data,
3563  src_len,
3564  desc, order + mm_vars,
3565  mm_dynvar, mm_dynfix,
3566  tabPtrP->m_offsets[MM].m_dyn_null_words);
3567  }
3568 
3569  ndbassert((UintPtr(src_ptr) & 3) == 0);
3570  src_ptr = src_ptr + step;
3571  }
3572 
3573  src->m_header_bits= bits &
3574  ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
3575 
3576  sizes[DD]= 0;
3577  if(disk && dd_tot)
3578  {
3579  const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3580  order+= mm_vars+mm_dynvar+mm_dynfix;
3581 
3582  if(bits & Tuple_header::DISK_INLINE)
3583  {
3584  // Only on copy tuple
3585  ndbassert(bits & Tuple_header::COPY_TUPLE);
3586  }
3587  else
3588  {
3589  Local_key key;
3590  memcpy(&key, disk_ref, sizeof(key));
3591  key.m_page_no= req_struct->m_disk_page_ptr.i;
3592  src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3593  }
3594  extra_bits |= Tuple_header::DISK_INLINE;
3595 
3596  // Fix diskpart
3597  req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
3598  memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3599  sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3600 
3601  ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3602 
3603  ndbrequire(dd_vars == 0);
3604  }
3605 
3606  ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
3607  req_struct->is_expanded= true;
3608 }
3609 
/**
 * Debug helper: hex-dump the fixed part and var part of the tuple currently
 * referenced by req_struct, labelled with the tuple's storage state
 * ("expanded", "stored" or "shrunken").  The disk-part dump is compiled
 * out (#if 0), together with the variables it would use.
 */
void
Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
{
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  //Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  const Tuple_header* ptr= req_struct->m_tuple_ptr;
  Uint32 bits= ptr->m_header_bits;
  const Uint32 *tuple_words= (Uint32 *)ptr;
  const Uint32 *fix_p;
  Uint32 fix_len;
  const Uint32 *var_p;
  Uint32 var_len;
  //const Uint32 *disk_p;
  //Uint32 disk_len;
  const char *typ;

  // Fixed part always starts at the tuple header.
  fix_p= tuple_words;
  fix_len= tabPtrP->m_offsets[MM].m_fix_header_size;
  if(req_struct->is_expanded)
  {
    typ= "expanded";
    var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
    var_len= 0; // No dump of varpart in expanded
#if 0
    disk_p= (Uint32 *)req_struct->m_disk_ptr;
    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
#endif
  }
  else if(! (bits & Tuple_header::COPY_TUPLE))
  {
    // Stored row: var part lives on a separate var page, found via the ref.
    typ= "stored";
    if(mm_vars+mm_dyns)
    {
      //const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
      const Var_part_ref *varref= ptr->get_var_part_ref_ptr(tabPtrP);
      Ptr<Page> tmp;
      var_p= get_ptr(&tmp, * varref);
      var_len= get_len(&tmp, * varref);
    }
    else
    {
      var_p= 0;
      var_len= 0;
    }
#if 0
    if(dd_tot)
    {
      Local_key key;
      memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
      disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
    }
    else
    {
      disk_p= var_p;
      disk_len= 0;
    }
#endif
  }
  else
  {
    // Shrunken copy tuple: var part follows the fixed part inline.
    typ= "shrunken";
    if(mm_vars+mm_dyns)
    {
      var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
      // NOTE(review): length word read as Uint16; relies on the var-part
      // length fitting 16 bits (cf. shrink_tuple's varpart_len < 0x10000
      // assert).  The +1 includes the length word itself in the dump.
      var_len= *((Uint16 *)var_p) + 1;
    }
    else
    {
      var_p= 0;
      var_len= 0;
    }
#if 0
    disk_p= (Uint32 *)(req_struct->m_disk_ptr);
    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
#endif
  }
  ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
  dump_hex(fix_p, fix_len);
  ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
  dump_hex(var_p, var_len);
#if 0
  ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
  dump_hex(disk_p, disk_len);
#endif
}
3698 
/**
 * Prepare req_struct for reading a row in its packed (non-expanded) format.
 *
 * Sets up m_var_data[MM] with pointers into the stored var part (offset
 * array, varsize data, dynamic data) and, when disk && dd_tot, resolves
 * m_disk_ptr to the row's disk part.  No data is copied; is_expanded is
 * left false so readAttributes uses the packed-format accessors.
 *
 * @param req_struct request context holding m_tuple_ptr of the row to read
 * @param tabPtrP    table metadata
 * @param disk       also resolve the disk part
 */
void
Dbtup::prepare_read(KeyReqStruct* req_struct,
                    Tablerec* tabPtrP, bool disk)
{
  Tuple_header* ptr= req_struct->m_tuple_ptr;

  Uint32 bits= ptr->m_header_bits;
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;

  const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
  const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
  if(mm_vars || mm_dyns)
  {
    const Uint32 *src_data= src_ptr;
    Uint32 src_len;
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
    if (bits & Tuple_header::VAR_PART)
    {
      if(! (bits & Tuple_header::COPY_TUPLE))
      {
        // Stored row: var part is on a separate var page.
        Ptr<Page> tmp;
        src_data= get_ptr(&tmp, * var_ref);
        src_len= get_len(&tmp, * var_ref);

        /* If the original tuple was grown,
         * the old size is stored at the end. */
        if(bits & Tuple_header::MM_GROWN)
        {
          ndbassert(src_len>0);
          src_len= src_data[src_len-1];
        }
      }
      else
      {
        // Shrunken copy tuple: Varpart_copy header sits inline after fix part.
        Varpart_copy* vp = (Varpart_copy*)src_ptr;
        src_len = vp->m_len;
        src_data = vp->m_data;
        src_ptr++;
      }

      char* varstart;
      Uint32 varlen;
      const Uint32* dynstart;
      if (mm_vars)
      {
        // Varsize section: (mm_vars+1) Uint16 end-offsets, then the data.
        varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
        varlen = ((Uint16*)src_data)[mm_vars];
        dynstart = ALIGN_WORD(varstart + varlen);
      }
      else
      {
        varstart = 0;
        varlen = 0;
        dynstart = src_data;
      }

      dst->m_data_ptr= varstart;
      dst->m_offset_array_ptr= (Uint16*)src_data;
      dst->m_var_len_offset= 1;    // lengths are the next end-offset entry
      dst->m_max_var_offset= varlen;

      // Dynamic part is whatever remains after the (aligned) varsize section.
      Uint32 dynlen = Uint32(src_len - (dynstart - src_data));
      ndbassert(src_len >= (dynstart - src_data));
      dst->m_dyn_data_ptr= (char*)dynstart;
      dst->m_dyn_part_len= dynlen;
      // Kept for reference; not needed for packed-format reads:
      // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;

      /*
        dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
        reading the stored/shrunken format.
      */
    }
    else
    {
      // Row has no var part yet; present empty varsize/dynamic sections.
      src_len = 0;
      dst->m_max_var_offset = 0;
      dst->m_dyn_part_len = 0;
#if defined VM_TRACE || defined ERROR_INSERT
      // Debug builds: poison the whole descriptor to catch stray reads.
      bzero(dst, sizeof(* dst));
#endif
    }

    // disk part start after dynamic part.
    src_ptr+= src_len;
  }

  if(disk && dd_tot)
  {
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

    if(bits & Tuple_header::DISK_INLINE)
    {
      // Only on copy tuple
      ndbassert(bits & Tuple_header::COPY_TUPLE);
    }
    else
    {
      // XXX
      Local_key key;
      memcpy(&key, disk_ref, sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
    }
    // Fix diskpart
    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
    // Varsize disk attributes are not supported.
    ndbrequire(dd_vars == 0);
  }

  req_struct->is_expanded= false;
}
3818 
/**
 * Shrink an expanded copy tuple back into the packed format, in place.
 *
 * Compacts the fixed-max-size varsize slots into a contiguous offset-array +
 * data section, re-packs the dynamic part, and (when disk && dd_tot) moves
 * the inline disk part down to follow it.  Writes the resulting var-part
 * length into the inline Varpart_copy header and sets VAR_PART in the
 * tuple header when the var part is non-empty.
 *
 * @param req_struct context; m_tuple_ptr must be an expanded COPY_TUPLE,
 *                   var_pos_array holds the offsets built during expansion
 * @param sizes      out: sizes[MM] = var-part words (1 if none),
 *                   sizes[DD] = disk fix-part words (0 if none)
 * @param tabPtrP    table metadata
 * @param disk       also relocate the disk part
 */
void
Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
                    const Tablerec* tabPtrP, bool disk)
{
  ndbassert(tabPtrP->need_shrink());
  Tuple_header* ptr= req_struct->m_tuple_ptr;
  ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);

  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
  const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
  Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  Uint16* src_off_ptr= req_struct->var_pos_array;
  // Skip fixed-size attributes; order then lists varsize and dynamic attrs.
  order += mm_fix;

  sizes[MM] = 1;
  sizes[DD] = 0;
  if(mm_vars || mm_dyns)
  {
    // Var part is written in place, just past the Varpart_copy length word.
    Varpart_copy* vp = (Varpart_copy*)dst_ptr;
    Uint32* varstart = dst_ptr = vp->m_data;

    if (mm_vars)
    {
      Uint16* dst_off_ptr= (Uint16*)dst_ptr;
      char* dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
      char* src_data_ptr= dst_data_ptr;
      Uint32 off= 0;
      for(Uint32 i= 0; i<mm_vars; i++)
      {
        // var_pos_array holds offsets, then lengths mm_vars entries later.
        const char* data_ptr= src_data_ptr + *src_off_ptr;
        Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
        * dst_off_ptr++= off;
        // memmove: source and destination regions overlap when compacting.
        memmove(dst_data_ptr, data_ptr, len);
        off += len;
        src_off_ptr++;
        dst_data_ptr += len;
      }
      // Final entry is the total data length (end offset of last attr).
      *dst_off_ptr= off;
      dst_ptr = ALIGN_WORD(dst_data_ptr);
      order += mm_vars; // Point to first dynfix entry
    }

    if (mm_dyns)
    {
      dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
                                order, mm_dynvar, mm_dynfix, MM);
      ndbassert((char*)dst_ptr <= ((char*)ptr) + 8192);
      order += mm_dynfix + mm_dynvar;
    }

    // Record the packed var-part length in the inline header.
    Uint32 varpart_len= Uint32(dst_ptr - varstart);
    vp->m_len = varpart_len;
    sizes[MM] = varpart_len;
    ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;

    ndbassert((UintPtr(ptr) & 3) == 0);
    // Length must fit the 16-bit readers of the shrunken format.
    ndbassert(varpart_len < 0x10000);
  }

  if(disk && dd_tot)
  {
    // Move the disk part down to follow the (now shorter) var part.
    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
    ndbrequire(dd_vars == 0);
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
    memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
  }

  req_struct->is_expanded= false;

}
3900 
/**
 * Debug consistency check of var-part storage for all fragments of a table.
 *
 * Walks every page of every fragment, checking for each used entry that
 * the recorded entry length is large enough for the var-part contents it
 * references.  When p == 0 the pass additionally reorganizes pages and
 * then recurses once (with p == (Var_page*)1) to re-validate after reorg.
 * No-op for tables without varsize attributes.
 *
 * @param regTabPtr table to validate
 * @param p         0 for the first (reorganizing) pass, non-zero otherwise
 */
void
Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
{
  /* ToDo: We could also do some checks here for any dynamic part. */
  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
    Tuple_header::HeaderSize;

  if(mm_vars == 0)
    return;

  for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
  {
    FragrecordPtr fragPtr;

    // Skip unused fragment slots.
    if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
      continue;

    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
    for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
    {
      Uint32 real= getRealpid(fragPtr.p, P);
      Var_page* page= (Var_page*)c_page_pool.getPtr(real);

      // Entry 0 is unused; scan all allocated index slots.
      for(Uint32 i=1; i<page->high_index; i++)
      {
        Uint32 idx= page->get_index_word(i);
        Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
        if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
        {
          // Unchained, in-use entry: a fixed row part.
          Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
          Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
          if(! (ptr->m_header_bits & Tuple_header::COPY_TUPLE))
          {
            // Stored row: fixed part plus one word of var-part reference.
            ndbassert(len == fix_sz + 1);
            Local_key tmp; tmp.assref(*part);
            Ptr<Page> tmpPage;
            // Follow the reference to the real var part and check its length.
            part= get_ptr(&tmpPage, *(Var_part_ref*)part);
            len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
            // Minimum bytes: offset array ((mm_vars+1) Uint16s) + data.
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
            ndbassert(len >= ((sz + 3) >> 2));
          }
          else
          {
            // Copy tuple: var part stored inline after the fixed part.
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
            ndbassert(len >= ((sz+3)>>2)+fix_sz);
          }
          if(ptr->m_operation_ptr_i != RNIL)
          {
            // Validate that the referenced operation record exists.
            c_operation_pool.getPtr(ptr->m_operation_ptr_i);
          }
        }
        else if(!(idx & Var_page::FREE))
        {
          // Chained entry: a var part referenced from some fixed part.
          Uint32 *part= page->get_ptr(i);
          Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
          ndbassert(len >= ((sz + 3) >> 2));
        }
        else
        {
          // Free entry: nothing to check.
        }
      }
      // First pass only: compact the page, then re-validate below.
      if(p == 0 && page->high_index > 1)
        page->reorg((Var_page*)ctemp_page);
    }
  }

  if(p == 0)
  {
    validate_page(regTabPtr, (Var_page*)1);
  }
}
3977 
/**
 * After an update, reconcile the stored var-part allocation with the new
 * (shrunken-copy) size.
 *
 * - Equal size: nothing to do.
 * - Smaller: mark the copy tuple MM_SHRINK; actual shrink happens at commit.
 * - Larger: grow the stored var part now via realloc_var_part, mark the
 *   original MM_GROWN and stash the pre-grow size in the last word of the
 *   new var part (so aborts / reads can recover it).
 *
 * @param sizes  sizes[MM]/sizes[DD] = old sizes, sizes[2+MM]/sizes[2+DD] =
 *               new sizes (words); DD sizes must be unchanged
 * @return 0 on success, -1 when realloc_var_part fails (terrorCode set)
 */
int
Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
                                       Tuple_header* org,
                                       Operationrec* regOperPtr,
                                       Fragrecord* regFragPtr,
                                       Tablerec* regTabPtr,
                                       Uint32 sizes[4])
{
  // Disk part is fixed-size; it must never change across an update.
  ndbrequire(sizes[1] == sizes[3]);
  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
  if(0)
    printf("%p %d %d - handle_size_change_after_update ",
           req_struct->m_tuple_ptr,
           regOperPtr->m_tuple_location.m_page_no,
           regOperPtr->m_tuple_location.m_page_idx);

  Uint32 bits= org->m_header_bits;
  Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;

  if(sizes[2+MM] == sizes[MM])
    ;   // unchanged: nothing to do
  else if(sizes[2+MM] < sizes[MM])
  {
    if(0) ndbout_c("shrink");
    // Defer the shrink to commit time; just flag the copy tuple.
    req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
  }
  else
  {
    if(0) printf("grow - ");
    Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
    Var_page* pageP= (Var_page*)pagePtr.p;
    Var_part_ref *refptr= org->get_var_part_ref_ptr(regTabPtr);
    ndbassert(! (bits & Tuple_header::COPY_TUPLE));

    Local_key ref;
    refptr->copyout(&ref);
    Uint32 alloc;
    Uint32 idx= ref.m_page_idx;
    if (bits & Tuple_header::VAR_PART)
    {
      if (copy_bits & Tuple_header::COPY_TUPLE)
      {
        // m_varpart_page_ptr points at the copy; re-resolve the real page.
        c_page_pool.getPtr(pagePtr, ref.m_page_no);
        pageP = (Var_page*)pagePtr.p;
      }
      alloc = pageP->get_entry_len(idx);
    }
    else
    {
      // No var part allocated yet.
      alloc = 0;
    }
    Uint32 orig_size= alloc;
    if(bits & Tuple_header::MM_GROWN)
    {
      /* Was grown before, so must fetch real original size from last word. */
      Uint32 *old_var_part= pageP->get_ptr(idx);
      ndbassert(alloc>0);
      orig_size= old_var_part[alloc-1];
    }

    if (alloc)
    {
      // An allocated var part must be a chained entry on its page.
#ifdef VM_TRACE
      if(!pageP->get_entry_chain(idx))
        ndbout << *pageP << endl;
#endif
      ndbassert(pageP->get_entry_chain(idx));
    }

    Uint32 needed= sizes[2+MM];

    if(needed <= alloc)
    {
      // Existing allocation already large enough; no grow required.
      //ndbassert(!regOperPtr->is_first_operation());
      if (0) ndbout_c(" no grow");
      return 0;
    }
    Uint32 *new_var_part=realloc_var_part(&terrorCode,
                                          regFragPtr, regTabPtr, pagePtr,
                                          refptr, alloc, needed);
    if (unlikely(new_var_part==NULL))
      return -1;
    /* Mark the tuple grown, store the original length at the end. */
    org->m_header_bits= bits | Tuple_header::MM_GROWN | Tuple_header::VAR_PART;
    new_var_part[needed-1]= orig_size;

    if (regTabPtr->m_bits & Tablerec::TR_Checksum)
    {
      jam();
      // Header bits changed above; keep the row checksum consistent.
      setChecksum(org, regTabPtr);
    }
  }
  return 0;
}
4072 
4073 int
4074 Dbtup::optimize_var_part(KeyReqStruct* req_struct,
4075  Tuple_header* org,
4076  Operationrec* regOperPtr,
4077  Fragrecord* regFragPtr,
4078  Tablerec* regTabPtr)
4079 {
4080  jam();
4081  Var_part_ref* refptr = org->get_var_part_ref_ptr(regTabPtr);
4082 
4083  Local_key ref;
4084  refptr->copyout(&ref);
4085  Uint32 idx = ref.m_page_idx;
4086 
4087  Ptr<Page> pagePtr;
4088  c_page_pool.getPtr(pagePtr, ref.m_page_no);
4089 
4090  Var_page* pageP = (Var_page*)pagePtr.p;
4091  Uint32 var_part_size = pageP->get_entry_len(idx);
4092 
4097  if(pageP->list_index != MAX_FREE_LIST)
4098  {
4099  jam();
4100  /*
4101  * optimize var part of tuple by moving varpart,
4102  * then we possibly reclaim free pages
4103  */
4104  move_var_part(regFragPtr, regTabPtr, pagePtr,
4105  refptr, var_part_size);
4106 
4107  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
4108  {
4109  jam();
4110  setChecksum(org, regTabPtr);
4111  }
4112  }
4113 
4114  return 0;
4115 }
4116 
4117 int
4118 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4119 {
4120  FragrecordPtr fragPtr;
4121  fragPtr.i= fragPtrI;
4122  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4123  TablerecPtr tablePtr;
4124  tablePtr.i= fragPtr.p->fragTableId;
4125  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4126 
4127  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4128  {
4129  Local_key tmp = *key;
4130  PagePtr pagePtr;
4131 
4132  Uint32 err;
4133  pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
4134  if (unlikely(pagePtr.i == RNIL))
4135  {
4136  return -(int)err;
4137  }
4138  c_page_pool.getPtr(pagePtr);
4139 
4140  Tuple_header* ptr = (Tuple_header*)
4141  ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
4142 
4143  ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
4144  *ptr->get_mm_gci(tablePtr.p) = gci;
4145  }
4146  return 0;
4147 }
4148 
/**
 * Node-recovery helper: read the primary key of the row at *key into dst,
 * headers stripped, followed by one word of row GCI (0 if the table has
 * none).
 *
 * If the row slot is FREE, no key is read (returns 0) but the GCI word is
 * still written at dst[0].  If an uncommitted (ALLOC) row is found, the key
 * is read from the operation's copy tuple and copy is set to true.  The
 * target page is allocated on demand if it does not yet exist.
 *
 * @param fragPtrI fragment record index
 * @param key      row position (logical page number + page index)
 * @param dst      out: packed key words, then one GCI word at dst[ret]
 * @param copy     out: true when the key came from a copy tuple
 * @return number of key words written (excludes the GCI word), or negative
 *         error code from allocFragPage / readAttributes
 */
int
Dbtup::nr_read_pk(Uint32 fragPtrI,
                  const Local_key* key, Uint32* dst, bool& copy)
{

  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = *key;

  // Page may not exist yet during restart: allocate on demand.
  Uint32 err;
  PagePtr pagePtr;
  pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
  if (unlikely(pagePtr.i == RNIL))
    return -(int)err;

  c_page_pool.getPtr(pagePtr);
  KeyReqStruct req_struct(this);
  Uint32* ptr= ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);

  req_struct.m_page_ptr = pagePtr;
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;

  int ret = 0;
  copy = false;
  if (! (bits & Tuple_header::FREE))
  {
    if (bits & Tuple_header::ALLOC)
    {
      // Row has an uncommitted operation: read from its copy tuple instead.
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
      req_struct.m_tuple_ptr=
        get_copy_tuple(&opPtrP->m_copy_tuple_location);
      copy = true;
    }
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);

    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
    Uint32 descr_start= tablePtr.p->tabDescriptor;
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
    req_struct.attr_descr= tab_descr;

    if (tablePtr.p->need_expand())
      prepare_read(&req_struct, tablePtr.p, false);

    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
    // read pk attributes from original tuple

    req_struct.tablePtrP = tablePtr.p;
    req_struct.fragPtrP = fragPtr.p;

    // do it
    ret = readAttributes(&req_struct,
                         attrIds,
                         numAttrs,
                         dst,
                         ZNIL, false);

    // done
    if (likely(ret >= 0)) {
      // remove headers
      // readAttributes returned AttributeHeader + data per key attribute;
      // compact dst in place so only the raw key data remains.
      Uint32 n= 0;
      Uint32 i= 0;
      while (n < numAttrs) {
        const AttributeHeader ah(dst[i]);
        Uint32 size= ah.getDataSize();
        ndbrequire(size != 0);
        for (Uint32 j= 0; j < size; j++) {
          dst[i + j - n]= dst[i + j + 1];   // shift data left over header n
        }
        n+= 1;
        i+= 1 + size;
      }
      ndbrequire((int)i == ret);
      ret -= numAttrs;   // one header word removed per attribute
    } else {
      return ret;
    }
  }

  // Append the row GCI (or 0) after the key words.
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
  }
  else
  {
    dst[ret] = 0;
  }
  return ret;
}
4248 
4249 #include <signaldata/TuxMaint.hpp>
4250 
4251 int
4252 Dbtup::nr_delete(Signal* signal, Uint32 senderData,
4253  Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4254 {
4255  FragrecordPtr fragPtr;
4256  fragPtr.i= fragPtrI;
4257  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4258  TablerecPtr tablePtr;
4259  tablePtr.i= fragPtr.p->fragTableId;
4260  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4261 
4262  Local_key tmp = * key;
4263  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);
4264 
4265  PagePtr pagePtr;
4266  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
4267 
4268  if (!tablePtr.p->tuxCustomTriggers.isEmpty())
4269  {
4270  jam();
4271  TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
4272  req->tableId = fragPtr.p->fragTableId;
4273  req->fragId = fragPtr.p->fragmentId;
4274  req->pageId = tmp.m_page_no;
4275  req->pageIndex = tmp.m_page_idx;
4276  req->tupVersion = ptr->get_tuple_version();
4277  req->opInfo = TuxMaintReq::OpRemove;
4278  removeTuxEntries(signal, tablePtr.p);
4279  }
4280 
4281  Local_key disk;
4282  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
4283 
4284  if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
4285  tablePtr.p->m_attributes[MM].m_no_of_dynamic)
4286  {
4287  jam();
4288  free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
4289  } else {
4290  jam();
4291  free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
4292  }
4293 
4294  if (tablePtr.p->m_no_of_disk_attributes)
4295  {
4296  jam();
4297 
4298  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4299  tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4300 
4301  D("Logfile_client - nr_delete");
4302  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4303  int res = lgman.alloc_log_space(sz);
4304  ndbrequire(res == 0);
4305 
4313  preq.m_page = disk;
4314  preq.m_callback.m_callbackData = senderData;
4315  preq.m_callback.m_callbackFunction =
4316  safe_cast(&Dbtup::nr_delete_page_callback);
4317  int flags = Page_cache_client::COMMIT_REQ;
4318 
4319 #ifdef ERROR_INSERT
4320  if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
4321  {
4322  int rnd = rand() % 100;
4323  int slp = 0;
4324  if (ERROR_INSERTED(4024))
4325  {
4326  slp = 3000;
4327  }
4328  else if (rnd > 90)
4329  {
4330  slp = 3000;
4331  }
4332  else if (rnd > 70)
4333  {
4334  slp = 100;
4335  }
4336 
4337  ndbout_c("rnd: %d slp: %d", rnd, slp);
4338 
4339  if (slp)
4340  {
4341  flags |= Page_cache_client::DELAY_REQ;
4342  preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
4343  }
4344  }
4345 #endif
4346 
4347  Page_cache_client pgman(this, c_pgman);
4348  res = pgman.get_page(signal, preq, flags);
4349  m_pgman_ptr = pgman.m_ptr;
4350  if (res == 0)
4351  {
4352  goto timeslice;
4353  }
4354  else if (unlikely(res == -1))
4355  {
4356  return -1;
4357  }
4358 
4359  PagePtr disk_page = { (Tup_page*)m_pgman_ptr.p, m_pgman_ptr.i };
4360  disk_page_set_dirty(disk_page);
4361 
4362  CallbackPtr cptr;
4363  cptr.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4364  cptr.m_callbackData = senderData;
4365  res= lgman.get_log_buffer(signal, sz, &cptr);
4366  switch(res){
4367  case 0:
4368  signal->theData[2] = disk_page.i;
4369  goto timeslice;
4370  case -1:
4371  ndbrequire("NOT YET IMPLEMENTED" == 0);
4372  break;
4373  }
4374 
4375  if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
4376  disk_page_free(signal, tablePtr.p, fragPtr.p,
4377  &disk, *(PagePtr*)&disk_page, gci);
4378  return 0;
4379  }
4380 
4381  return 0;
4382 
4383 timeslice:
4384  memcpy(signal->theData, &disk, sizeof(disk));
4385  return 1;
4386 }
4387 
4388 void
4389 Dbtup::nr_delete_page_callback(Signal* signal,
4390  Uint32 userpointer, Uint32 page_id)//unused
4391 {
4392  Ptr<GlobalPage> gpage;
4393  m_global_page_pool.getPtr(gpage, page_id);
4394  PagePtr pagePtr= { (Tup_page*)gpage.p, gpage.i };
4395  disk_page_set_dirty(pagePtr);
4396  Dblqh::Nr_op_info op;
4397  op.m_ptr_i = userpointer;
4398  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
4399  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
4400  c_lqh->get_nr_op_info(&op, page_id);
4401 
4402  Ptr<Fragrecord> fragPtr;
4403  fragPtr.i= op.m_tup_frag_ptr_i;
4404  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4405 
4406  Ptr<Tablerec> tablePtr;
4407  tablePtr.i = fragPtr.p->fragTableId;
4408  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4409 
4410  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4411  tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4412 
4413  CallbackPtr cb;
4414  cb.m_callbackData = userpointer;
4415  cb.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4416  D("Logfile_client - nr_delete_page_callback");
4417  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4418  int res= lgman.get_log_buffer(signal, sz, &cb);
4419  switch(res){
4420  case 0:
4421  return;
4422  case -1:
4423  ndbrequire("NOT YET IMPLEMENTED" == 0);
4424  break;
4425  }
4426 
4427  if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4428  disk_page_free(signal, tablePtr.p, fragPtr.p,
4429  &op.m_disk_ref, pagePtr, op.m_gci_hi);
4430 
4431  c_lqh->nr_delete_complete(signal, &op);
4432  return;
4433 }
4434 
4435 void
4437  Uint32 userpointer,
4438  Uint32 unused)
4439 {
4440  Dblqh::Nr_op_info op;
4441  op.m_ptr_i = userpointer;
4442  c_lqh->get_nr_op_info(&op, RNIL);
4443 
4444  Ptr<Fragrecord> fragPtr;
4445  fragPtr.i= op.m_tup_frag_ptr_i;
4446  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4447 
4448  Ptr<Tablerec> tablePtr;
4449  tablePtr.i = fragPtr.p->fragTableId;
4450  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4451 
4452  Ptr<GlobalPage> gpage;
4453  m_global_page_pool.getPtr(gpage, op.m_page_id);
4454  PagePtr pagePtr = { (Tup_page*)gpage.p, gpage.i };
4455 
4459  if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4460 
4461  disk_page_free(signal, tablePtr.p, fragPtr.p,
4462  &op.m_disk_ref, pagePtr, op.m_gci_hi);
4463 
4464  c_lqh->nr_delete_complete(signal, &op);
4465 }