MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Packer.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include "Packer.hpp"
21 #include <TransporterRegistry.hpp>
22 #include <TransporterCallback.hpp>
23 #include <RefConvert.hpp>
24 
/*
 * Upper bound on the number of signals a single unpack() call will process
 * before returning to its caller.
 *
 * With ERROR_INSERT defined this is an ordinary (mutable, externally visible)
 * variable; otherwise it is a plain compile-time constant.
 * NOTE(review): presumably the variable form exists so that error-insert
 * test code can change the limit at run time -- confirm against the users
 * of this symbol.
 */
#if defined(ERROR_INSERT)
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#else
#define MAX_RECEIVED_SIGNALS 1024
#endif
30 
31 Uint32
32 TransporterRegistry::unpack(Uint32 * readPtr,
33  Uint32 sizeOfData,
34  NodeId remoteNodeId,
35  IOState state) {
36  SignalHeader signalHeader;
37  LinearSectionPtr ptr[3];
38 
39  Uint32 usedData = 0;
40  Uint32 loop_count = 0;
41 
42  if(state == NoHalt || state == HaltOutput){
43  while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
44  (loop_count < MAX_RECEIVED_SIGNALS)) {
45  Uint32 word1 = readPtr[0];
46  Uint32 word2 = readPtr[1];
47  Uint32 word3 = readPtr[2];
48  loop_count++;
49 
50 #if 0
51  if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
52  //Do funky stuff
53  }
54 #endif
55 
56  const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
57  const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
58 
59  if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
60  DEBUG("Message Size = " << messageLenBytes);
61  report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
62  return usedData;
63  }//if
64 
65  if (sizeOfData < messageLenBytes) {
66  break;
67  }//if
68 
69  if(Protocol6::getCheckSumIncluded(word1)){
70  const Uint32 tmpLen = messageLen32 - 1;
71  const Uint32 checkSumSent = readPtr[tmpLen];
72  const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
73 
74  if(checkSumComputed != checkSumSent){
75  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
76  return usedData;
77  }//if
78  }//if
79 
80 #if 0
81  if(Protocol6::getCompressed(word1)){
82  //Do funky stuff
83  }//if
84 #endif
85 
86  Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
87 
88  Uint32 sBlockNum = signalHeader.theSendersBlockRef;
89  sBlockNum = numberToRef(sBlockNum, remoteNodeId);
90  signalHeader.theSendersBlockRef = sBlockNum;
91 
92  Uint8 prio = Protocol6::getPrio(word1);
93 
94  Uint32 * signalData = &readPtr[3];
95 
96  if(Protocol6::getSignalIdIncluded(word1) == 0){
97  signalHeader.theSendersSignalId = ~0;
98  } else {
99  signalHeader.theSendersSignalId = * signalData;
100  signalData ++;
101  }//if
102  signalHeader.theSignalId= ~0;
103 
104  Uint32 * sectionPtr = signalData + signalHeader.theLength;
105  Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
106  for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
107  Uint32 sz = * sectionPtr;
108  ptr[i].sz = sz;
109  ptr[i].p = sectionData;
110 
111  sectionPtr ++;
112  sectionData += sz;
113  }
114 
115  callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
116 
117  readPtr += messageLen32;
118  sizeOfData -= messageLenBytes;
119  usedData += messageLenBytes;
120  }//while
121 
122  return usedData;
123  } else {
126  while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
127  (loop_count < MAX_RECEIVED_SIGNALS)) {
128  Uint32 word1 = readPtr[0];
129  Uint32 word2 = readPtr[1];
130  Uint32 word3 = readPtr[2];
131  loop_count++;
132 
133 #if 0
134  if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
135  //Do funky stuff
136  }//if
137 #endif
138 
139  const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
140  const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
141  if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
142  DEBUG("Message Size = " << messageLenBytes);
143  report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
144  return usedData;
145  }//if
146 
147  if (sizeOfData < messageLenBytes) {
148  break;
149  }//if
150 
151  if(Protocol6::getCheckSumIncluded(word1)){
152  const Uint32 tmpLen = messageLen32 - 1;
153  const Uint32 checkSumSent = readPtr[tmpLen];
154  const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
155 
156  if(checkSumComputed != checkSumSent){
157 
158  //theTransporters[remoteNodeId]->disconnect();
159  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
160  return usedData;
161  }//if
162  }//if
163 
164 #if 0
165  if(Protocol6::getCompressed(word1)){
166  //Do funky stuff
167  }//if
168 #endif
169 
170  Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
171 
172  Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
173 
174  if(rBlockNum == 252){
175  Uint32 sBlockNum = signalHeader.theSendersBlockRef;
176  sBlockNum = numberToRef(sBlockNum, remoteNodeId);
177  signalHeader.theSendersBlockRef = sBlockNum;
178 
179  Uint8 prio = Protocol6::getPrio(word1);
180 
181  Uint32 * signalData = &readPtr[3];
182 
183  if(Protocol6::getSignalIdIncluded(word1) == 0){
184  signalHeader.theSendersSignalId = ~0;
185  } else {
186  signalHeader.theSendersSignalId = * signalData;
187  signalData ++;
188  }//if
189 
190  Uint32 * sectionPtr = signalData + signalHeader.theLength;
191  Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
192  for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
193  Uint32 sz = * sectionPtr;
194  ptr[i].sz = sz;
195  ptr[i].p = sectionData;
196 
197  sectionPtr ++;
198  sectionData += sz;
199  }
200 
201  callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
202  } else {
203  DEBUG("prepareReceive(...) - Discarding message to block: "
204  << rBlockNum << " from Node: " << remoteNodeId);
205  }//if
206 
207  readPtr += messageLen32;
208  sizeOfData -= messageLenBytes;
209  usedData += messageLenBytes;
210  }//while
211 
212 
213  return usedData;
214  }//if
215 }
216 
217 Uint32 *
218 TransporterRegistry::unpack(Uint32 * readPtr,
219  Uint32 * eodPtr,
220  NodeId remoteNodeId,
221  IOState state) {
222  SignalHeader signalHeader;
223  LinearSectionPtr ptr[3];
224  Uint32 loop_count = 0;
225  if(state == NoHalt || state == HaltOutput){
226  while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
227  Uint32 word1 = readPtr[0];
228  Uint32 word2 = readPtr[1];
229  Uint32 word3 = readPtr[2];
230  loop_count++;
231 #if 0
232  if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
233  //Do funky stuff
234  }
235 #endif
236 
237  const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
238 
239  if(messageLen32 == 0 ||
240  messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
241  {
242  DEBUG("Message Size(words) = " << messageLen32);
243  report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
244  return readPtr;
245  }//if
246 
247  if(Protocol6::getCheckSumIncluded(word1)){
248  const Uint32 tmpLen = messageLen32 - 1;
249  const Uint32 checkSumSent = readPtr[tmpLen];
250  const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
251 
252  if(checkSumComputed != checkSumSent){
253  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
254  return readPtr;
255  }//if
256  }//if
257 
258 #if 0
259  if(Protocol6::getCompressed(word1)){
260  //Do funky stuff
261  }//if
262 #endif
263 
264  Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
265 
266  Uint32 sBlockNum = signalHeader.theSendersBlockRef;
267  sBlockNum = numberToRef(sBlockNum, remoteNodeId);
268  signalHeader.theSendersBlockRef = sBlockNum;
269 
270  Uint8 prio = Protocol6::getPrio(word1);
271 
272  Uint32 * signalData = &readPtr[3];
273 
274  if(Protocol6::getSignalIdIncluded(word1) == 0){
275  signalHeader.theSendersSignalId = ~0;
276  } else {
277  signalHeader.theSendersSignalId = * signalData;
278  signalData ++;
279  }//if
280 
281  Uint32 * sectionPtr = signalData + signalHeader.theLength;
282  Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
283  for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
284  Uint32 sz = * sectionPtr;
285  ptr[i].sz = sz;
286  ptr[i].p = sectionData;
287 
288  sectionPtr ++;
289  sectionData += sz;
290  }
291 
292  callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
293 
294  readPtr += messageLen32;
295  }//while
296  } else {
299  while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
300  Uint32 word1 = readPtr[0];
301  Uint32 word2 = readPtr[1];
302  Uint32 word3 = readPtr[2];
303  loop_count++;
304 #if 0
305  if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
306  //Do funky stuff
307  }//if
308 #endif
309 
310  const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
311  if(messageLen32 == 0 ||
312  messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
313  {
314  DEBUG("Message Size(words) = " << messageLen32);
315  report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
316  return readPtr;
317  }//if
318 
319  if(Protocol6::getCheckSumIncluded(word1)){
320  const Uint32 tmpLen = messageLen32 - 1;
321  const Uint32 checkSumSent = readPtr[tmpLen];
322  const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
323 
324  if(checkSumComputed != checkSumSent){
325 
326  //theTransporters[remoteNodeId]->disconnect();
327  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
328  return readPtr;
329  }//if
330  }//if
331 
332 #if 0
333  if(Protocol6::getCompressed(word1)){
334  //Do funky stuff
335  }//if
336 #endif
337 
338  Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
339 
340  Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
341 
342  if(rBlockNum == 252){
343  Uint32 sBlockNum = signalHeader.theSendersBlockRef;
344  sBlockNum = numberToRef(sBlockNum, remoteNodeId);
345  signalHeader.theSendersBlockRef = sBlockNum;
346 
347  Uint8 prio = Protocol6::getPrio(word1);
348 
349  Uint32 * signalData = &readPtr[3];
350 
351  if(Protocol6::getSignalIdIncluded(word1) == 0){
352  signalHeader.theSendersSignalId = ~0;
353  } else {
354  signalHeader.theSendersSignalId = * signalData;
355  signalData ++;
356  }//if
357 
358  Uint32 * sectionPtr = signalData + signalHeader.theLength;
359  Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
360  for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
361  Uint32 sz = * sectionPtr;
362  ptr[i].sz = sz;
363  ptr[i].p = sectionData;
364 
365  sectionPtr ++;
366  sectionData += sz;
367  }
368 
369  callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
370  } else {
371  DEBUG("prepareReceive(...) - Discarding message to block: "
372  << rBlockNum << " from Node: " << remoteNodeId);
373  }//if
374 
375  readPtr += messageLen32;
376  }//while
377  }//if
378  return readPtr;
379 }
380 
381 Packer::Packer(bool signalId, bool checksum) {
382 
383  checksumUsed = (checksum ? 1 : 0);
384  signalIdUsed = (signalId ? 1 : 0);
385 
386  // Set the priority
387 
388  preComputedWord1 = 0;
389  Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
390  Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
391  Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
392  Protocol6::setCompressed(preComputedWord1, 0);
393 }
394 
395 inline
396 void
397 import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
398  const Uint32 sz = ptr.sz;
399  memcpy(insertPtr, ptr.p, 4 * sz);
400  insertPtr += sz;
401 }
402 
403 inline
404 void
405 importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
406  /* Use the section iterator to obtain the words in this section */
407  Uint32 remain= ptr.sz;
408 
409  while (remain > 0)
410  {
411  Uint32 len= 0;
412  const Uint32* next= ptr.sectionIter->getNextWords(len);
413 
414  assert(len <= remain);
415  assert(next != NULL);
416 
417  memcpy(insertPtr, next, 4 * len);
418  insertPtr+= len;
419  remain-= len;
420  }
421 
422  /* Check that there were no more words available from the
423  * Signal iterator
424  */
425  assert(ptr.sectionIter->getNextWords(remain) == NULL);
426 }
427 
428 void copy(Uint32 * & insertPtr,
429  class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
430 
431 void
432 Packer::pack(Uint32 * insertPtr,
433  Uint32 prio,
434  const SignalHeader * header,
435  const Uint32 * theData,
436  const LinearSectionPtr ptr[3]) const {
437  Uint32 i;
438 
439  Uint32 dataLen32 = header->theLength;
440  Uint32 no_segs = header->m_noOfSections;
441 
442  Uint32 len32 =
443  dataLen32 + no_segs +
444  checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
445 
446 
447  for(i = 0; i<no_segs; i++){
448  len32 += ptr[i].sz;
449  }
450 
454  Uint32 word1 = preComputedWord1;
455  Uint32 word2 = 0;
456  Uint32 word3 = 0;
457 
458  Protocol6::setPrio(word1, prio);
459  Protocol6::setMessageLength(word1, len32);
460  Protocol6::createProtocol6Header(word1, word2, word3, header);
461 
462  insertPtr[0] = word1;
463  insertPtr[1] = word2;
464  insertPtr[2] = word3;
465 
466  Uint32 * tmpInserPtr = &insertPtr[3];
467 
468  if(signalIdUsed){
469  * tmpInserPtr = header->theSignalId;
470  tmpInserPtr++;
471  }
472 
473  memcpy(tmpInserPtr, theData, 4 * dataLen32);
474 
475  tmpInserPtr += dataLen32;
476  for(i = 0; i<no_segs; i++){
477  tmpInserPtr[i] = ptr[i].sz;
478  }
479 
480  tmpInserPtr += no_segs;
481  for(i = 0; i<no_segs; i++){
482  import(tmpInserPtr, ptr[i]);
483  }
484 
485  if(checksumUsed){
486  * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
487  }
488 }
489 
490 void
491 Packer::pack(Uint32 * insertPtr,
492  Uint32 prio,
493  const SignalHeader * header,
494  const Uint32 * theData,
495  class SectionSegmentPool & thePool,
496  const SegmentedSectionPtr ptr[3]) const {
497  Uint32 i;
498 
499  Uint32 dataLen32 = header->theLength;
500  Uint32 no_segs = header->m_noOfSections;
501 
502  Uint32 len32 =
503  dataLen32 + no_segs +
504  checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
505 
506  for(i = 0; i<no_segs; i++){
507  len32 += ptr[i].sz;
508  }
509 
513  Uint32 word1 = preComputedWord1;
514  Uint32 word2 = 0;
515  Uint32 word3 = 0;
516 
517  Protocol6::setPrio(word1, prio);
518  Protocol6::setMessageLength(word1, len32);
519  Protocol6::createProtocol6Header(word1, word2, word3, header);
520 
521  insertPtr[0] = word1;
522  insertPtr[1] = word2;
523  insertPtr[2] = word3;
524 
525  Uint32 * tmpInserPtr = &insertPtr[3];
526 
527  if(signalIdUsed){
528  * tmpInserPtr = header->theSignalId;
529  tmpInserPtr++;
530  }
531 
532  memcpy(tmpInserPtr, theData, 4 * dataLen32);
533 
534  tmpInserPtr += dataLen32;
535  for(i = 0; i<no_segs; i++){
536  tmpInserPtr[i] = ptr[i].sz;
537  }
538 
539  tmpInserPtr += no_segs;
540  for(i = 0; i<no_segs; i++){
541  copy(tmpInserPtr, thePool, ptr[i]);
542  }
543 
544  if(checksumUsed){
545  * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
546  }
547 }
548 
549 
550 void
551 Packer::pack(Uint32 * insertPtr,
552  Uint32 prio,
553  const SignalHeader * header,
554  const Uint32 * theData,
555  const GenericSectionPtr ptr[3]) const {
556  Uint32 i;
557 
558  Uint32 dataLen32 = header->theLength;
559  Uint32 no_segs = header->m_noOfSections;
560 
561  Uint32 len32 =
562  dataLen32 + no_segs +
563  checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
564 
565 
566  for(i = 0; i<no_segs; i++){
567  len32 += ptr[i].sz;
568  }
569 
573  Uint32 word1 = preComputedWord1;
574  Uint32 word2 = 0;
575  Uint32 word3 = 0;
576 
577  Protocol6::setPrio(word1, prio);
578  Protocol6::setMessageLength(word1, len32);
579  Protocol6::createProtocol6Header(word1, word2, word3, header);
580 
581  insertPtr[0] = word1;
582  insertPtr[1] = word2;
583  insertPtr[2] = word3;
584 
585  Uint32 * tmpInsertPtr = &insertPtr[3];
586 
587  if(signalIdUsed){
588  * tmpInsertPtr = header->theSignalId;
589  tmpInsertPtr++;
590  }
591 
592  memcpy(tmpInsertPtr, theData, 4 * dataLen32);
593 
594  tmpInsertPtr += dataLen32;
595  for(i = 0; i<no_segs; i++){
596  tmpInsertPtr[i] = ptr[i].sz;
597  }
598 
599  tmpInsertPtr += no_segs;
600  for(i = 0; i<no_segs; i++){
601  importGeneric(tmpInsertPtr, ptr[i]);
602  }
603 
604  if(checksumUsed){
605  * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
606  }
607 }
608 
616 Uint32
617 TransporterRegistry::unpack_length_words(const Uint32 *readPtr, Uint32 maxWords)
618 {
619  Uint32 wordLength = 0;
620 
621  while (wordLength + 4 + sizeof(Protocol6) <= maxWords)
622  {
623  Uint32 word1 = readPtr[wordLength];
624  Uint16 messageLen32 = Protocol6::getMessageLength(word1);
625  if (wordLength + messageLen32 > maxWords)
626  break;
627  wordLength += messageLen32;
628  }
629  return wordLength;
630 }