MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Loopback_Transporter.cpp
1 /*
2  Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include "Loopback_Transporter.hpp"
21 #include <NdbOut.hpp>
22 #include <NdbSleep.h>
23 
24 #include <EventLogger.hpp>
25 extern EventLogger * g_eventLogger;
26 // End of stuff to be moved
27 
28 
29 Loopback_Transporter::Loopback_Transporter(TransporterRegistry &t_reg,
30  const TransporterConfiguration* conf)
31  : TCP_Transporter(t_reg, conf)
32 {
33  assert(isServer == false);
34 }
35 
36 Loopback_Transporter::~Loopback_Transporter()
37 {
38 }
39 
40 bool
41 Loopback_Transporter::connect_client()
42 {
43  NDB_SOCKET_TYPE pair[2];
44  if (my_socketpair(pair))
45  {
46  perror("socketpair failed!");
47  return false;
48  }
49 
50  if (!TCP_Transporter::setSocketNonBlocking(pair[0]) ||
51  !TCP_Transporter::setSocketNonBlocking(pair[1]))
52  {
53  goto err;
54  }
55 
56  theSocket = pair[0];
57  m_send_socket = pair[1];
58  m_connected = true;
59  return true;
60 
61 err:
62  my_socket_close(pair[0]);
63  my_socket_close(pair[1]);
64  return false;
65 }
66 
67 void
68 Loopback_Transporter::disconnectImpl()
69 {
70  NDB_SOCKET_TYPE pair[] = { theSocket, m_send_socket };
71 
72  get_callback_obj()->lock_transporter(remoteNodeId);
73 
74  receiveBuffer.clear();
75  my_socket_invalidate(&theSocket);
76  my_socket_invalidate(&m_send_socket);
77 
78  get_callback_obj()->unlock_transporter(remoteNodeId);
79 
80  if (my_socket_valid(pair[0]))
81  my_socket_close(pair[0]);
82 
83  if (my_socket_valid(pair[1]))
84  my_socket_close(pair[1]);
85 }
86 
87 bool
88 Loopback_Transporter::send_is_possible(int timeout_millisec) const
89 {
90  return TCP_Transporter::send_is_possible(m_send_socket, timeout_millisec);
91 }
92 
93 #define DISCONNECT_ERRNO(e, sz) ((sz == 0) || \
94  (!((sz == -1) && ((e == SOCKET_EAGAIN) || (e == SOCKET_EWOULDBLOCK) || (e == SOCKET_EINTR)))))
95 
96 int
97 Loopback_Transporter::doSend() {
98  struct iovec iov[64];
99  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
100 
101  if (cnt == 0)
102  {
103  return 0;
104  }
105 
106  Uint32 sum = 0;
107  for(Uint32 i = 0; i<cnt; i++)
108  {
109  assert(iov[i].iov_len);
110  sum += iov[i].iov_len;
111  }
112 
113  Uint32 pos = 0;
114  Uint32 sum_sent = 0;
115  Uint32 send_cnt = 0;
116  Uint32 remain = sum;
117 
118  if (cnt == NDB_ARRAY_SIZE(iov))
119  {
120  // If pulling all iov's make sure that we never return everyting
121  // flushed
122  sum++;
123  }
124 
125  while (send_cnt < 5)
126  {
127  send_cnt++;
128  Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
129  int nBytesSent = (int)my_socket_writev(m_send_socket, iov+pos, iovcnt);
130  assert(nBytesSent <= (int)remain);
131 
132  if (Uint32(nBytesSent) == remain)
133  {
134  sum_sent += nBytesSent;
135  goto ok;
136  }
137  else if (nBytesSent > 0)
138  {
139  sum_sent += nBytesSent;
140  remain -= nBytesSent;
141 
145  while (Uint32(nBytesSent) >= iov[pos].iov_len)
146  {
147  assert(iov[pos].iov_len > 0);
148  nBytesSent -= iov[pos].iov_len;
149  pos++;
150  cnt--;
151  }
152 
153  if (nBytesSent)
154  {
155  assert(iov[pos].iov_len > Uint32(nBytesSent));
156  iov[pos].iov_len -= nBytesSent;
157  iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
158  }
159  continue;
160  }
161  else
162  {
163  int err = my_socket_errno();
164  if (!(DISCONNECT_ERRNO(err, nBytesSent)))
165  {
166  if (sum_sent)
167  {
168  goto ok;
169  }
170  else
171  {
172  return remain;
173  }
174  }
175 
176  do_disconnect(err);
177  return 0;
178  }
179  }
180 
181 ok:
182  assert(sum >= sum_sent);
183  iovec_data_sent(sum_sent);
184  sendCount += send_cnt;
185  sendSize += sum_sent;
186  if(sendCount >= reportFreq)
187  {
188  get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
189  sendCount = 0;
190  sendSize = 0;
191  }
192 
193  return sum - sum_sent; // 0 if every thing flushed else >0
194 }