MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TCP_Transporter.hpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef TCP_TRANSPORTER_HPP
19 #define TCP_TRANSPORTER_HPP
20 
21 #include "Transporter.hpp"
22 
23 #include <NdbTCP.h>
24 
25 struct ReceiveBuffer {
26  Uint32 *startOfBuffer; // Pointer to start of the receive buffer
27  Uint32 *readPtr; // Pointer to start reading data
28 
29  char *insertPtr; // Pointer to first position in the receiveBuffer
30  // in which to insert received data. Earlier
31  // received incomplete messages (slack) are
32  // copied into the first part of the receiveBuffer
33 
34  Uint32 sizeOfData; // In bytes
35  Uint32 sizeOfBuffer;
36 
37  ReceiveBuffer() {}
38  bool init(int bytes);
39  void destroy();
40 
41  void clear();
42  void incompleteMessage();
43 };
44 
45 class TCP_Transporter : public Transporter {
46  friend class TransporterRegistry;
47  friend class Loopback_Transporter;
48 private:
49  // Initialize member variables
51 
52  // Disconnect, delete send buffers and receive buffer
53  virtual ~TCP_Transporter();
54 
55  virtual bool configure_derived(const TransporterConfiguration* conf);
56 
60  bool initTransporter();
61 
66  int doSend();
67 
72  int doReceive();
73 
77  NDB_SOCKET_TYPE getSocket() const;
78 
85  virtual Uint32 getReceiveData(Uint32 ** ptr);
86 
90  virtual void updateReceiveDataPtr(Uint32 bytesRead);
91 
92  inline bool hasReceiveData () const {
93  return receiveBuffer.sizeOfData > 0;
94  }
95 protected:
102  virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
103  virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
104  bool connect_common(NDB_SOCKET_TYPE sockfd);
105 
109  virtual void disconnectImpl();
110 
111 private:
112  // Sending/Receiving socket used by both client and server
113  NDB_SOCKET_TYPE theSocket;
114 
115  Uint32 maxReceiveSize;
116 
120  int sockOptRcvBufSize;
121  int sockOptSndBufSize;
122  int sockOptNodelay;
123  int sockOptTcpMaxSeg;
124 
125  void setSocketOptions(NDB_SOCKET_TYPE socket);
126 
127  static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
128  virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
129 
130  bool send_is_possible(int timeout_millisec) const;
131  bool send_is_possible(NDB_SOCKET_TYPE fd, int timeout_millisec) const;
132 
136  Uint32 reportFreq;
137  Uint32 receiveCount;
138  Uint64 receiveSize;
139  Uint32 sendCount;
140  Uint64 sendSize;
141 
142  ReceiveBuffer receiveBuffer;
143 
144  bool send_limit_reached(int bufsize) { return bufsize > TCP_SEND_LIMIT; }
145 };
146 
147 inline
148 NDB_SOCKET_TYPE
149 TCP_Transporter::getSocket() const {
150  return theSocket;
151 }
152 
153 inline
154 Uint32
155 TCP_Transporter::getReceiveData(Uint32 ** ptr){
156  (* ptr) = receiveBuffer.readPtr;
157  return receiveBuffer.sizeOfData;
158 }
159 
160 inline
161 void
162 TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
163  char * ptr = (char *)receiveBuffer.readPtr;
164  ptr += bytesRead;
165  receiveBuffer.readPtr = (Uint32*)ptr;
166  receiveBuffer.sizeOfData -= bytesRead;
167  receiveBuffer.incompleteMessage();
168 }
169 
170 inline
171 bool
172 ReceiveBuffer::init(int bytes){
173 #ifdef DEBUG_TRANSPORTER
174  ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
175 #endif
176 
177  startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
178  sizeOfBuffer = bytes + sizeof(Uint32);
179  clear();
180  return true;
181 }
182 
183 inline
184 void
185 ReceiveBuffer::destroy(){
186  delete[] startOfBuffer;
187  sizeOfBuffer = 0;
188  startOfBuffer = 0;
189  clear();
190 }
191 
192 inline
193 void
194 ReceiveBuffer::clear(){
195  readPtr = startOfBuffer;
196  insertPtr = (char *)startOfBuffer;
197  sizeOfData = 0;
198 }
199 
200 inline
201 void
202 ReceiveBuffer::incompleteMessage() {
203  if(startOfBuffer != readPtr){
204  if(sizeOfData != 0)
205  memmove(startOfBuffer, readPtr, sizeOfData);
206  readPtr = startOfBuffer;
207  insertPtr = ((char *)startOfBuffer) + sizeOfData;
208  }
209 }
210 
211 
212 #endif // TCP_TRANSPORTER_HPP