MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SHM_Transporter.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 
21 #include "SHM_Transporter.hpp"
22 #include "TransporterInternalDefinitions.hpp"
23 #include <TransporterCallback.hpp>
24 #include <NdbSleep.h>
25 #include <NdbOut.hpp>
26 
27 #include <InputStream.hpp>
28 #include <OutputStream.hpp>
29 
30 extern int g_ndb_shm_signum;
31 
32 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
33  const char *lHostName,
34  const char *rHostName,
35  int r_port,
36  bool isMgmConnection_arg,
37  NodeId lNodeId,
38  NodeId rNodeId,
39  NodeId serverNodeId,
40  bool checksum,
41  bool signalId,
42  key_t _shmKey,
43  Uint32 _shmSize) :
44  Transporter(t_reg, tt_SHM_TRANSPORTER,
45  lHostName, rHostName, r_port, isMgmConnection_arg,
46  lNodeId, rNodeId, serverNodeId,
47  0, false, checksum, signalId,
48  4096 + MAX_SEND_MESSAGE_BYTESIZE),
49  shmKey(_shmKey),
50  shmSize(_shmSize)
51 {
52 #ifndef NDB_WIN32
53  shmId= 0;
54 #endif
55  _shmSegCreated = false;
56  _attached = false;
57 
58  shmBuf = 0;
59  reader = 0;
60  writer = 0;
61 
62  setupBuffersDone=false;
63 #ifdef DEBUG_TRANSPORTER
64  printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
65 #endif
66  m_signal_threshold = 4096;
67 }
68 
69 
70 bool
71 SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
72 {
73  if ((key_t)conf->shm.shmKey == shmKey &&
74  (int)conf->shm.shmSize == shmSize &&
75  conf->shm.signum == g_ndb_shm_signum)
76  return true; // No change
77  return false; // Can't reconfigure
78 }
79 
80 
// NOTE(review): the line that opens this definition (listing line 81) is
// missing from this extract, so its name and signature are not visible.
// The only statement in the body calls doDisconnect().
82  doDisconnect();
83 }
84 
/**
 * Returns true when g_ndb_shm_signum is non-zero and false when it is 0.
 *
 * NOTE(review): the signature line (listing line 86) is missing from this
 * extract, so the function's name and parameters are not visible here.
 */
85 bool
87  if (g_ndb_shm_signum)
88  return true;
89  return false;
90 }
91 
/**
 * Lays out the attached segment (shmBuf, shmSize bytes) as two equally
 * sized halves and creates the SHM_Reader / SHM_Writer pair on top of them.
 *
 * Each half is a header of sharedSize bytes followed by a data area of
 * sizeOfBuffer bytes.  Inside a header, 32-bit word 0 is the shared read
 * index, word 1 the shared write index and word 4 a status flag
 * (serverStatusFlag in the first half, clientStatusFlag in the second).
 *
 * The server reads from data area 1 and writes to data area 2; the client
 * does the opposite.  Each side sets its own status flag to 0 before it
 * starts and to 1 once its reader and writer are ready.
 *
 * NOTE(review): the signature line (listing line 93) and listing lines
 * 101-103 are missing from this extract.  The name setupBuffers is
 * presumed from the call in connect_common - confirm against the header.
 */
92 void
94  Uint32 sharedSize = 0;
95  sharedSize += 28; //SHM_Reader::getSharedSize();
96  sharedSize += 28; //SHM_Writer::getSharedSize();
97 
  // Slack handed to both reader and writer: the larger of the two
  // maximum message sizes.
98  const Uint32 slack = MAX(MAX_RECV_MESSAGE_BYTESIZE,
99  MAX_SEND_MESSAGE_BYTESIZE);
100 
  // Data area size: what is left of the segment after the two headers,
  // split evenly between the two directions.
104  Uint32 sizeOfBuffer = shmSize;
105  sizeOfBuffer -= 2*sharedSize;
106  sizeOfBuffer /= 2;
107 
  // First half: header at the very start of the segment.
108  Uint32 * base1 = (Uint32*)shmBuf;
109 
110  Uint32 * sharedReadIndex1 = base1;
111  Uint32 * sharedWriteIndex1 = base1 + 1;
112  serverStatusFlag = base1 + 4;
113  char * startOfBuf1 = shmBuf+sharedSize;
114 
  // Second half: starts right after the first header and data area.
115  Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
116  Uint32 * sharedReadIndex2 = base2;
117  Uint32 * sharedWriteIndex2 = base2 + 1;
118  clientStatusFlag = base2 + 4;
119  char * startOfBuf2 = ((char *)base2)+sharedSize;
120 
121  if(isServer){
  // Mark the server side as not ready while it is being set up.
122  * serverStatusFlag = 0;
123  reader = new SHM_Reader(startOfBuf1,
124  sizeOfBuffer,
125  slack,
126  sharedReadIndex1,
127  sharedWriteIndex1);
128 
129  writer = new SHM_Writer(startOfBuf2,
130  sizeOfBuffer,
131  slack,
132  sharedReadIndex2,
133  sharedWriteIndex2);
134 
  // The server resets all four shared indices.
135  * sharedReadIndex1 = 0;
136  * sharedWriteIndex1 = 0;
137 
138  * sharedReadIndex2 = 0;
139  * sharedWriteIndex2 = 0;
140 
141  reader->clear();
142  writer->clear();
143 
  // Server side is ready; connect_common checks for this value.
144  * serverStatusFlag = 1;
145 
146 #ifdef DEBUG_TRANSPORTER
147  printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
148  printf("Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
149  printf("sharedReadIndex1 at %ld (%p) = %d\n",
150  (char*)sharedReadIndex1-shmBuf,
151  sharedReadIndex1, *sharedReadIndex1);
152  printf("sharedWriteIndex1 at %ld (%p) = %d\n",
153  (char*)sharedWriteIndex1-shmBuf,
154  sharedWriteIndex1, *sharedWriteIndex1);
155 
156  printf("Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
157  printf("sharedReadIndex2 at %ld (%p) = %d\n",
158  (char*)sharedReadIndex2-shmBuf,
159  sharedReadIndex2, *sharedReadIndex2);
160  printf("sharedWriteIndex2 at %ld (%p) = %d\n",
161  (char*)sharedWriteIndex2-shmBuf,
162  sharedWriteIndex2, *sharedWriteIndex2);
163 
164  printf("sizeOfBuffer = %d\n", sizeOfBuffer);
165 #endif
166  } else {
  // Mark the client side as not ready while it is being set up.
167  * clientStatusFlag = 0;
168  reader = new SHM_Reader(startOfBuf2,
169  sizeOfBuffer,
170  slack,
171  sharedReadIndex2,
172  sharedWriteIndex2);
173 
174  writer = new SHM_Writer(startOfBuf1,
175  sizeOfBuffer,
176  slack,
177  sharedReadIndex1,
178  sharedWriteIndex1);
179 
  // The client resets only two indices: the read index of the area it
  // reads from and the write index of the area it writes to.
180  * sharedReadIndex2 = 0;
181  * sharedWriteIndex1 = 0;
182 
183  reader->clear();
184  writer->clear();
  // Client side is ready; connect_common checks for this value.
185  * clientStatusFlag = 1;
186 #ifdef DEBUG_TRANSPORTER
187  printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
188  printf("Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
189  printf("sharedReadIndex2 at %ld (%p) = %d\n",
190  (char*)sharedReadIndex2-shmBuf,
191  sharedReadIndex2, *sharedReadIndex2);
192  printf("sharedWriteIndex2 at %ld (%p) = %d\n",
193  (char*)sharedWriteIndex2-shmBuf,
194  sharedWriteIndex2, *sharedWriteIndex2);
195 
196  printf("Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
197  printf("sharedReadIndex1 at %ld (%p) = %d\n",
198  (char*)sharedReadIndex1-shmBuf,
199  sharedReadIndex1, *sharedReadIndex1);
200  printf("sharedWriteIndex1 at %ld (%p) = %d\n",
201  (char*)sharedWriteIndex1-shmBuf,
202  sharedWriteIndex1, *sharedWriteIndex1);
203 
204  printf("sizeOfBuffer = %d\n", sizeOfBuffer);
205 #endif
206  }
207 #ifdef DEBUG_TRANSPORTER
208  printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
209 #endif
210 }
211 
/**
 * Server side of the shared-memory connection handshake, run over sockfd.
 *
 * Steps, in order:
 *  1. Create the segment with ndb_shm_create() unless _shmSegCreated is
 *     already set, then attach with ndb_shm_attach() unless _attached is
 *     already set.  A failure in either is reported through report_error()
 *     with the text produced by make_error_info().
 *  2. Send "shm server 1 ok: <pid>" using m_transporter_registry.m_shm_own_pid.
 *  3. Read one line and parse "shm client 1 ok: %d" into m_remote_pid.
 *  4. Call connect_common(); if it succeeds, send "shm server 2 ok" and
 *     wait for one more line from the client.
 *
 * The socket is closed on every path out of the function.  Returns false
 * on any failure above (including a failed final read after
 * connect_common() succeeded), otherwise the result of connect_common().
 *
 * NOTE(review): the signature line (listing line 213) is missing from
 * this extract; the name is taken from the DBUG_ENTER string below.
 */
212 bool
214 {
215  DBUG_ENTER("SHM_Transporter::connect_server_impl");
216  SocketOutputStream s_output(sockfd);
217  SocketInputStream s_input(sockfd);
  // Scratch buffer: holds error text for report_error() and, later, the
  // lines read from the client.
218  char buf[256];
219 
220  // Create
221  if(!_shmSegCreated){
222  if (!ndb_shm_create()) {
223  make_error_info(buf, sizeof(buf));
224  report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);
225  NDB_CLOSE_SOCKET(sockfd);
226  DBUG_RETURN(false);
227  }
228  _shmSegCreated = true;
229  }
230 
231  // Attach
232  if(!_attached){
233  if (!ndb_shm_attach()) {
234  make_error_info(buf, sizeof(buf));
235  report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
236  NDB_CLOSE_SOCKET(sockfd);
237  DBUG_RETURN(false);
238  }
239  _attached = true;
240  }
241 
242  // Send ok to client
243  s_output.println("shm server 1 ok: %d",
244  m_transporter_registry.m_shm_own_pid);
245 
246  // Wait for ok from client
247  DBUG_PRINT("info", ("Wait for ok from client"));
248  if (s_input.gets(buf, sizeof(buf)) == 0)
249  {
250  NDB_CLOSE_SOCKET(sockfd);
251  DBUG_RETURN(false);
252  }
253 
  // The client's reply carries a number that is stored in m_remote_pid;
  // anything that does not match the expected text is a failure.
254  if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
255  {
256  NDB_CLOSE_SOCKET(sockfd);
257  DBUG_RETURN(false);
258  }
259 
260  int r= connect_common(sockfd);
261 
262  if (r) {
263  // Send ok to client
264  s_output.println("shm server 2 ok");
265  // Wait for ok from client
  // Only the arrival of a line is checked here; its content is not parsed.
266  if (s_input.gets(buf, 256) == 0) {
267  NDB_CLOSE_SOCKET(sockfd);
268  DBUG_RETURN(false);
269  }
270  DBUG_PRINT("info", ("Successfully connected server to node %d",
271  remoteNodeId));
272  }
273 
274  NDB_CLOSE_SOCKET(sockfd);
275  DBUG_RETURN(r);
276 }
277 
/**
 * Client side of the shared-memory connection handshake, run over sockfd.
 *
 * Steps, in order:
 *  1. Read one line and parse "shm server 1 ok: %d" into m_remote_pid.
 *  2. Obtain the segment with ndb_shm_get() unless _shmSegCreated is
 *     already set, then attach with ndb_shm_attach() unless _attached is
 *     already set.  Only the attach failure goes through report_error();
 *     a failed ndb_shm_get() is logged with DBUG_PRINT only.
 *  3. Send "shm client 1 ok: <pid>" using m_transporter_registry.m_shm_own_pid.
 *  4. Call connect_common(); if it succeeds, wait for one line from the
 *     server and answer with "shm client 2 ok".
 *
 * The socket is closed on every path out of the function.  Returns false
 * on any failure above, otherwise the result of connect_common().
 *
 * NOTE(review): the signature line (listing line 279) is missing from
 * this extract; the name is taken from the DBUG_ENTER string below.
 */
278 bool
280 {
281  DBUG_ENTER("SHM_Transporter::connect_client_impl");
282  SocketInputStream s_input(sockfd);
283  SocketOutputStream s_output(sockfd);
  // Scratch buffer: holds the lines read from the server and, on attach
  // failure, the error text for report_error().
284  char buf[256];
285 
286  // Wait for server to create and attach
287  DBUG_PRINT("info", ("Wait for server to create and attach"));
288  if (s_input.gets(buf, 256) == 0) {
289  NDB_CLOSE_SOCKET(sockfd);
290  DBUG_PRINT("error", ("Server id %d did not attach",
291  remoteNodeId));
292  DBUG_RETURN(false);
293  }
294 
  // The server's message carries a number that is stored in m_remote_pid;
  // anything that does not match the expected text is a failure.
295  if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
296  {
297  NDB_CLOSE_SOCKET(sockfd);
298  DBUG_RETURN(false);
299  }
300 
301  // Create
302  if(!_shmSegCreated){
303  if (!ndb_shm_get()) {
304  NDB_CLOSE_SOCKET(sockfd);
305  DBUG_PRINT("error", ("Failed create of shm seg to node %d",
306  remoteNodeId));
307  DBUG_RETURN(false);
308  }
309  _shmSegCreated = true;
310  }
311 
312  // Attach
313  if(!_attached){
314  if (!ndb_shm_attach()) {
315  make_error_info(buf, sizeof(buf));
316  report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
317  NDB_CLOSE_SOCKET(sockfd);
318  DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
319  remoteNodeId));
320  DBUG_RETURN(false);
321  }
322  _attached = true;
323  }
324 
325  // Send ok to server
326  s_output.println("shm client 1 ok: %d",
327  m_transporter_registry.m_shm_own_pid);
328 
329  int r= connect_common(sockfd);
330 
331  if (r) {
332  // Wait for ok from server
333  DBUG_PRINT("info", ("Wait for ok from server"));
  // Only the arrival of a line is checked here; its content is not parsed.
334  if (s_input.gets(buf, 256) == 0) {
335  NDB_CLOSE_SOCKET(sockfd);
336  DBUG_PRINT("error", ("No ok from server node %d",
337  remoteNodeId));
338  DBUG_RETURN(false);
339  }
340  // Send ok to server
341  s_output.println("shm client 2 ok");
342  DBUG_PRINT("info", ("Successfully connected client to node %d",
343  remoteNodeId));
344  }
345 
346  NDB_CLOSE_SOCKET(sockfd);
347  DBUG_RETURN(r);
348 }
349 
350 bool
351 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
352 {
353  if (!checkConnected()) {
354  return false;
355  }
356 
357  if(!setupBuffersDone)
358  {
359  setupBuffers();
360  setupBuffersDone=true;
361  }
362 
363  if(setupBuffersDone)
364  {
365  NdbSleep_MilliSleep(m_timeOutMillis);
366  if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
367  return true;
368  }
369 
370  DBUG_PRINT("error", ("Failed to set up buffers to node %d",
371  remoteNodeId));
372  return false;
373 }
374 
/**
 * Pushes pending send data into the shared-memory writer.
 *
 * Collects up to 64 iovec entries with fetch_send_iovec_data(), writes
 * them with writer->writev() and, when at least one byte was written,
 * signals m_remote_pid with g_ndb_shm_signum and reports the written
 * byte count through iovec_data_sent().
 *
 * Returns 0 when there was nothing to send, or when every collected byte
 * was written and the iovec array was not completely filled.  Returns 1
 * in all other cases (partial write, nothing written, or a full array).
 * NOTE(review): 1 presumably means "more data remains to be sent" -
 * confirm against the caller.
 *
 * NOTE(review): the signature line (listing line 376) is missing from
 * this extract, so the function's name and parameters are not visible.
 */
375 int
377 {
378  struct iovec iov[64];
379  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
380 
381  if (cnt == 0)
382  {
383  return 0;
384  }
385 
  // Total number of bytes offered to the writer; compared with the number
  // actually written further down.
386  Uint32 sum = 0;
387  for(Uint32 i = 0; i<cnt; i++)
388  {
389  assert(iov[i].iov_len);
390  sum += iov[i].iov_len;
391  }
392 
393  int nBytesSent = writer->writev(iov, cnt);
394 
395  if (nBytesSent > 0)
396  {
  // Signal the remote process, then account for the bytes written.
397  kill(m_remote_pid, g_ndb_shm_signum);
398  iovec_data_sent(nBytesSent);
399 
  // A completely filled iov array is treated like a partial write:
  // presumably more data may be waiting that did not fit - confirm.
400  if (Uint32(nBytesSent) == sum && (cnt != NDB_ARRAY_SIZE(iov)))
401  {
402  return 0;
403  }
404  return 1;
405  }
406 
407  return 1;
408 }