MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TransporterCallback.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include <TransporterCallback.hpp>
21 #include <TransporterRegistry.hpp>
22 #include <FastScheduler.hpp>
23 #include <Emulator.hpp>
24 #include <ErrorHandlingMacros.hpp>
25 
26 #include "LongSignal.hpp"
27 #include "LongSignalImpl.hpp"
28 
29 #include <signaldata/EventReport.hpp>
30 #include <signaldata/TestOrd.hpp>
31 #include <signaldata/SignalDroppedRep.hpp>
32 #include <signaldata/DisconnectRep.hpp>
33 
34 #include "VMSignal.hpp"
35 #include <NdbOut.hpp>
36 #include "TransporterCallbackKernel.hpp"
37 
41 SectionSegmentPool g_sectionSegmentPool;
42 
43 /* Instance debugging vars
44  * Set from DBTC
45  */
46 int ErrorSignalReceive= 0;
47 int ErrorMaxSegmentsToSeize= 0;
48 
54 extern bool ErrorImportActive;
55 
57 {
58  enum TransporterError err;
59  const char *text;
60 };
61 
62 static const ConnectionError connectionError[] =
63 {
64  { TE_NO_ERROR, "No error"},
65  { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
66  { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
67 };
68 
69 const char *lookupConnectionError(Uint32 err)
70 {
71  int i= 0;
72  while ((Uint32)connectionError[i].err != err &&
73  connectionError[i].err != -1)
74  i++;
75  return connectionError[i].text;
76 }
77 
78 #include <DebuggerNames.hpp>
79 
80 #ifndef NDBD_MULTITHREADED
81 extern TransporterRegistry globalTransporterRegistry; // Forward declaration
82 
84 {
89  int checkJobBuffer() { return globalScheduler.checkDoJob(); }
90  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
91  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
92  {
93  return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
94  }
95  Uint32 bytes_sent(NodeId node, Uint32 bytes)
96  {
97  return globalTransporterRegistry.bytes_sent(node, bytes);
98  }
99  bool has_data_to_send(NodeId node)
100  {
101  return globalTransporterRegistry.has_data_to_send(node);
102  }
103  void reset_send_buffer(NodeId node, bool should_be_empty)
104  {
105  globalTransporterRegistry.reset_send_buffer(node, should_be_empty);
106  }
107 };
108 static TransporterCallbackKernelNonMT myTransporterCallback;
109 TransporterRegistry globalTransporterRegistry(&myTransporterCallback);
110 #endif
111 
112 #ifdef NDBD_MULTITHREADED
113 static SectionSegmentPool::Cache cache(1024,1024);
114 
115 void
116 mt_set_section_chunk_size()
117 {
118  g_sectionSegmentPool.setChunkSize(256);
119 }
120 
121 #else
122 void mt_set_section_chunk_size(){}
123 #endif
124 
125 void
127  Uint8 prio,
128  Uint32 * const theData,
129  LinearSectionPtr ptr[3])
130 {
131 
132  const Uint32 secCount = header->m_noOfSections;
133  const Uint32 length = header->theLength;
134 
135  // if this node is not MT LQH then instance bits are stripped at execute
136 
137 #ifdef TRACE_DISTRIBUTED
138  ndbout_c("recv: %s(%d) from (%s, %d)",
139  getSignalName(header->theVerId_signalNumber),
140  header->theVerId_signalNumber,
141  getBlockName(refToBlock(header->theSendersBlockRef)),
142  refToNode(header->theSendersBlockRef));
143 #endif
144 
145  bool ok = true;
146  Ptr<SectionSegment> secPtr[3];
147  bzero(secPtr, sizeof(secPtr));
148  secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
149 
150  ErrorImportActive = true;
151  switch(secCount){
152  case 3:
153  ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
154  case 2:
155  ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
156  case 1:
157  ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
158  }
159  ErrorImportActive = false;
160 
164  ok &= (length + secCount <= 25);
165 
166  Uint32 secPtrI[3];
167  if(ok){
171  secPtrI[0] = secPtr[0].i;
172  secPtrI[1] = secPtr[1].i;
173  secPtrI[2] = secPtr[2].i;
174 
175 #ifndef NDBD_MULTITHREADED
176  globalScheduler.execute(header, prio, theData, secPtrI);
177 #else
178  if (prio == JBB)
179  sendlocal(receiverThreadId,
180  header, theData, secPtrI);
181  else
182  sendprioa(receiverThreadId,
183  header, theData, secPtrI);
184 
185 #endif
186  return;
187  }
188 
192  for(Uint32 i = 0; i<secCount; i++){
193  if(secPtr[i].p != 0){
194  g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
195  relSz(secPtr[i].p->m_sz),
196  secPtr[i].i,
197  secPtr[i].p->m_lastSegment);
198  }
199  }
200 
201  SignalDroppedRep * rep = (SignalDroppedRep*)theData;
202  Uint32 gsn = header->theVerId_signalNumber;
203  Uint32 len = header->theLength;
204  Uint32 newLen= (len > 22 ? 22 : len);
205  memmove(rep->originalData, theData, (4 * newLen));
206  rep->originalGsn = gsn;
207  rep->originalLength = len;
208  rep->originalSectionCount = secCount;
209  header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
210  header->theLength = newLen + 3;
211  header->m_noOfSections = 0;
212 #ifndef NDBD_MULTITHREADED
213  globalScheduler.execute(header, prio, theData, secPtrI);
214 #else
215  if (prio == JBB)
216  sendlocal(receiverThreadId,
217  header, theData, NULL);
218  else
219  sendprioa(receiverThreadId,
220  header, theData, NULL);
221 
222 #endif
223 }
224 
225 NdbOut &
226 operator<<(NdbOut& out, const SectionSegment & ss){
227  out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
228  return out;
229 }
230 
231 void
233  TransporterError errorCode,
234  const char *info)
235 {
236 #ifdef DEBUG_TRANSPORTER
237  ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
238 #endif
239 
240  DBUG_ENTER("reportError");
241  DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
242  nodeId, errorCode, info));
243 
244  switch (errorCode)
245  {
246  case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
247  {
248  char msg[64];
249  BaseString::snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId,
250  info ? " " : "", info ? info : "");
251  ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
252  msg, __FILE__, NST_ErrorHandler);
253  }
254  case TE_SIGNAL_LOST:
255  {
256  char msg[64];
257  BaseString::snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
258  info ? " " : "", info ? info : "");
259  ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
260  msg, __FILE__, NST_ErrorHandler);
261  }
262  case TE_SHM_IPC_PERMANENT:
263  {
264  char msg[128];
265  BaseString::snprintf(msg, sizeof(msg),
266  "Remote node id %d.%s%s",
267  nodeId, info ? " " : "", info ? info : "");
268  ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
269  msg, __FILE__, NST_ErrorHandler);
270  }
271  default:
272  break;
273  }
274 
275  if(errorCode & TE_DO_DISCONNECT){
276  reportDisconnect(nodeId, errorCode);
277  }
278 
279  SignalT<3> signal;
280  memset(&signal.header, 0, sizeof(signal.header));
281 
282 
283  if(errorCode & TE_DO_DISCONNECT)
284  signal.theData[0] = NDB_LE_TransporterError;
285  else
286  signal.theData[0] = NDB_LE_TransporterWarning;
287 
288  signal.theData[1] = nodeId;
289  signal.theData[2] = errorCode;
290 
291  signal.header.theLength = 3;
292  signal.header.theSendersSignalId = 0;
293  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
294  signal.header.theReceiversBlockNumber = CMVMI;
295  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
296 #ifndef NDBD_MULTITHREADED
297  Uint32 secPtr[3];
298  globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
299 #else
300  sendprioa(receiverThreadId,
301  &signal.header, signal.theData, NULL);
302 #endif
303 
304  DBUG_VOID_RETURN;
305 }
306 
310 #ifndef NDBD_MULTITHREADED
311 void
312 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
313  Uint64 bytes)
314 {
315 
316  SignalT<3> signal;
317  memset(&signal.header, 0, sizeof(signal.header));
318 
319  signal.header.theLength = 3;
320  signal.header.theSendersSignalId = 0;
321  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
322  signal.header.theReceiversBlockNumber = CMVMI;
323  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
324 
325  signal.theData[0] = NDB_LE_SendBytesStatistic;
326  signal.theData[1] = nodeId;
327  signal.theData[2] = Uint32(bytes/count);
328 
329  Uint32 secPtr[3];
330  globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
331 }
332 #endif
333 
337 void
339  Uint64 bytes)
340 {
341 
342  SignalT<3> signal;
343  memset(&signal.header, 0, sizeof(signal.header));
344 
345  signal.header.theLength = 3;
346  signal.header.theSendersSignalId = 0;
347  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
348  signal.header.theReceiversBlockNumber = CMVMI;
349  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
350 
351  signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
352  signal.theData[1] = nodeId;
353  signal.theData[2] = Uint32(bytes/count);
354 #ifndef NDBD_MULTITHREADED
355  Uint32 secPtr[3];
356  globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
357 #else
358  sendprioa(receiverThreadId,
359  &signal.header, signal.theData, NULL);
360 #endif
361 }
362 
367 void
369 {
370 
371  SignalT<1> signal;
372  memset(&signal.header, 0, sizeof(signal.header));
373 
374  signal.header.theLength = 1;
375  signal.header.theSendersSignalId = 0;
376  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
377  signal.header.theReceiversBlockNumber = CMVMI;
378  signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
379 
380  signal.theData[0] = nodeId;
381 
382 #ifndef NDBD_MULTITHREADED
383  Uint32 secPtr[3];
384  globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
385 #else
386  sendprioa(receiverThreadId,
387  &signal.header, signal.theData, NULL);
388 #endif
389 }
390 
394 void
396 {
397 
398  DBUG_ENTER("reportDisconnect");
399 
400  SignalT<sizeof(DisconnectRep)/4> signal;
401  memset(&signal.header, 0, sizeof(signal.header));
402 
403  signal.header.theLength = DisconnectRep::SignalLength;
404  signal.header.theSendersSignalId = 0;
405  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
406  signal.header.theTrace = TestOrd::TraceDisconnect;
407  signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
408  signal.header.theReceiversBlockNumber = CMVMI;
409 
410  DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]);
411  rep->nodeId = nodeId;
412  rep->err = errNo;
413 
414 #ifndef NDBD_MULTITHREADED
415  Uint32 secPtr[3];
416  globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
417 #else
418  sendprioa(receiverThreadId,
419  &signal.header, signal.theData, NULL);
420 #endif
421 
422  DBUG_VOID_RETURN;
423 }
424 
425 void
427  const SignalHeader & sh,
428  const SegmentedSectionPtr ptr[3],
429  unsigned i)
430 {
431  fprintf(output, "SECTION %u type=segmented", i);
432  if (i >= 3) {
433  fprintf(output, " *** invalid ***\n");
434  return;
435  }
436  const Uint32 len = ptr[i].sz;
437  SectionSegment * ssp = ptr[i].p;
438  Uint32 pos = 0;
439  fprintf(output, " size=%u\n", (unsigned)len);
440  while (pos < len) {
441  if (pos > 0 && pos % SectionSegment::DataLength == 0) {
442  ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
443  }
444  printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
445  }
446  if (len > 0)
447  putc('\n', output);
448 }
449 
450 void
452 {
453  globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
454  return;
455 }