MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DbtupBuffer.cpp
1 /*
2  Copyright (C) 2003-2008 MySQL AB, 2008, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #define DBTUP_C
20 #define DBTUP_BUFFER_CPP
21 #include "Dbtup.hpp"
22 #include <RefConvert.hpp>
23 #include <ndb_limits.h>
24 #include <pc.hpp>
25 #include <signaldata/TransIdAI.hpp>
26 
27 void Dbtup::execSEND_PACKED(Signal* signal)
28 {
29  Uint16 hostId;
30  Uint32 i;
31  Uint32 TpackedListIndex= cpackedListIndex;
32  jamEntry();
33  for (i= 0; i < TpackedListIndex; i++) {
34  jam();
35  hostId= cpackedList[i];
36  ndbrequire((hostId - 1) < (MAX_NODES - 1)); // Also check not zero
37  Uint32 TpacketTA= hostBuffer[hostId].noOfPacketsTA;
38  if (TpacketTA != 0) {
39  jam();
40  BlockReference TBref= numberToRef(API_PACKED, hostId);
41  Uint32 TpacketLen= hostBuffer[hostId].packetLenTA;
42  MEMCOPY_NO_WORDS(&signal->theData[0],
43  &hostBuffer[hostId].packetBufferTA[0],
44  TpacketLen);
45  sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
46  hostBuffer[hostId].noOfPacketsTA= 0;
47  hostBuffer[hostId].packetLenTA= 0;
48  }
49  hostBuffer[hostId].inPackedList= false;
50  }//for
51  cpackedListIndex= 0;
52 }
53 
54 void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
55  Uint32 Tlen)
56 {
57  if (Tlen == 3)
58  return;
59 
60  Uint32 hostId= refToNode(aRef);
61  Uint32 Theader= ((refToBlock(aRef) << 16)+(Tlen-3));
62 
63  ndbrequire(hostId < MAX_NODES);
64  Uint32 TpacketLen= hostBuffer[hostId].packetLenTA;
65  Uint32 TnoOfPackets= hostBuffer[hostId].noOfPacketsTA;
66  Uint32 sig0= signal->theData[0];
67  Uint32 sig1= signal->theData[1];
68  Uint32 sig2= signal->theData[2];
69 
70  BlockReference TBref= numberToRef(API_PACKED, hostId);
71 
72  if ((Tlen + TpacketLen + 1) <= 25) {
73 // ----------------------------------------------------------------
74 // There is still space in the buffer. We will copy it into the
75 // buffer.
76 // ----------------------------------------------------------------
77  jam();
78  updatePackedList(signal, hostId);
79  } else if (false && TnoOfPackets == 1) {
80 // ----------------------------------------------------------------
81 // The buffer is full and there was only one packet buffered. We
82 // will send this as a normal signal.
83 // ----------------------------------------------------------------
84  Uint32 TnewRef= numberToRef((hostBuffer[hostId].packetBufferTA[0] >> 16),
85  hostId);
86  MEMCOPY_NO_WORDS(&signal->theData[0],
87  &hostBuffer[hostId].packetBufferTA[1],
88  TpacketLen - 1);
89  sendSignal(TnewRef, GSN_TRANSID_AI, signal, (TpacketLen - 1), JBB);
90  TpacketLen= 0;
91  TnoOfPackets= 0;
92  } else {
93 // ----------------------------------------------------------------
94 // The buffer is full but at least two packets. Send those in
95 // packed form.
96 // ----------------------------------------------------------------
97  MEMCOPY_NO_WORDS(&signal->theData[0],
98  &hostBuffer[hostId].packetBufferTA[0],
99  TpacketLen);
100  sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
101  TpacketLen= 0;
102  TnoOfPackets= 0;
103  }
104 // ----------------------------------------------------------------
105 // Copy the signal into the buffer
106 // ----------------------------------------------------------------
107  hostBuffer[hostId].packetBufferTA[TpacketLen + 0]= Theader;
108  hostBuffer[hostId].packetBufferTA[TpacketLen + 1]= sig0;
109  hostBuffer[hostId].packetBufferTA[TpacketLen + 2]= sig1;
110  hostBuffer[hostId].packetBufferTA[TpacketLen + 3]= sig2;
111  hostBuffer[hostId].noOfPacketsTA= TnoOfPackets + 1;
112  hostBuffer[hostId].packetLenTA= Tlen + TpacketLen + 1;
113  MEMCOPY_NO_WORDS(&hostBuffer[hostId].packetBufferTA[TpacketLen + 4],
114  &signal->theData[25],
115  Tlen - 3);
116 }
117 
118 void Dbtup::updatePackedList(Signal* signal, Uint16 hostId)
119 {
120  if (hostBuffer[hostId].inPackedList == false) {
121  Uint32 TpackedListIndex= cpackedListIndex;
122  jam();
123  hostBuffer[hostId].inPackedList= true;
124  cpackedList[TpackedListIndex]= hostId;
125  cpackedListIndex= TpackedListIndex + 1;
126  }
127 }
128 
129 /* ---------------------------------------------------------------- */
130 /* ----------------------- SEND READ ATTRINFO --------------------- */
131 /* ---------------------------------------------------------------- */
132 void Dbtup::sendReadAttrinfo(Signal* signal,
133  KeyReqStruct *req_struct,
134  Uint32 ToutBufIndex,
135  const Operationrec *regOperPtr)
136 {
137  if(ToutBufIndex == 0)
138  return;
139 
140  const BlockReference recBlockref= req_struct->rec_blockref;
141  const Uint32 block= refToBlock(recBlockref);
142  const Uint32 nodeId= refToNode(recBlockref);
143 
144  bool connectedToNode= getNodeInfo(nodeId).m_connected;
145  const Uint32 type= getNodeInfo(nodeId).m_type;
146  bool is_api= (type >= NodeInfo::API && type <= NodeInfo::MGM);
147  bool old_dest= (getNodeInfo(nodeId).m_version < MAKE_VERSION(6,4,0));
148  Uint32 TpacketLen= hostBuffer[nodeId].packetLenTA;
149  Uint32 TpacketTA= hostBuffer[nodeId].noOfPacketsTA;
150 
151  if (ERROR_INSERTED(4006) && (nodeId != getOwnNodeId())){
152  // Use error insert to turn routing on
153  jam();
154  connectedToNode= false;
155  }
156 
157  Uint32 sig0= req_struct->tc_operation_ptr;
158  Uint32 sig1= req_struct->trans_id1;
159  Uint32 sig2= req_struct->trans_id2;
160 
161  TransIdAI * transIdAI= (TransIdAI *)signal->getDataPtrSend();
162  transIdAI->connectPtr= sig0;
163  transIdAI->transId[0]= sig1;
164  transIdAI->transId[1]= sig2;
165 
166  if (connectedToNode){
170  if(nodeId != getOwnNodeId()){
171  jam();
172 
176  if (ToutBufIndex >= 22 && is_api) {
177  jam();
181  if (TpacketTA != 0) {
182  jam();
183  BlockReference TBref = numberToRef(API_PACKED, nodeId);
184  MEMCOPY_NO_WORDS(&signal->theData[0],
185  &hostBuffer[nodeId].packetBufferTA[0],
186  TpacketLen);
187  sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
188  hostBuffer[nodeId].noOfPacketsTA = 0;
189  hostBuffer[nodeId].packetLenTA = 0;
190  transIdAI->connectPtr = sig0;
191  transIdAI->transId[0] = sig1;
192  transIdAI->transId[1] = sig2;
193  }//if
194  LinearSectionPtr ptr[3];
195  ptr[0].p= &signal->theData[25];
196  ptr[0].sz= ToutBufIndex;
197  sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3, JBB, ptr, 1);
198  return;
199  }
200 
204  if ((block == DBUTIL || block == DBSPJ) && !old_dest) {
205  jam();
206  LinearSectionPtr ptr[3];
207  ptr[0].p= &signal->theData[25];
208  ptr[0].sz= ToutBufIndex;
209  sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3, JBB, ptr, 1);
210  return;
211  }
212 
216 #ifndef NDB_NO_DROPPED_SIGNAL
217  if (ToutBufIndex < 22 && is_api){
218  jam();
219  bufferTRANSID_AI(signal, recBlockref, 3+ToutBufIndex);
220  return;
221  }
222 #endif
223 
227  Uint32 * src= signal->theData+25;
228  if (ToutBufIndex >= 22){
229  do {
230  jam();
231  MEMCOPY_NO_WORDS(&signal->theData[3], src, 22);
232  sendSignal(recBlockref, GSN_TRANSID_AI, signal, 25, JBB);
233  ToutBufIndex -= 22;
234  src += 22;
235  } while(ToutBufIndex >= 22);
236  }
237 
238  if (ToutBufIndex > 0){
239  jam();
240  MEMCOPY_NO_WORDS(&signal->theData[3], src, ToutBufIndex);
241  sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3+ToutBufIndex, JBB);
242  }
243  return;
244  }
245 
255  BlockNumber blockMain = blockToMain(block);
256  const bool sameInstance = blockToInstance(block) == instance();
257  if (blockMain == DBLQH)
258  {
259  EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
260  jamEntry();
261  }
262  else if (blockMain == SUMA && sameInstance)
263  {
264  EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
265  jamEntry();
266  }
267  else if (blockMain == BACKUP && sameInstance)
268  {
269  EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
270  jamEntry();
271  }
272  else
273  {
274  jam();
275  LinearSectionPtr ptr[3];
276  ptr[0].p= &signal->theData[3];
277  ptr[0].sz= ToutBufIndex;
278  sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3, JBB, ptr, 1);
279  }
280  return;
281  }
282 
288  Uint32 routeBlockref= req_struct->TC_ref;
289 
290  if (true){ // TODO is_api && !old_dest){
291  jam();
292  transIdAI->attrData[0]= recBlockref;
293  LinearSectionPtr ptr[3];
294  ptr[0].p= &signal->theData[25];
295  ptr[0].sz= ToutBufIndex;
296  sendSignal(routeBlockref, GSN_TRANSID_AI_R, signal, 4, JBB, ptr, 1);
297  return;
298  }
299 
305  Uint32 tot= ToutBufIndex;
306  Uint32 sent= 0;
307  Uint32 maxLen= TransIdAI::DataLength - 1;
308  while (sent < tot) {
309  jam();
310  Uint32 dataLen= (tot - sent > maxLen) ? maxLen : tot - sent;
311  Uint32 sigLen= dataLen + TransIdAI::HeaderLength + 1;
312  MEMCOPY_NO_WORDS(&transIdAI->attrData,
313  &signal->theData[25+sent],
314  dataLen);
315  // Set final destination in last word
316  transIdAI->attrData[dataLen]= recBlockref;
317 
318  sendSignal(routeBlockref, GSN_TRANSID_AI_R,
319  signal, sigLen, JBB);
320  sent += dataLen;
321  }
322 }