MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndb_socket_poller.h
1 /*
2  Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef NDB_SOCKET_POLLER_H
19 #define NDB_SOCKET_POLLER_H
20 
21 #include <portlib/NdbTick.h>
22 
23 /*
24  Portability layer used for waiting on socket events
25 */
26 
28  // Max number of fds the list can hold, defaults to 1 and
29  // can be dynamically expanded by calling 'set_max_count'
30  unsigned m_max_count;
31 
32  // Current number of fds in the list
33  unsigned m_count;
34 
35 #ifdef HAVE_POLL
36  // The list of pollfds, initial size is 1 and m_pfds will
37  // then point at m_one_pfd. After dynamic expand points at
38  // dynamic list of pollfds
39  struct pollfd m_one_pfd;
40  struct pollfd* m_pfds;
41 #else
42 #if defined(_WIN32)
43  // Utility functions for dynamically expanding the fd_set
44  // on Windows to get around the hardcoded FD_SETSIZE limit.
45  static bool
46  set_max_count(fd_set* set, fd_set* static_set, unsigned count) {
47  void* ptr = malloc(sizeof(fd_set) + count-1*sizeof(SOCKET));
48  if (!ptr)
49  return false;
50  if (set != static_set)
51  free(set);
52  set = (fd_set*)ptr;
53  clear(set);
54  return true;
55  }
56 
57  static void
58  set_fd(fd_set* set, SOCKET s) {
59  // Avoid use of FD_SET since it silently drop
60  // sockets when FD_SETSIZE fd_count is reached
61  set->fd_array[set->fd_count++] = s;
62  }
63 
64  static void
65  clear(fd_set* set) {
66  FD_ZERO(set);
67  }
68 #endif
69  fd_set m_one_read_set;
70  fd_set m_one_write_set;
71  fd_set m_one_excp_set;
72  fd_set* m_read_set;
73  fd_set* m_write_set;
74  fd_set* m_excp_set;
75 
76  // Mapping from "index" to "fd"
77  ndb_native_socket_t m_one_fd;
78  ndb_native_socket_t* m_fds;
79 
80  int m_nfds; // Max fd number for 'select'
81 #endif
82 
83 public:
84 
85  ndb_socket_poller(void) :
86  m_max_count(1)
87 #ifdef HAVE_POLL
88  , m_pfds(&m_one_pfd)
89 #else
90  , m_read_set(&m_one_read_set)
91  , m_write_set(&m_one_write_set)
92  , m_excp_set(&m_one_excp_set)
93  , m_fds(&m_one_fd)
94 #endif
95  {
96  clear();
97  }
98 
99  void clear(void) {
100  m_count = 0;
101 #ifndef HAVE_POLL
102  FD_ZERO(m_read_set);
103  FD_ZERO(m_write_set);
104  FD_ZERO(m_excp_set);
105  m_nfds = 0;
106 #endif
107  }
108 
109  ~ndb_socket_poller() {
110 #ifdef HAVE_POLL
111  if (m_pfds != &m_one_pfd)
112  delete[] m_pfds;
113 #else
114 #ifdef _WIN32
115  if (m_read_set != &m_one_read_set)
116  free(m_read_set);
117  if (m_write_set != &m_one_write_set)
118  free(m_write_set);
119  if (m_excp_set != &m_one_excp_set)
120  free(m_excp_set);
121 #endif
122  if (m_fds != &m_one_fd)
123  delete[] m_fds;
124 #endif
125  }
126 
127  bool set_max_count(unsigned count) {
128  if (count <= m_max_count)
129  {
130  // Ignore decrease or setting same value
131  return true;
132  }
133 #ifdef HAVE_POLL
134  struct pollfd* pfds = new struct pollfd[count];
135  if (pfds == NULL)
136  return false;
137  if (m_pfds != &m_one_pfd)
138  delete[] m_pfds;
139  m_pfds = pfds;
140 #else
141 #if defined(_WIN32)
142  if (count > FD_SETSIZE)
143  {
144  // Expand the arrays above the builtin FD_SETSIZE
145  if (!set_max_count(m_read_set, &m_one_read_set, count) ||
146  !set_max_count(m_write_set, &m_one_write_set, count) ||
147  !set_max_count(m_excp_set, &m_one_excp_set, count))
148  return false;
149  }
150 #endif
151  ndb_native_socket_t* fds = new ndb_native_socket_t[count];
152  if (fds == NULL)
153  return false;
154  if (m_fds != &m_one_fd)
155  delete[] m_fds;
156  m_fds = fds;
157 #endif
158  m_max_count = count;
159  return true;
160  }
161 
162  unsigned add(ndb_socket_t sock, bool read, bool write, bool error) {
163  const unsigned index = m_count;
164 #ifdef HAVE_POLL
165  assert(m_count < m_max_count);
166  struct pollfd &pfd = m_pfds[m_count++];
167  pfd.fd = ndb_socket_get_native(sock);
168 
169  short events = 0;
170  if (read)
171  events |= POLLIN;
172  if (write)
173  events |= POLLOUT;
174  if (error)
175  events |= POLLPRI;
176  pfd.events = events;
177 
178  pfd.revents = 0;
179 #else
180 #if defined(_WIN32)
181  if (read)
182  set_fd(m_read_set, ndb_socket_get_native(sock));
183  if (write)
184  set_fd(m_write_set, ndb_socket_get_native(sock));
185  if (error)
186  set_fd(m_excp_set, ndb_socket_get_native(sock));
187  // Not counting nfds on Windows since select ignores it anyway
188  assert(m_nfds == 0);
189 #else
190  int fd = ndb_socket_get_native(sock);
191  if (fd < 0 || fd >= FD_SETSIZE)
192  {
193  fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when"
194  "trying to add fd: %d", FD_SETSIZE, fd);
195  fflush(stderr);
196  abort();
197  }
198  if (read)
199  FD_SET(fd, m_read_set);
200  if (write)
201  FD_SET(fd, m_write_set);
202  if (error)
203  FD_SET(fd, m_excp_set);
204  if (fd > m_nfds)
205  m_nfds = fd;
206 #endif
207  // Maintain mapping from index to fd
208  m_fds[m_count++] = ndb_socket_get_native(sock);
209 #endif
210  assert(m_count > index);
211  return index;
212  }
213 
214  unsigned count(void) const {
215  return m_count;
216  }
217 
218  bool is_socket_equal(unsigned index, ndb_socket_t socket) const {
219  assert(index < m_count);
220  assert(m_count <= m_max_count);
221 #ifdef HAVE_POLL
222  return (m_pfds[index].fd == ndb_socket_get_native(socket));
223 #else
224  return (m_fds[index] == ndb_socket_get_native(socket));
225 #endif
226  }
227 
228  bool has_read(unsigned index) const {
229  assert(index < m_count);
230  assert(m_count <= m_max_count);
231 #ifdef HAVE_POLL
232  return (m_pfds[index].revents & POLLIN);
233 #else
234  return FD_ISSET(m_fds[index], m_read_set);
235 #endif
236  }
237 
238  bool has_write(unsigned index) const {
239  assert(index < m_count);
240  assert(m_count <= m_max_count);
241 #ifdef HAVE_POLL
242  return (m_pfds[index].revents & POLLOUT);
243 #else
244  return FD_ISSET(m_fds[index], m_write_set);
245 #endif
246  }
247 
248  /*
249  Wait for event(s) on socket(s) without retry of interrupted wait
250  */
251  int poll_unsafe(int timeout)
252  {
253 #ifdef HAVE_POLL
254  return ::poll(m_pfds, m_count, timeout);
255 #else
256 
257 #ifdef _WIN32
258  if (m_count == 0)
259  {
260  // Windows does not sleep on 'select' with 0 sockets
261  Sleep(timeout);
262  return 0; // Timeout occured
263  }
264 #endif
265 
266  struct timeval tv;
267  tv.tv_sec = (timeout / 1000);
268  tv.tv_usec = (timeout % 1000) * 1000;
269 
270  return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
271  timeout == -1 ? NULL : &tv);
272 #endif
273  }
274 
275  /*
276  Wait for event(s) on socket(s), retry interrupted wait
277  if there is still time left
278  */
279  int poll(int timeout)
280  {
281  do
282  {
283  const NDB_TICKS start = NdbTick_CurrentMillisecond();
284 
285  const int res = poll_unsafe(timeout);
286  if (likely(res >= 0))
287  return res; // Default return path
288 
289  const int error = my_socket_errno();
290  if (res == -1 &&
291  (error == EINTR || error == EAGAIN))
292  {
293  // Retry if any time left of timeout
294 
295  // Subtract function call time from remaining timeout
296  timeout -= (int)(NdbTick_CurrentMillisecond() - start);
297 
298  if (timeout <= 0)
299  return 0; // Timeout occured
300 
301  //fprintf(stderr, "Got interrupted, retrying... timeout left: %d\n",
302  // timeout_millis);
303 
304  continue; // Retry interrupted poll
305  }
306 
307  // Unhandled error code, return it
308  return res;
309 
310  } while (true);
311 
312  abort(); // Never reached
313  }
314 
315 };
316 
317 
318 /*
319  ndb_poll
320  - Utility function for waiting on events on one socket
321  with retry of interrupted wait
322 */
323 
324 static inline
325 int
326 ndb_poll(ndb_socket_t sock,
327  bool read, bool write, bool error, int timeout_millis)
328 {
329  ndb_socket_poller poller;
330  (void)poller.add(sock, read, write, error);
331 
332  const int res = poller.poll(timeout_millis);
333  if (res <= 0)
334  return res;
335 
336  assert(res >= 1);
337 
338  return res;
339 }
340 
341 #endif