MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TCP_Transporter.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include <NdbTCP.h>
21 #include "TCP_Transporter.hpp"
22 #include <NdbOut.hpp>
23 #include <NdbSleep.h>
24 
25 #include <EventLogger.hpp>
26 extern EventLogger * g_eventLogger;
27 // End of stuff to be moved
28 
#ifdef NDB_WIN32
/*
  Windows replacement for strerror(): turns a Win32/WinSock error code into
  a human readable message using FormatMessageA().

  The text buffer is allocated by the system (FORMAT_MESSAGE_ALLOCATE_BUFFER)
  and released with LocalFree() in the destructor, so an instance owns it and
  must not be copied. Intended use is as a temporary:
    (char*)ndbstrerror(err)
*/
class ndbstrerror
{
public:
  ndbstrerror(int iError);
  ~ndbstrerror(void);

  // Never returns NULL: when formatting failed a fixed fallback text is
  // returned so the result is always safe to pass to a "%s" format.
  operator char*(void) { return m_szError ? m_szError : unknown_error(); }

private:
  // Non-copyable: a copy would LocalFree() the same buffer twice.
  ndbstrerror(const ndbstrerror&);
  ndbstrerror& operator=(const ndbstrerror&);

  static char* unknown_error()
  {
    static char text[] = "Unknown error";
    return text;
  }

  int m_iError;
  char* m_szError;   // LocalAlloc'd by FormatMessageA, or 0
};

ndbstrerror::ndbstrerror(int iError)
: m_iError(iError), m_szError(0)
{
  // Call the ANSI variant explicitly: m_szError is a char*, while the
  // generic FormatMessage maps to FormatMessageW in UNICODE builds.
  DWORD len = FormatMessageA(
    FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
    0,
    iError,
    MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
    (LPSTR)&m_szError,
    0,
    0);
  if (len == 0)
    m_szError = 0;   // formatting failed: nothing was allocated
}

ndbstrerror::~ndbstrerror(void)
{
  if (m_szError)
    LocalFree( m_szError );
  m_szError = 0;
}
#else
#define ndbstrerror strerror
#endif
63 
64 static
65 void
66 setIf(int& ref, Uint32 val, Uint32 def)
67 {
68  if (val)
69  ref = val;
70  else
71  ref = def;
72 }
73 
74 
75 static
76 Uint32 overload_limit(const TransporterConfiguration* conf)
77 {
78  return (conf->tcp.tcpOverloadLimit ?
79  conf->tcp.tcpOverloadLimit :
80  conf->tcp.sendBufferSize*4/5);
81 }
82 
83 
84 TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
85  const TransporterConfiguration* conf)
86  :
87  Transporter(t_reg, tt_TCP_TRANSPORTER,
88  conf->localHostName,
89  conf->remoteHostName,
90  conf->s_port,
91  conf->isMgmConnection,
92  conf->localNodeId,
93  conf->remoteNodeId,
94  conf->serverNodeId,
95  0, false,
96  conf->checksum,
97  conf->signalId,
98  conf->tcp.sendBufferSize)
99 {
100  maxReceiveSize = conf->tcp.maxReceiveSize;
101 
102  // Initialize member variables
103  my_socket_invalidate(&theSocket);
104 
105  sendCount = receiveCount = 0;
106  sendSize = receiveSize = 0;
107  reportFreq = 4096;
108 
109  sockOptNodelay = 1;
110  setIf(sockOptRcvBufSize, conf->tcp.tcpRcvBufSize, 70080);
111  setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
112  setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
113 
114  m_overload_limit = overload_limit(conf);
115 }
116 
117 
118 bool
119 TCP_Transporter::configure_derived(const TransporterConfiguration* conf)
120 {
121  if (conf->tcp.sendBufferSize == m_max_send_buffer &&
122  conf->tcp.maxReceiveSize == maxReceiveSize &&
123  (int)conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
124  (int)conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
125  (int)conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
126  overload_limit(conf) == m_overload_limit)
127  return true; // No change
128 
129  return false; // Can't reconfigure
130 }
131 
132 
133 TCP_Transporter::~TCP_Transporter() {
134 
135  // Disconnect
136  if (my_socket_valid(theSocket))
137  doDisconnect();
138 
139  // Delete receive buffer!!
140  receiveBuffer.destroy();
141 }
142 
143 bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
144 {
145  DBUG_ENTER("TCP_Transpporter::connect_server_impl");
146  DBUG_RETURN(connect_common(sockfd));
147 }
148 
149 bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
150 {
151  DBUG_ENTER("TCP_Transpporter::connect_client_impl");
152  DBUG_RETURN(connect_common(sockfd));
153 }
154 
155 bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
156 {
157  setSocketOptions(sockfd);
158  setSocketNonBlocking(sockfd);
159 
160  get_callback_obj()->lock_transporter(remoteNodeId);
161  theSocket = sockfd;
162  get_callback_obj()->unlock_transporter(remoteNodeId);
163 
164  DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
165  remoteNodeId));
166  return true;
167 }
168 
169 bool
170 TCP_Transporter::initTransporter() {
171 
172  // Allocate buffer for receiving
173  // Let it be the maximum size we receive plus 8 kB for any earlier received
174  // incomplete messages (slack)
175  Uint32 recBufSize = maxReceiveSize;
176  if(recBufSize < MAX_RECV_MESSAGE_BYTESIZE){
177  recBufSize = MAX_RECV_MESSAGE_BYTESIZE;
178  }
179 
180  if(!receiveBuffer.init(recBufSize+MAX_RECV_MESSAGE_BYTESIZE)){
181  return false;
182  }
183 
184  return true;
185 }
186 
187 static
188 void
189 set_get(NDB_SOCKET_TYPE fd, int level, int optval, const char *optname,
190  int val)
191 {
192  int actual = 0, defval = 0;
193  SOCKET_SIZE_TYPE len = sizeof(actual);
194 
195  my_getsockopt(fd, level, optval, (char*)&defval, &len);
196 
197  if (my_setsockopt(fd, level, optval,
198  (char*)&val, sizeof(val)) < 0)
199  {
200 #ifdef DEBUG_TRANSPORTER
201  g_eventLogger->error("setsockopt(%s, %d) errno: %d %s",
202  optname, val, errno, strerror(errno));
203 #endif
204  }
205 
206  len = sizeof(actual);
207  if ((my_getsockopt(fd, level, optval,
208  (char*)&actual, &len) == 0) &&
209  actual != val)
210  {
211 #ifdef DEBUG_TRANSPORTER
212  g_eventLogger->error("setsockopt(%s, %d) - actual %d default: %d",
213  optname, val, actual, defval);
214 #endif
215  }
216 }
217 
218 int
219 TCP_Transporter::pre_connect_options(NDB_SOCKET_TYPE sockfd)
220 {
221  if (sockOptTcpMaxSeg)
222  {
223 #ifdef TCP_MAXSEG
224  set_get(sockfd, IPPROTO_TCP, TCP_MAXSEG, "TCP_MAXSEG", sockOptTcpMaxSeg);
225 #endif
226  }
227  return 0;
228 }
229 
230 void
231 TCP_Transporter::setSocketOptions(NDB_SOCKET_TYPE socket)
232 {
233  set_get(socket, SOL_SOCKET, SO_RCVBUF, "SO_RCVBUF", sockOptRcvBufSize);
234  set_get(socket, SOL_SOCKET, SO_SNDBUF, "SO_SNDBUF", sockOptSndBufSize);
235  set_get(socket, IPPROTO_TCP, TCP_NODELAY, "TCP_NODELAY", sockOptNodelay);
236  set_get(socket, SOL_SOCKET, SO_KEEPALIVE, "SO_KEEPALIVE", 1);
237 
238  if (sockOptTcpMaxSeg)
239  {
240 #ifdef TCP_MAXSEG
241  set_get(socket, IPPROTO_TCP, TCP_MAXSEG, "TCP_MAXSEG", sockOptTcpMaxSeg);
242 #endif
243  }
244 }
245 
246 bool TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket)
247 {
248  if(my_socket_nonblock(socket, true)==0)
249  return true;
250  return false;
251 }
252 
253 bool
254 TCP_Transporter::send_is_possible(int timeout_millisec) const
255 {
256  return send_is_possible(theSocket, timeout_millisec);
257 }
258 
259 bool
260 TCP_Transporter::send_is_possible(NDB_SOCKET_TYPE fd,int timeout_millisec) const
261 {
262  ndb_socket_poller poller;
263 
264  if (!my_socket_valid(fd))
265  return false;
266 
267  poller.clear();
268  poller.add(fd, false, true, false);
269 
270  if (poller.poll_unsafe(timeout_millisec) <= 0)
271  return false; // Timeout or error occured
272 
273  return true;
274 }
275 
/*
  DISCONNECT_ERRNO(e, sz): should a failed socket send/receive drop the
  connection?
    e  - socket error code captured right after the call
    sz - value returned by the send/receive call
  Evaluates to false only when sz == -1 and e is SOCKET_EAGAIN,
  SOCKET_EWOULDBLOCK or SOCKET_EINTR; true otherwise (including sz == 0).
  In this file it is only evaluated for sz <= 0.
  Arguments are parenthesised so expression arguments expand correctly;
  each may be evaluated more than once, so pass no side effects.
*/
#define DISCONNECT_ERRNO(e, sz) (((sz) == 0) || \
  (!(((sz) == -1) && (((e) == SOCKET_EAGAIN) || ((e) == SOCKET_EWOULDBLOCK) || ((e) == SOCKET_EINTR)))))
278 
279 
/*
  doSend(): write data queued for this transporter to its socket.

  Fetches up to 64 iovec entries with fetch_send_iovec_data() and writes
  them with my_socket_writev(), passing at most m_os_max_iovec entries per
  call and making at most 5 write attempts. After a partial write the iovec
  array is advanced in place (pos/cnt, plus iov_base/iov_len of a partially
  written entry) so the next attempt resumes where the previous one stopped.

  Returns:
    0   - nothing was queued, everything was written, or a non-transient
          socket error occurred (do_disconnect() has then been called)
    >0  - bytes still unsent. When all 64 iovec slots were filled the result
          is kept > 0 even if all of them were written (sum is bumped by 1),
          presumably because more data may still be queued.
  If a write fails transiently (see DISCONNECT_ERRNO) before anything was
  written in this call, 'remain' is returned without iovec_data_sent().
*/
280 int
281 TCP_Transporter::doSend() {
282  struct iovec iov[64];
283  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
284 
285  if (cnt == 0)
286  {
287  return 0;
288  }
289 
     // Total number of bytes described by the fetched iovecs
290  Uint32 sum = 0;
291  for(Uint32 i = 0; i<cnt; i++)
292  {
293  assert(iov[i].iov_len);
294  sum += iov[i].iov_len;
295  }
296 
297  Uint32 pos = 0;        // first iovec not yet fully written
298  Uint32 sum_sent = 0;   // bytes written by this call so far
299  Uint32 send_cnt = 0;   // number of writev attempts
300  Uint32 remain = sum;   // bytes still to write (taken before the sum++ below)
301 
302  if (cnt == NDB_ARRAY_SIZE(iov))
303  {
304  // If pulling all iov's make sure that we never return everything
305  // flushed
306  sum++;
307  }
308 
     // Bounded retry loop; leaving it after 5 attempts falls through to 'ok'
309  while (send_cnt < 5)
310  {
311  send_cnt++;
312  Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
313  int nBytesSent = (int)my_socket_writev(theSocket, iov+pos, iovcnt);
314  assert(nBytesSent <= (int)remain);
315 
316  if (Uint32(nBytesSent) == remain)
317  {
     // Everything written
318  sum_sent += nBytesSent;
319  goto ok;
320  }
321  else if (nBytesSent > 0)
322  {
     // Partial write: account for it, then skip fully written iovecs
323  sum_sent += nBytesSent;
324  remain -= nBytesSent;
325 
329  while (Uint32(nBytesSent) >= iov[pos].iov_len)
330  {
331  assert(iov[pos].iov_len > 0);
332  nBytesSent -= iov[pos].iov_len;
333  pos++;
334  cnt--;
335  }
336 
     // ...and trim the partially written iovec, if any
337  if (nBytesSent)
338  {
339  assert(iov[pos].iov_len > Uint32(nBytesSent));
340  iov[pos].iov_len -= nBytesSent;
341  iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
342  }
343  continue;
344  }
345  else
346  {
     // nBytesSent <= 0: classify the failure
347  int err = my_socket_errno();
348  if (!(DISCONNECT_ERRNO(err, nBytesSent)))
349  {
     // Transient: report what was written so far, if anything
350  if (sum_sent)
351  {
352  goto ok;
353  }
354  else
355  {
356  return remain;
357  }
358  }
359 
360 #if defined DEBUG_TRANSPORTER
361  g_eventLogger->error("Send Failure(disconnect==%d) to node = %d "
362  "nBytesSent = %d "
363  "errno = %d strerror = %s",
364  DISCONNECT_ERRNO(err, nBytesSent),
365  remoteNodeId, nBytesSent, my_socket_errno(),
366  (char*)ndbstrerror(err));
367 #endif
     // Non-transient error: drop the connection
368  do_disconnect(err);
369  return 0;
370  }
371  }
372 
     // Common exit: acknowledge written bytes and update statistics
373 ok:
374  assert(sum >= sum_sent);
375  iovec_data_sent(sum_sent);
376  sendCount += send_cnt;
377  sendSize += sum_sent;
378  if(sendCount >= reportFreq)
379  {
380  get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
381  sendCount = 0;
382  sendSize = 0;
383  }
384 
385  return sum - sum_sent; // 0 if every thing flushed else >0
386 }
387 
388 int
389 TCP_Transporter::doReceive() {
390  // Select-function must return the socket for read
391  // before this method is called
392  // It reads the external TCP/IP interface once
393  Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
394  if(size > 0){
395  const int nBytesRead = (int)my_recv(theSocket,
396  receiveBuffer.insertPtr,
397  size < maxReceiveSize ? size : maxReceiveSize,
398  0);
399 
400  if (nBytesRead > 0) {
401  receiveBuffer.sizeOfData += nBytesRead;
402  receiveBuffer.insertPtr += nBytesRead;
403 
404  if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
405 #ifdef DEBUG_TRANSPORTER
406  g_eventLogger->error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
407  receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
408  g_eventLogger->error("nBytesRead = %d", nBytesRead);
409 #endif
410  g_eventLogger->error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
411  receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
412  report_error(TE_INVALID_MESSAGE_LENGTH);
413  return 0;
414  }
415 
416  receiveCount ++;
417  receiveSize += nBytesRead;
418 
419  if(receiveCount == reportFreq){
420  get_callback_obj()->reportReceiveLen(remoteNodeId,
421  receiveCount, receiveSize);
422  receiveCount = 0;
423  receiveSize = 0;
424  }
425  return nBytesRead;
426  } else {
427 #if defined DEBUG_TRANSPORTER
428  g_eventLogger->error("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
429  "errno = %d strerror = %s",
430  DISCONNECT_ERRNO(my_socket_errno(), nBytesRead),
431  remoteNodeId, nBytesRead, my_socket_errno(),
432  (char*)ndbstrerror(my_socket_errno()));
433 #endif
434  if(DISCONNECT_ERRNO(my_socket_errno(), nBytesRead)){
435  do_disconnect(my_socket_errno());
436  }
437  }
438  return nBytesRead;
439  } else {
440  return 0;
441  }
442 }
443 
/*
  Close this transporter's TCP connection.

  While holding lock_transporter(remoteNodeId) the current socket handle is
  copied out of theSocket, theSocket is invalidated and the receive buffer
  is cleared. The copied socket is then closed after the lock is released;
  a failing close is reported as TE_ERROR_CLOSING_SOCKET.

  NOTE(review): the line carrying this function's name (between "void" and
  the body) is missing from this extract; presumably this is the
  TCP-specific disconnect implementation reached through doDisconnect() --
  confirm against the full source.
*/
444 void
446  get_callback_obj()->lock_transporter(remoteNodeId);
447 
448  NDB_SOCKET_TYPE sock = theSocket;   // keep the handle to close it after unlocking
449  receiveBuffer.clear();
450  my_socket_invalidate(&theSocket);
451 
452  get_callback_obj()->unlock_transporter(remoteNodeId);
453 
     // Close outside the lock; only if a valid socket was held
454  if(my_socket_valid(sock))
455  {
456  if(my_socket_close(sock) < 0){
457  report_error(TE_ERROR_CLOSING_SOCKET);
458  }
459  }
460 }