MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Transporter.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <TransporterRegistry.hpp>
20 #include <TransporterCallback.hpp>
21 #include "Transporter.hpp"
22 #include "TransporterInternalDefinitions.hpp"
23 #include <NdbSleep.h>
24 #include <SocketAuthenticator.hpp>
25 #include <InputStream.hpp>
26 #include <OutputStream.hpp>
27 
28 #include <EventLogger.hpp>
29 extern EventLogger * g_eventLogger;
30 
31 Transporter::Transporter(TransporterRegistry &t_reg,
32  TransporterType _type,
33  const char *lHostName,
34  const char *rHostName,
35  int s_port,
36  bool _isMgmConnection,
37  NodeId lNodeId,
38  NodeId rNodeId,
39  NodeId serverNodeId,
40  int _byteorder,
41  bool _compression, bool _checksum, bool _signalId,
42  Uint32 max_send_buffer)
43  : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
44  isServer(lNodeId==serverNodeId),
45  m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
46  m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
47  m_connected(false),
48  m_type(_type),
49  m_transporter_registry(t_reg)
50 {
51  DBUG_ENTER("Transporter::Transporter");
52  if (rHostName && strlen(rHostName) > 0){
53  strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
54  Ndb_getInAddr(&remoteHostAddress, rHostName);
55  }
56  else
57  {
58  if (!isServer) {
59  ndbout << "Unable to setup transporter. Node " << rNodeId
60  << " must have hostname. Update configuration." << endl;
61  exit(-1);
62  }
63  remoteHostName[0]= 0;
64  }
65  strncpy(localHostName, lHostName, sizeof(localHostName));
66 
67  DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
68  remoteNodeId, localNodeId, isServer,
69  remoteHostName, localHostName,
70  s_port));
71 
72  byteOrder = _byteorder;
73  compressionUsed = _compression;
74  checksumUsed = _checksum;
75  signalIdUsed = _signalId;
76 
77  m_timeOutMillis = 30000;
78 
79  m_connect_address.s_addr= 0;
80  if(s_port<0)
81  s_port= -s_port; // was dynamic
82 
83  if (isServer)
84  m_socket_client= 0;
85  else
86  {
87  m_socket_client= new SocketClient(remoteHostName, s_port,
88  new SocketAuthSimple("ndbd",
89  "ndbd passwd"));
90 
91  m_socket_client->set_connect_timeout(m_timeOutMillis);
92  }
93 
94  m_os_max_iovec = 16;
95 #if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
96  long res = sysconf(_SC_IOV_MAX);
97  if (res != (long)-1)
98  {
99  m_os_max_iovec = (Uint32)res;
100  }
101 #endif
102 
103  DBUG_VOID_RETURN;
104 }
105 
107  delete m_socket_client;
108 }
109 
110 
111 bool
112 Transporter::configure(const TransporterConfiguration* conf)
113 {
114  if (configure_derived(conf) &&
115  conf->s_port == m_s_port &&
116  strcmp(conf->remoteHostName, remoteHostName) == 0 &&
117  strcmp(conf->localHostName, localHostName) == 0 &&
118  conf->remoteNodeId == remoteNodeId &&
119  conf->localNodeId == localNodeId &&
120  (conf->serverNodeId == conf->localNodeId) == isServer &&
121  conf->checksum == checksumUsed &&
122  conf->signalId == signalIdUsed &&
123  conf->isMgmConnection == isMgmConnection &&
124  conf->type == m_type)
125  return true; // No change
126  return false; // Can't reconfigure
127 }
128 
129 
130 bool
131 Transporter::connect_server(NDB_SOCKET_TYPE sockfd,
132  BaseString& msg) {
133  // all initial negotiation is done in TransporterRegistry::connect_server
134  DBUG_ENTER("Transporter::connect_server");
135 
136  if (m_connected)
137  {
138  msg.assfmt("line: %u : already connected ??", __LINE__);
139  DBUG_RETURN(false);
140  }
141 
142  // Cache the connect address
143  my_socket_connect_address(sockfd, &m_connect_address);
144 
145  if (!connect_server_impl(sockfd))
146  {
147  msg.assfmt("line: %u : connect_server_impl failed", __LINE__);
148  DBUG_RETURN(false);
149  }
150 
151  m_connected = true;
152 
153  DBUG_RETURN(true);
154 }
155 
156 
157 bool
159  NDB_SOCKET_TYPE sockfd;
160  DBUG_ENTER("Transporter::connect_client");
161 
162  if(m_connected)
163  DBUG_RETURN(true);
164 
165  if(isMgmConnection)
166  {
167  sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
168  }
169  else
170  {
171  if (!m_socket_client->init())
172  DBUG_RETURN(false);
173 
174  if (pre_connect_options(m_socket_client->m_sockfd) != 0)
175  DBUG_RETURN(false);
176 
177  if (strlen(localHostName) > 0)
178  {
179  if (m_socket_client->bind(localHostName, 0) != 0)
180  DBUG_RETURN(false);
181  }
182  sockfd= m_socket_client->connect();
183  }
184 
185  DBUG_RETURN(connect_client(sockfd));
186 }
187 
188 
189 bool
190 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
191 
192  DBUG_ENTER("Transporter::connect_client(sockfd)");
193 
194  if(m_connected)
195  {
196  DBUG_PRINT("error", ("Already connected"));
197  DBUG_RETURN(true);
198  }
199 
200  if (!my_socket_valid(sockfd))
201  {
202  DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
203  MY_SOCKET_FORMAT_VALUE(sockfd)));
204  DBUG_RETURN(false);
205  }
206 
207  DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
208  m_s_port, isMgmConnection));
209 
210  // Send "hello"
211  DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
212  localNodeId, m_type));
213  SocketOutputStream s_output(sockfd);
214  if (s_output.println("%d %d", localNodeId, m_type) < 0)
215  {
216  DBUG_PRINT("error", ("Send of 'hello' failed"));
217  NDB_CLOSE_SOCKET(sockfd);
218  DBUG_RETURN(false);
219  }
220 
221  // Read reply
222  DBUG_PRINT("info", ("Reading reply"));
223  char buf[256];
224  SocketInputStream s_input(sockfd);
225  if (s_input.gets(buf, 256) == 0)
226  {
227  DBUG_PRINT("error", ("Failed to read reply"));
228  NDB_CLOSE_SOCKET(sockfd);
229  DBUG_RETURN(false);
230  }
231 
232  // Parse reply
233  int nodeId, remote_transporter_type= -1;
234  int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
235  switch (r) {
236  case 2:
237  break;
238  case 1:
239  // we're running version prior to 4.1.9
240  // ok, but with no checks on transporter configuration compatability
241  break;
242  default:
243  DBUG_PRINT("error", ("Failed to parse reply"));
244  NDB_CLOSE_SOCKET(sockfd);
245  DBUG_RETURN(false);
246  }
247 
248  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
249  nodeId, remote_transporter_type));
250 
251  // Check nodeid
252  if (nodeId != remoteNodeId)
253  {
254  g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
255  nodeId, remoteNodeId);
256  NDB_CLOSE_SOCKET(sockfd);
257  DBUG_RETURN(false);
258  }
259 
260  // Check transporter type
261  if (remote_transporter_type != -1 &&
262  remote_transporter_type != m_type)
263  {
264  g_eventLogger->error("Connection to node: %d uses different transporter "
265  "type: %d, expected type: %d",
266  nodeId, remote_transporter_type, m_type);
267  NDB_CLOSE_SOCKET(sockfd);
268  DBUG_RETURN(false);
269  }
270 
271  // Cache the connect address
272  my_socket_connect_address(sockfd, &m_connect_address);
273 
274  if (!connect_client_impl(sockfd))
275  DBUG_RETURN(false);
276 
277  m_connected = true;
278 
279  DBUG_RETURN(true);
280 }
281 
282 void
284 
285  if(!m_connected)
286  return;
287 
288  m_connected = false;
289 
290  disconnectImpl();
291 }
292