MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SocketServer.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 
21 #include <SocketServer.hpp>
22 
23 #include <NdbTCP.h>
24 #include <NdbOut.hpp>
25 #include <NdbThread.h>
26 #include <NdbSleep.h>
27 #include <NdbTick.h>
28 
29 SocketServer::SocketServer(unsigned maxSessions) :
30  m_sessions(10),
31  m_services(5),
32  m_maxSessions(maxSessions),
33  m_stopThread(false),
34  m_thread(0)
35 {
36 }
37 
38 SocketServer::~SocketServer() {
39  unsigned i;
40  for(i = 0; i<m_sessions.size(); i++){
41  Session* session= m_sessions[i].m_session;
42  assert(session->m_refCount == 0);
43  delete session;
44  }
45  for(i = 0; i<m_services.size(); i++){
46  if(my_socket_valid(m_services[i].m_socket))
47  my_socket_close(m_services[i].m_socket);
48  delete m_services[i].m_service;
49  }
50 }
51 
52 bool
53 SocketServer::tryBind(unsigned short port, const char * intface) {
54  struct sockaddr_in servaddr;
55  memset(&servaddr, 0, sizeof(servaddr));
56  servaddr.sin_family = AF_INET;
57  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
58  servaddr.sin_port = htons(port);
59 
60  if(intface != 0){
61  if(Ndb_getInAddr(&servaddr.sin_addr, intface))
62  return false;
63  }
64 
65  const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
66  if (!my_socket_valid(sock))
67  return false;
68 
69  DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
70  MY_SOCKET_FORMAT_VALUE(sock)));
71 
72  if (my_socket_reuseaddr(sock, true) == -1)
73  {
74  NDB_CLOSE_SOCKET(sock);
75  return false;
76  }
77 
78  if (my_bind_inet(sock, &servaddr) == -1) {
79  NDB_CLOSE_SOCKET(sock);
80  return false;
81  }
82 
83  NDB_CLOSE_SOCKET(sock);
84  return true;
85 }
86 
87 bool
89  unsigned short * port,
90  const char * intface){
91  DBUG_ENTER("SocketServer::setup");
92  DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
93  struct sockaddr_in servaddr;
94  memset(&servaddr, 0, sizeof(servaddr));
95  servaddr.sin_family = AF_INET;
96  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
97  servaddr.sin_port = htons(*port);
98 
99  if(intface != 0){
100  if(Ndb_getInAddr(&servaddr.sin_addr, intface))
101  DBUG_RETURN(false);
102  }
103 
104  const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
105  if (!my_socket_valid(sock))
106  {
107  DBUG_PRINT("error",("socket() - %d - %s",
108  socket_errno, strerror(socket_errno)));
109  DBUG_RETURN(false);
110  }
111 
112  DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
113  MY_SOCKET_FORMAT_VALUE(sock)));
114 
115  if (my_socket_reuseaddr(sock, true) == -1)
116  {
117  DBUG_PRINT("error",("setsockopt() - %d - %s",
118  errno, strerror(errno)));
119  NDB_CLOSE_SOCKET(sock);
120  DBUG_RETURN(false);
121  }
122 
123  if (my_bind_inet(sock, &servaddr) == -1) {
124  DBUG_PRINT("error",("bind() - %d - %s",
125  socket_errno, strerror(socket_errno)));
126  NDB_CLOSE_SOCKET(sock);
127  DBUG_RETURN(false);
128  }
129 
130  /* Get the port we bound to */
131  if(my_socket_get_port(sock, port))
132  {
133  ndbout_c("An error occurred while trying to find out what"
134  " port we bound to. Error: %d - %s",
135  socket_errno, strerror(socket_errno));
136  my_socket_close(sock);
137  DBUG_RETURN(false);
138  }
139 
140  DBUG_PRINT("info",("bound to %u", *port));
141 
142  if (my_listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1)
143  {
144  DBUG_PRINT("error",("listen() - %d - %s",
145  socket_errno, strerror(socket_errno)));
146  my_socket_close(sock);
147  DBUG_RETURN(false);
148  }
149 
150  ServiceInstance i;
151  i.m_socket = sock;
152  i.m_service = service;
153  m_services.push_back(i);
154 
155  // Increase size to allow polling all listening ports
156  m_services_poller.set_max_count(m_services.size());
157 
158  DBUG_RETURN(true);
159 }
160 
161 
162 bool
163 SocketServer::doAccept()
164 {
165  m_services.lock();
166 
167  m_services_poller.clear();
168  for (unsigned i = 0; i < m_services.size(); i++)
169  {
170  m_services_poller.add(m_services[i].m_socket, true, false, true);
171  }
172  assert(m_services.size() == m_services_poller.count());
173 
174  const int accept_timeout_ms = 1000;
175  const int ret = m_services_poller.poll(accept_timeout_ms);
176  if (ret < 0)
177  {
178  // Error occured, indicate error to caller by returning false
179  m_services.unlock();
180  return false;
181  }
182 
183  if (ret == 0)
184  {
185  // Timeout occured
186  m_services.unlock();
187  return true;
188  }
189 
190  bool result = true;
191  for (unsigned i = 0; i < m_services_poller.count(); i++)
192  {
193  const bool has_read = m_services_poller.has_read(i);
194 
195  if (!has_read)
196  continue; // Ignore events where read flag wasn't set
197 
198  ServiceInstance & si = m_services[i];
199  assert(m_services_poller.is_socket_equal(i, si.m_socket));
200 
201  const NDB_SOCKET_TYPE childSock = my_accept(si.m_socket, 0, 0);
202  if (!my_socket_valid(childSock))
203  {
204  // Could not 'accept' socket(maybe at max fds), indicate error
205  // to caller by returning false
206  result = false;
207  continue;
208  }
209 
210  SessionInstance s;
211  s.m_service = si.m_service;
212  s.m_session = si.m_service->newSession(childSock);
213  if (s.m_session != 0)
214  {
215  m_session_mutex.lock();
216  m_sessions.push_back(s);
217  startSession(m_sessions.back());
218  m_session_mutex.unlock();
219  }
220  }
221 
222  m_services.unlock();
223  return result;
224 }
225 
226 extern "C"
227 void*
228 socketServerThread_C(void* _ss){
229  SocketServer * ss = (SocketServer *)_ss;
230  ss->doRun();
231  return 0;
232 }
233 
234 struct NdbThread*
236 {
237  m_threadLock.lock();
238  if(m_thread == 0 && m_stopThread == false)
239  {
240  m_thread = NdbThread_Create(socketServerThread_C,
241  (void**)this,
242  0, // default stack size
243  "NdbSockServ",
244  NDB_THREAD_PRIO_LOW);
245  }
246  m_threadLock.unlock();
247  return m_thread;
248 }
249 
250 void
251 SocketServer::stopServer(){
252  m_threadLock.lock();
253  if(m_thread != 0){
254  m_stopThread = true;
255 
256  void * res;
257  NdbThread_WaitFor(m_thread, &res);
258  NdbThread_Destroy(&m_thread);
259  m_thread = 0;
260  }
261  m_threadLock.unlock();
262 }
263 
264 void
265 SocketServer::doRun(){
266 
267  while(!m_stopThread){
268  m_session_mutex.lock();
269  checkSessionsImpl();
270  m_session_mutex.unlock();
271 
272  if(m_sessions.size() >= m_maxSessions){
273  // Don't accept more connections yet
274  NdbSleep_MilliSleep(200);
275  continue;
276  }
277 
278  if (!doAccept()){
279  // accept failed, step back
280  NdbSleep_MilliSleep(200);
281  }
282  }
283 }
284 
285 void
286 SocketServer::startSession(SessionInstance & si){
287  si.m_thread = NdbThread_Create(sessionThread_C,
288  (void**)si.m_session,
289  0, // default stack size
290  "NdbSock_Session",
291  NDB_THREAD_PRIO_LOW);
292 }
293 
294 void
295 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
296  void *data)
297 {
298  // Build a list of pointers to all active sessions
299  // and increase refcount on the sessions
300  m_session_mutex.lock();
301  Vector<Session*> session_pointers(m_sessions.size());
302  for(unsigned i= 0; i < m_sessions.size(); i++){
303  Session* session= m_sessions[i].m_session;
304  session_pointers.push_back(session);
305  session->m_refCount++;
306  }
307  m_session_mutex.unlock();
308 
309  // Call the function on each session
310  for(unsigned i= 0; i < session_pointers.size(); i++){
311  (*func)(session_pointers[i], data);
312  }
313 
314  // Release the sessions pointers and any stopped sessions
315  m_session_mutex.lock();
316  for(unsigned i= 0; i < session_pointers.size(); i++){
317  Session* session= session_pointers[i];
318  assert(session->m_refCount > 0);
319  session->m_refCount--;
320  }
321  checkSessionsImpl();
322  m_session_mutex.unlock();
323 }
324 
325 void
326 SocketServer::checkSessions()
327 {
328  m_session_mutex.lock();
329  checkSessionsImpl();
330  m_session_mutex.unlock();
331 }
332 
333 void
334 SocketServer::checkSessionsImpl()
335 {
336  for(int i = m_sessions.size() - 1; i >= 0; i--)
337  {
338  if(m_sessions[i].m_session->m_thread_stopped &&
339  (m_sessions[i].m_session->m_refCount == 0))
340  {
341  if(m_sessions[i].m_thread != 0)
342  {
343  void* ret;
344  NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
345  NdbThread_Destroy(&m_sessions[i].m_thread);
346  }
347  m_sessions[i].m_session->stopSession();
348  delete m_sessions[i].m_session;
349  m_sessions.erase(i);
350  }
351  }
352 }
353 
354 bool
355 SocketServer::stopSessions(bool wait, unsigned wait_timeout){
356  int i;
357  m_session_mutex.lock();
358  for(i = m_sessions.size() - 1; i>=0; i--)
359  {
360  m_sessions[i].m_session->stopSession();
361  }
362  m_session_mutex.unlock();
363 
364  for(i = m_services.size() - 1; i>=0; i--)
365  m_services[i].m_service->stopSessions();
366 
367  if(!wait)
368  return false; // No wait
369 
370  NDB_TICKS start = NdbTick_CurrentMillisecond();
371  m_session_mutex.lock();
372  while(m_sessions.size() > 0){
373  checkSessionsImpl();
374  m_session_mutex.unlock();
375 
376  if (wait_timeout > 0 &&
377  (NdbTick_CurrentMillisecond() - start) > wait_timeout)
378  return false; // Wait abandoned
379 
380  NdbSleep_MilliSleep(100);
381  m_session_mutex.lock();
382  }
383  m_session_mutex.unlock();
384  return true; // All sessions gone
385 }
386 
387 
388 /***** Session code ******/
389 
390 extern "C"
391 void*
392 sessionThread_C(void* _sc){
394 
395  assert(si->m_thread_stopped == false);
396 
397  if(!si->m_stop)
398  si->runSession();
399  else
400  NDB_CLOSE_SOCKET(si->m_socket);
401 
402  // Mark the thread as stopped to allow the
403  // session resources to be released
404  si->m_thread_stopped = true;
405  return 0;
406 }
407 
410 template class Vector<SocketServer::Session*>;