18 #ifndef NDB_SOCKET_POLLER_H 
   19 #define NDB_SOCKET_POLLER_H 
   21 #include <portlib/NdbTick.h> 
   39   struct pollfd m_one_pfd;
 
   40   struct pollfd* m_pfds;
 
   46   set_max_count(fd_set* 
set, fd_set* static_set, 
unsigned count) {
 
   47     void* ptr = malloc(sizeof(fd_set) + count * sizeof(SOCKET)); /* the fd_set itself plus `count` extra SOCKET slots; the multiplication must apply to the whole count, otherwise the buffer is too small for fd_array */
 
   50     if (
set != static_set)
 
   58   set_fd(fd_set* 
set, SOCKET s) {
 
   61     set->fd_array[
set->fd_count++] = s;
 
   69   fd_set m_one_read_set;
 
   70   fd_set m_one_write_set;
 
   71   fd_set m_one_excp_set;
 
   77   ndb_native_socket_t m_one_fd;
 
   78   ndb_native_socket_t* m_fds;
 
   90     , m_read_set(&m_one_read_set)
 
   91     , m_write_set(&m_one_write_set)
 
   92     , m_excp_set(&m_one_excp_set)
 
  103     FD_ZERO(m_write_set);
 
  111     if (m_pfds != &m_one_pfd)
 
  115     if (m_read_set != &m_one_read_set)
 
  117     if (m_write_set != &m_one_write_set)
 
  119     if (m_excp_set != &m_one_excp_set)
 
  122     if (m_fds != &m_one_fd)
 
  127   bool set_max_count(
unsigned count) {
 
  128     if (count <= m_max_count)
 
  134     struct pollfd* pfds = 
new struct pollfd[count];
 
  137     if (m_pfds != &m_one_pfd)
 
  142     if (count > FD_SETSIZE)
 
  145       if (!set_max_count(m_read_set, &m_one_read_set, count) ||
 
  146           !set_max_count(m_write_set, &m_one_write_set, count) ||
 
  147           !set_max_count(m_excp_set, &m_one_excp_set, count))
 
  151     ndb_native_socket_t* fds = 
new ndb_native_socket_t[count];
 
  154     if (m_fds != &m_one_fd)
 
  162   unsigned add(
ndb_socket_t sock, 
bool read, 
bool write, 
bool error) {
 
  163     const unsigned index = m_count;
 
  165     assert(m_count < m_max_count);
 
  166     struct pollfd &pfd = m_pfds[m_count++];
 
  167     pfd.fd = ndb_socket_get_native(sock);
 
  182       set_fd(m_read_set, ndb_socket_get_native(sock));
 
  184       set_fd(m_write_set, ndb_socket_get_native(sock));
 
  186       set_fd(m_excp_set, ndb_socket_get_native(sock));
 
  190     int fd = ndb_socket_get_native(sock);
 
  191     if (fd < 0 || fd >= FD_SETSIZE)
 
  193       fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when "
  194         "trying to add fd: %d", FD_SETSIZE, fd); /* adjacent literals are concatenated, so the first one must end with a space */
 
  199       FD_SET(fd, m_read_set);
 
  201       FD_SET(fd, m_write_set);
 
  203       FD_SET(fd, m_excp_set);
 
  208     m_fds[m_count++] = ndb_socket_get_native(sock);
 
  210     assert(m_count > index);
 
  214   unsigned count(
void)
 const {
 
  219     assert(index < m_count);
 
  220     assert(m_count <= m_max_count);
 
  222     return (m_pfds[index].fd == ndb_socket_get_native(socket));
 
  224     return (m_fds[index] == ndb_socket_get_native(socket));
 
  228   bool has_read(
unsigned index)
 const {
 
  229     assert(index < m_count);
 
  230     assert(m_count <= m_max_count);
 
  232     return (m_pfds[index].revents & POLLIN);
 
  234     return FD_ISSET(m_fds[index], m_read_set);
 
  238   bool has_write(
unsigned index)
 const {
 
  239     assert(index < m_count);
 
  240     assert(m_count <= m_max_count);
 
  242     return (m_pfds[index].revents & POLLOUT);
 
  244     return FD_ISSET(m_fds[index], m_write_set);
 
  251   int poll_unsafe(
int timeout)
 
  254     return ::poll(m_pfds, m_count, timeout);
 
  267     tv.tv_sec  = (timeout / 1000);
 
  268     tv.tv_usec = (timeout % 1000) * 1000;
 
  270     return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
 
  271                   timeout == -1 ? NULL : &tv);
 
  279   int poll(
int timeout)
 
  283       const NDB_TICKS start = NdbTick_CurrentMillisecond();
 
  285       const int res = poll_unsafe(timeout);
 
  286       if (likely(res >= 0))
 
  289       const int error = my_socket_errno();
 
  291           (error == EINTR || error == EAGAIN))
 
  296         timeout -= (int)(NdbTick_CurrentMillisecond() - start);
 
  327          bool read, 
bool write, 
bool error, 
int timeout_millis)
 
  330   (void)poller.add(sock, read, write, error);
 
  332   const int res = poller.poll(timeout_millis);