Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
com.c
Go to the documentation of this file.
1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2009-2012 Brazil
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License version 2.1 as published by the Free Software Foundation.
7 
8  This library is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  Lesser General Public License for more details.
12 
13  You should have received a copy of the GNU Lesser General Public
14  License along with this library; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "groonga_in.h"
19 
20 #include <stdio.h>
21 #include <string.h>
22 #include "ctx_impl.h"
23 
24 #ifdef WIN32
25 # include <ws2tcpip.h>
26 #else
27 # ifdef HAVE_SYS_SOCKET_H
28 # include <sys/socket.h>
29 # endif /* HAVE_SYS_SOCKET_H */
30 # ifdef HAVE_NETINET_IN_H
31 # include <netinet/in.h>
32 # endif /* HAVE_NETINET_IN_H */
33 # ifdef HAVE_NETINET_TCP_H
34 # include <netinet/tcp.h>
35 # endif /* HAVE_NETINET_TCP_H */
36 # ifdef HAVE_SIGNAL_H
37 # include <signal.h>
38 # endif /* HAVE_SIGNAL_H */
39 #endif /* WIN32 */
40 
41 #include "ctx.h"
42 #include "com.h"
43 
44 #ifndef PF_INET
45 #define PF_INET AF_INET
46 #endif /* PF_INET */
47 
48 #ifndef SOL_TCP
49 # ifdef IPPROTO_TCP
50 # define SOL_TCP IPPROTO_TCP
51 # else
52 # define SOL_TCP 6
53 # endif /* IPPROTO_TCP */
54 #endif /* SOL_TCP */
55 
56 #ifndef USE_MSG_MORE
57 # ifdef MSG_MORE
58 # undef MSG_MORE
59 # endif
60 # define MSG_MORE 0
61 #endif /* USE_MSG_MORE */
62 
63 
64 #ifndef USE_MSG_NOSIGNAL
65 # ifdef MSG_NOSIGNAL
66 # undef MSG_NOSIGNAL
67 # endif
68 # define MSG_NOSIGNAL 0
69 #endif /* USE_MSG_NOSIGNAL */
70 /******* grn_com_queue ********/
71 
72 grn_rc
74 {
75  CRITICAL_SECTION_ENTER(q->cs);
76  e->next = NULL;
77  *q->tail = e;
78  q->tail = &e->next;
79  CRITICAL_SECTION_LEAVE(q->cs);
80  /*
81  uint8_t i = q->last + 1;
82  e->next = NULL;
83  if (q->first == i || q->next) {
84  CRITICAL_SECTION_ENTER(q->cs);
85  if (q->first == i || q->next) {
86  *q->tail = e;
87  q->tail = &e->next;
88  } else {
89  q->bins[q->last] = e;
90  q->last = i;
91  }
92  CRITICAL_SECTION_LEAVE(q->cs);
93  } else {
94  q->bins[q->last] = e;
95  q->last = i;
96  }
97  */
98  return GRN_SUCCESS;
99 }
100 
103 {
104  grn_com_queue_entry *e = NULL;
105 
106  CRITICAL_SECTION_ENTER(q->cs);
107  if (q->next) {
108  e = q->next;
109  if (!(q->next = e->next)) { q->tail = &q->next; }
110  }
111  CRITICAL_SECTION_LEAVE(q->cs);
112 
113  /*
114  if (q->first == q->last) {
115  if (q->next) {
116  CRITICAL_SECTION_ENTER(q->cs);
117  e = q->next;
118  if (!(q->next = e->next)) { q->tail = &q->next; }
119  CRITICAL_SECTION_LEAVE(q->cs);
120  }
121  } else {
122  e = q->bins[q->first++];
123  }
124  */
125  return e;
126 }
127 
128 /******* grn_msg ********/
129 
130 grn_obj *
132 {
133  grn_msg *msg = NULL;
134  if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) {
135  if (msg->ctx != ctx) {
136  ERR(GRN_INVALID_ARGUMENT, "ctx unmatch");
137  return NULL;
138  }
139  GRN_BULK_REWIND(&msg->qe.obj);
140  } else if ((msg = GRN_MALLOCN(grn_msg, 1))) {
141  GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT);
143  msg->ctx = ctx;
144  }
145  msg->qe.next = NULL;
146  msg->u.peer = com;
147  msg->old = old;
148  memset(&msg->header, 0, sizeof(grn_com_header));
149  return (grn_obj *)msg;
150 }
151 
152 grn_obj *
154 {
155  grn_msg *req = (grn_msg *)query, *msg = NULL;
156  if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) {
157  msg->edge_id = req->edge_id;
158  msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ
160  }
161  return (grn_obj *)msg;
162 }
163 
164 grn_rc
166 {
167  grn_msg *msg = (grn_msg *)obj;
168  if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); }
169  return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg);
170 }
171 
172 grn_rc
174  uint16_t status, uint32_t key_size, uint8_t extra_size)
175 {
176  grn_com_header *header = &((grn_msg *)obj)->header;
177  header->status = htons(status);
178  header->keylen = htons(key_size);
179  header->level = extra_size;
180  return GRN_SUCCESS;
181 }
182 
183 grn_rc
184 grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags)
185 {
186  grn_rc rc;
187  grn_msg *m = (grn_msg *)msg;
188  grn_com *peer = m->u.peer;
189  grn_com_header *header = &m->header;
190  if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) {
191  switch (header->proto) {
192  case GRN_COM_PROTO_HTTP :
193  {
194  ssize_t ret;
195  ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL);
196  if (ret == -1) { SERR("send"); }
197  if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {
199  return ctx->rc;
200  }
201  }
202  break;
203  case GRN_COM_PROTO_GQTP :
204  {
205  if ((flags & GRN_CTX_MORE)) { flags |= GRN_CTX_QUIET; }
206  if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; }
207  header->qtype = (uint8_t) ctx->impl->output_type;
208  header->keylen = 0;
209  header->level = 0;
210  header->flags = flags;
211  header->status = htons((uint16_t)ctx->rc);
212  header->opaque = 0;
213  header->cas = 0;
214  //todo : MSG_DONTWAIT
215  rc = grn_com_send(ctx, peer, header,
216  GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0);
217  if (rc != GRN_OPERATION_WOULD_BLOCK) {
219  return rc;
220  }
221  }
222  break;
223  case GRN_COM_PROTO_MBREQ :
225  case GRN_COM_PROTO_MBRES :
226  rc = grn_com_send(ctx, peer, header,
227  GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg),
228  (flags & GRN_CTX_MORE) ? MSG_MORE :0);
229  if (rc != GRN_OPERATION_WOULD_BLOCK) {
231  return rc;
232  }
233  break;
234  default :
235  return GRN_INVALID_ARGUMENT;
236  }
237  }
238  MUTEX_LOCK(peer->ev->mutex);
239  rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg);
240  COND_SIGNAL(peer->ev->cond);
241  MUTEX_UNLOCK(peer->ev->mutex);
242  return rc;
243 }
244 
245 /******* grn_com ********/
246 
247 grn_rc
249 {
250 #ifdef WIN32
251  WSADATA wd;
252  if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
253  grn_ctx *ctx = &grn_gctx;
254  SERR("WSAStartup");
255  }
256 #else /* WIN32 */
257 #ifndef USE_MSG_NOSIGNAL
258  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
259  grn_ctx *ctx = &grn_gctx;
260  SERR("signal");
261  }
262 #endif /* USE_MSG_NOSIGNAL */
263 #endif /* WIN32 */
264  return grn_gctx.rc;
265 }
266 
267 void
269 {
270 #ifdef WIN32
271  WSACleanup();
272 #endif /* WIN32 */
273 }
274 
275 grn_rc
276 grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size)
277 {
278  ev->max_nevents = max_nevents;
279  if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) {
280  MUTEX_INIT(ev->mutex);
281  COND_INIT(ev->cond);
283 #ifndef USE_SELECT
284 #ifdef USE_EPOLL
285  if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
286  if ((ev->epfd = epoll_create(max_nevents)) != -1) {
287  goto exit;
288  } else {
289  SERR("epoll_create");
290  }
291  GRN_FREE(ev->events);
292  }
293 #else /* USE_EPOLL */
294 #ifdef USE_KQUEUE
295  if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) {
296  if ((ev->kqfd = kqueue()) != -1) {
297  goto exit;
298  } else {
299  SERR("kqueue");
300  }
301  GRN_FREE(ev->events);
302  }
303 #else /* USE_KQUEUE */
304  if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
305  goto exit;
306  }
307 #endif /* USE_KQUEUE*/
308 #endif /* USE_EPOLL */
309  grn_hash_close(ctx, ev->hash);
310  ev->hash = NULL;
311  ev->events = NULL;
312 #else /* USE_SELECT */
313 
314 #endif /* USE_SELECT */
315  }
316 exit :
317  return ctx->rc;
318 }
319 
320 grn_rc
322 {
323  grn_obj *msg;
324  while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) {
325  grn_msg_close(ctx, msg);
326  }
327  if (ev->hash) { grn_hash_close(ctx, ev->hash); }
328 #ifndef USE_SELECT
329  if (ev->events) { GRN_FREE(ev->events); }
330 #ifdef USE_EPOLL
331  GRN_CLOSE(ev->epfd);
332 #endif /* USE_EPOLL */
333 #ifdef USE_KQUEUE
334  GRN_CLOSE(ev->kqfd);
335 #endif /* USE_KQUEUE*/
336 #endif /* USE_SELECT */
337  return GRN_SUCCESS;
338 }
339 
340 grn_rc
341 grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
342 {
343  grn_com *c;
344  /* todo : expand events */
345  if (!ev || *ev->hash->n_entries == ev->max_nevents) {
346  if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents); }
347  return GRN_INVALID_ARGUMENT;
348  }
349 #ifdef USE_EPOLL
350  {
351  struct epoll_event e;
352  memset(&e, 0, sizeof(struct epoll_event));
353  e.data.fd = (fd);
354  e.events = (__uint32_t) events;
355  if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
356  SERR("epoll_ctl");
357  return ctx->rc;
358  }
359  }
360 #endif /* USE_EPOLL*/
361 #ifdef USE_KQUEUE
362  {
363  struct kevent e;
364  /* todo: udata should have fd */
365  EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
366  if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
367  SERR("kevent");
368  return ctx->rc;
369  }
370  }
371 #endif /* USE_KQUEUE */
372  {
373  if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) {
374  c->ev = ev;
375  c->fd = fd;
376  c->events = events;
377  if (com) { *com = c; }
378  }
379  }
380  return ctx->rc;
381 }
382 
383 grn_rc
384 grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
385 {
386  grn_com *c;
387  if (!ev) { return GRN_INVALID_ARGUMENT; }
388  if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) {
389  if (c->fd != fd) {
390  GRN_LOG(ctx, GRN_LOG_ERROR, "grn_com_event_mod fd unmatch %d != %d", c->fd, fd);
391  return GRN_OBJECT_CORRUPT;
392  }
393  if (com) { *com = c; }
394  if (c->events != events) {
395 #ifdef USE_EPOLL
396  struct epoll_event e;
397  memset(&e, 0, sizeof(struct epoll_event));
398  e.data.fd = (fd);
399  e.events = (__uint32_t) events;
400  if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
401  SERR("epoll_ctl");
402  return ctx->rc;
403  }
404 #endif /* USE_EPOLL*/
405 #ifdef USE_KQUEUE
406  // experimental
407  struct kevent e[2];
408  EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL);
409  EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
410  if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
411  SERR("kevent");
412  return ctx->rc;
413  }
414 #endif /* USE_KQUEUE */
415  c->events = events;
416  }
417  return GRN_SUCCESS;
418  }
419  return GRN_INVALID_ARGUMENT;
420 }
421 
422 grn_rc
424 {
425  if (!ev) { return GRN_INVALID_ARGUMENT; }
426  {
427  grn_com *c;
428  grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c);
429  if (id) {
430 #ifdef USE_EPOLL
431  if (!c->closed) {
432  struct epoll_event e;
433  memset(&e, 0, sizeof(struct epoll_event));
434  e.data.fd = fd;
435  e.events = c->events;
436  if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
437  SERR("epoll_ctl");
438  return ctx->rc;
439  }
440  }
441 #endif /* USE_EPOLL*/
442 #ifdef USE_KQUEUE
443  struct kevent e;
444  EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL);
445  if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
446  SERR("kevent");
447  return ctx->rc;
448  }
449 #endif /* USE_KQUEUE */
450  return grn_hash_delete_by_id(ctx, ev->hash, id, NULL);
451  } else {
452  GRN_LOG(ctx, GRN_LOG_ERROR, "%04x| fd(%d) not found in ev(%p)", getpid(), fd, ev);
453  return GRN_INVALID_ARGUMENT;
454  }
455  }
456 }
457 
458 #define LISTEN_BACKLOG 0x1000
459 
460 grn_rc
462 {
463  grn_com *com = ev->acceptor;
464 
465  if (com->accepting) {return ctx->rc;}
466 
468  if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) {
469  if (listen(com->fd, LISTEN_BACKLOG) == 0) {
470  com->accepting = GRN_TRUE;
471  } else {
472  SERR("listen - start accept");
473  }
474  }
475  GRN_API_RETURN(ctx->rc);
476 }
477 
478 grn_rc
480 {
481  grn_com *com = ev->acceptor;
482 
483  if (!com->accepting) {return ctx->rc;}
484 
486  if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) {
487  if (listen(com->fd, 0) == 0) {
488  com->accepting = GRN_FALSE;
489  } else {
490  SERR("listen - disable accept");
491  }
492  }
493  GRN_API_RETURN(ctx->rc);
494 }
495 
496 static void
497 grn_com_receiver(grn_ctx *ctx, grn_com *com)
498 {
499  grn_com_event *ev = com->ev;
500  ERRCLR(ctx);
501  if (ev->acceptor == com) {
502  grn_com *ncs;
503  grn_sock fd = accept(com->fd, NULL, NULL);
504  if (fd == -1) {
505  if (errno == EMFILE) {
506  grn_com_event_stop_accept(ctx, ev);
507  } else {
508  SERR("accept");
509  }
510  return;
511  }
512  if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) {
513  grn_sock_close(fd);
514  return;
515  }
516  ncs->has_sid = 0;
517  ncs->closed = 0;
518  ncs->opaque = NULL;
519  GRN_COM_QUEUE_INIT(&ncs->new_);
520  // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd);
521  return;
522  } else {
523  grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old);
524  grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg);
525  if (msg->u.peer /* is_edge_request(msg)*/) {
526  memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr));
527  if (!com->has_sid) {
528  com->has_sid = 1;
529  com->sid = ev->curr_edge_id.sid++;
530  }
531  msg->edge_id.sid = com->sid;
532  }
533  msg->acceptor = ev->acceptor;
534  ev->msg_handler(ctx, (grn_obj *)msg);
535  }
536 }
537 
538 grn_rc
539 grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout)
540 {
541  int nevents;
542  grn_com *com;
543 #ifdef USE_SELECT
544  uint32_t dummy;
545  grn_sock *pfd;
546  int nfds = 0;
547  fd_set rfds;
548  fd_set wfds;
549  struct timeval tv;
550  if (timeout >= 0) {
551  tv.tv_sec = timeout / 1000;
552  tv.tv_usec = (timeout % 1000) * 1000;
553  }
554  FD_ZERO(&rfds);
555  FD_ZERO(&wfds);
556  ctx->errlvl = GRN_OK;
557  ctx->rc = GRN_SUCCESS;
558  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
559  if ((com->events & GRN_COM_POLLIN)) { FD_SET(*pfd, &rfds); }
560  if ((com->events & GRN_COM_POLLOUT)) { FD_SET(*pfd, &wfds); }
561  if (*pfd > nfds) { nfds = *pfd; }
562  });
563  nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
564  if (nevents < 0) {
565  SERR("select");
566  if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); }
567  return ctx->rc;
568  }
569  if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events"); }
570  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
571  if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); }
572  });
573 #else /* USE_SELECT */
574 #ifdef USE_EPOLL
575  struct epoll_event *ep;
576  ctx->errlvl = GRN_OK;
577  ctx->rc = GRN_SUCCESS;
578  nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
579  if (nevents < 0) {
580  SERR("epoll_wait");
581  }
582 #else /* USE_EPOLL */
583 #ifdef USE_KQUEUE
584  struct kevent *ep;
585  struct timespec tv;
586  if (timeout >= 0) {
587  tv.tv_sec = timeout / 1000;
588  tv.tv_nsec = (timeout % 1000) * 1000;
589  }
590  nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv);
591  if (nevents < 0) {
592  SERR("kevent");
593  }
594 #else /* USE_KQUEUE */
595  uint32_t dummy;
596  int nfd = 0, *pfd;
597  struct pollfd *ep = ev->events;
598  ctx->errlvl = GRN_OK;
599  ctx->rc = GRN_SUCCESS;
600  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
601  ep->fd = *pfd;
602  // ep->events =(short) com->events;
603  ep->events = POLLIN;
604  ep->revents = 0;
605  ep++;
606  nfd++;
607  });
608  nevents = poll(ev->events, nfd, timeout);
609  if (nevents < 0) {
610  SERR("poll");
611  }
612 #endif /* USE_KQUEUE */
613 #endif /* USE_EPOLL */
614  if (ctx->rc != GRN_SUCCESS) {
615  if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
616  ERRCLR(ctx);
617  }
618  return ctx->rc;
619  }
620  if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events"); }
621  for (ep = ev->events; nevents; ep++) {
622  int efd;
623 #ifdef USE_EPOLL
624  efd = ep->data.fd;
625  nevents--;
626  // todo : com = ep->data.ptr;
627  if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
628  struct epoll_event e;
629  GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
630  memset(&e, 0, sizeof(struct epoll_event));
631  e.data.fd = efd;
632  e.events = ep->events;
633  if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl"); }
634  if (grn_sock_close(efd) == -1) { SERR("close"); }
635  continue;
636  }
637  if ((ep->events & GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
638 #else /* USE_EPOLL */
639 #ifdef USE_KQUEUE
640  efd = ep->ident;
641  nevents--;
642  // todo : com = ep->udata;
643  if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
644  struct kevent e;
645  GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd);
646  EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
647  if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent"); }
648  if (grn_sock_close(efd) == -1) { SERR("close"); }
649  continue;
650  }
651  if ((ep->filter == GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
652 #else
653  efd = ep->fd;
654  if (!(ep->events & ep->revents)) { continue; }
655  nevents--;
656  if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
657  GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
658  if (grn_sock_close(efd) == -1) { SERR("close"); }
659  continue;
660  }
661  if ((ep->revents & GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
662 #endif /* USE_KQUEUE */
663 #endif /* USE_EPOLL */
664  }
665 #endif /* USE_SELECT */
666  /* todo :
667  while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) {
668  grn_msg_close(ctx, msg);
669  }
670  */
671  return GRN_SUCCESS;
672 }
673 
674 grn_rc
675 grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags)
676 {
677  ssize_t ret;
678  grn_obj buf;
679  GRN_TEXT_INIT(&buf, 0);
680  GRN_TEXT_PUTS(ctx, &buf, "GET ");
681  grn_bulk_write(ctx, &buf, path, path_len);
682  GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n");
683  // todo : refine
684  if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) {
685  SERR("send");
686  }
687  if (ret != GRN_BULK_VSIZE(&buf)) {
688  GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d", (int)ret, (int)GRN_BULK_VSIZE(&buf));
689  }
690  grn_obj_close(ctx, &buf);
691  return ctx->rc;
692 }
693 
694 grn_rc
696  grn_com_header *header, const char *body, uint32_t size, int flags)
697 {
698  grn_rc rc = GRN_SUCCESS;
699  size_t whole_size = sizeof(grn_com_header) + size;
700  ssize_t ret;
701  header->size = htonl(size);
702  GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)", size, header->flags, header->proto, header->qtype, header->level, header->status);
703 
704  if (size) {
705 #ifdef WIN32
706  WSABUF wsabufs[2];
707  DWORD n_sent;
708  wsabufs[0].buf = (char *)header;
709  wsabufs[0].len = sizeof(grn_com_header);
710  wsabufs[1].buf = (char *)body;
711  wsabufs[1].len = size;
712  if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
713  SERR("WSASend");
714  }
715  ret = n_sent;
716 #else /* WIN32 */
717  struct iovec msg_iov[2];
718  struct msghdr msg;
719  msg.msg_name = NULL;
720  msg.msg_namelen = 0;
721  msg.msg_iov = msg_iov;
722  msg.msg_iovlen = 2;
723  msg.msg_control = NULL;
724  msg.msg_controllen = 0;
725  msg.msg_flags = 0;
726  msg_iov[0].iov_base = header;
727  msg_iov[0].iov_len = sizeof(grn_com_header);
728  msg_iov[1].iov_base = (char *)body;
729  msg_iov[1].iov_len = size;
730  if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) {
731  SERR("sendmsg");
732  rc = ctx->rc;
733  }
734 #endif /* WIN32 */
735  } else {
736  if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) {
737  SERR("send");
738  rc = ctx->rc;
739  }
740  }
741  if (ret != whole_size) {
742  GRN_LOG(ctx, GRN_LOG_ERROR, "sendmsg(%d): %" GRN_FMT_LLD " < %" GRN_FMT_LLU,
743  cs->fd, (long long int)ret, (unsigned long long int)whole_size);
744  rc = ctx->rc;
745  }
746  return rc;
747 }
748 
749 #define RETRY_MAX 10
750 
751 static const char *
752 scan_delimiter(const char *p, const char *e)
753 {
754  while (p + 4 <= e) {
755  if (p[3] == '\n') {
756  if (p[2] == '\r') {
757  if (p[1] == '\n') {
758  if (p[0] == '\r') { return p + 4; } else { p += 2; }
759  } else { p += 2; }
760  } else { p += 4; }
761  } else { p += p[3] == '\r' ? 1 : 4; }
762  }
763  return NULL;
764 }
765 
766 #define BUFSIZE 4096
767 
768 static grn_rc
769 grn_com_recv_text(grn_ctx *ctx, grn_com *com,
770  grn_com_header *header, grn_obj *buf, ssize_t ret)
771 {
772  const char *p;
773  int retry = 0;
774  grn_bulk_write(ctx, buf, (char *)header, ret);
775  if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) {
776  // todo : keep rest of message
777  GRN_BULK_SET_CURR(buf, p);
778  header->qtype = *GRN_BULK_HEAD(buf);
779  header->proto = GRN_COM_PROTO_HTTP;
780  header->size = GRN_BULK_VSIZE(buf);
781  goto exit;
782  }
783  for (;;) {
784  if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; }
785  if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
786  SERR("recv text");
787  if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
789  ERRCLR(ctx);
790  continue;
791  }
792  goto exit;
793  }
794  if (ret) {
795  off_t o = GRN_BULK_VSIZE(buf);
796  p = GRN_BULK_CURR(buf);
797  if ((p = scan_delimiter(p - (o > 3 ? 3 : o), p + ret))) {
798  GRN_BULK_SET_CURR(buf, p);
799  // todo : keep rest of message
800  break;
801  } else {
802  GRN_BULK_INCR_LEN(buf, ret);
803  }
804  } else {
805  if (++retry > RETRY_MAX) {
806  // ERR(GRN_RETRY_MAX, "retry max in recv text");
807  goto exit;
808  }
809  }
810  }
811  header->qtype = *GRN_BULK_HEAD(buf);
812  header->proto = GRN_COM_PROTO_HTTP;
813  header->size = GRN_BULK_VSIZE(buf);
814 exit :
815  if (header->qtype == 'H') {
816  //todo : refine
817  /*
818  GRN_BULK_REWIND(buf);
819  grn_bulk_reserve(ctx, buf, BUFSIZE);
820  if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
821  SERR("recv text body");
822  } else {
823  GRN_BULK_CURR(buf) += ret;
824  }
825  */
826  }
827  return ctx->rc;
828 }
829 
830 grn_rc
832 {
833  ssize_t ret;
834  int retry = 0;
835  byte *p = (byte *)header;
836  size_t rest = sizeof(grn_com_header);
837  do {
838  if ((ret = recv(com->fd, p, rest, 0)) < 0) {
839  SERR("recv size");
840  GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%d)", com->fd);
841  if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
843  ERRCLR(ctx);
844  continue;
845  }
846  goto exit;
847  }
848  if (ret) {
849  if (header->proto < 0x80) {
850  return grn_com_recv_text(ctx, com, header, buf, ret);
851  }
852  rest -= ret, p += ret;
853  } else {
854  if (++retry > RETRY_MAX) {
855  // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd);
856  goto exit;
857  }
858  }
859  } while (rest);
860  GRN_LOG(ctx, GRN_LOG_INFO, "recv (%d,%x,%d,%02x,%02x,%04x)", ntohl(header->size), header->flags, header->proto, header->qtype, header->level, header->status);
861  {
862  uint8_t proto = header->proto;
863  size_t value_size = ntohl(header->size);
864  GRN_BULK_REWIND(buf);
865  switch (proto) {
866  case GRN_COM_PROTO_GQTP :
867  case GRN_COM_PROTO_MBREQ :
868  if (GRN_BULK_WSIZE(buf) < value_size) {
869  if ((grn_bulk_resize(ctx, buf, value_size))) {
870  goto exit;
871  }
872  }
873  retry = 0;
874  for (rest = value_size; rest;) {
875  if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
876  SERR("recv body");
877  if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
879  ERRCLR(ctx);
880  continue;
881  }
882  goto exit;
883  }
884  if (ret) {
885  rest -= ret;
886  GRN_BULK_INCR_LEN(buf, ret);
887  } else {
888  if (++retry > RETRY_MAX) {
889  // ERR(GRN_RETRY_MAX, "retry max in recv body");
890  goto exit;
891  }
892  }
893  }
894  break;
895  default :
896  GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto);
897  ctx->rc = GRN_INVALID_FORMAT;
898  goto exit;
899  }
900  }
901 exit :
902  return ctx->rc;
903 }
904 
905 grn_com *
906 grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port)
907 {
908  grn_sock fd = -1;
909  grn_com *cs = NULL;
910 
911  struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
912  char port_string[16];
913  int getaddrinfo_result;
914 
915  memset(&hints, 0, sizeof(hints));
916  hints.ai_family = AF_UNSPEC;
917  hints.ai_socktype = SOCK_STREAM;
918 #ifdef AI_NUMERICSERV
919  hints.ai_flags = AI_NUMERICSERV;
920 #endif
921  snprintf(port_string, sizeof(port_string), "%d", port);
922 
923  getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
924  if (getaddrinfo_result != 0) {
925  switch (getaddrinfo_result) {
926 #ifdef EAI_MEMORY
927  case EAI_MEMORY:
928  ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s",
929  dest, port_string, gai_strerror(getaddrinfo_result));
930  break;
931 #endif
932 #ifdef EAI_SYSTEM
933  case EAI_SYSTEM:
934  SERR("getaddrinfo");
935  break;
936 #endif
937  default:
938  ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s",
939  dest, port_string, gai_strerror(getaddrinfo_result));
940  break;
941  }
942  return NULL;
943  }
944 
945  for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
946  addrinfo_ptr = addrinfo_ptr->ai_next) {
947  static const int value = 1;
948  fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype,
949  addrinfo_ptr->ai_protocol);
950  if (fd == -1) {
951  SERR("socket");
952  } else if (setsockopt(fd, 6, TCP_NODELAY, &value, sizeof(value))) {
953  SERR("setsockopt");
954  grn_sock_close(fd);
955  } else if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen)) {
956  SERR("connect");
957  grn_sock_close(fd);
958  } else {
959  break;
960  }
961  }
962 
963  freeaddrinfo(addrinfo_list);
964 
965  if (!addrinfo_ptr) {
966  return NULL;
967  }
968  ctx->errlvl = GRN_OK;
969  ctx->rc = GRN_SUCCESS;
970 
971  if (ev) {
972  grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs);
973  } else {
974  cs = GRN_CALLOC(sizeof(grn_com));
975  if (cs) {
976  cs->fd = fd;
977  }
978  }
979  if (!cs) {
980  grn_sock_close(fd);
981  }
982  return cs;
983 }
984 
985 void
987 {
988  grn_sock fd = com->fd;
989  if (shutdown(fd, SHUT_RDWR) == -1) { /* SERR("shutdown"); */ }
990  if (grn_sock_close(fd) == -1) {
991  SERR("close");
992  } else {
993  com->closed = 1;
994  }
995 }
996 
997 grn_rc
999 {
1000  grn_sock fd = com->fd;
1001  grn_com_event *ev = com->ev;
1002  if (ev) {
1003  grn_com *acceptor = ev->acceptor;
1004  grn_com_event_del(ctx, ev, fd);
1005  if (acceptor) { grn_com_event_start_accept(ctx, ev); }
1006  }
1007  if (!com->closed) { grn_com_close_(ctx, com); }
1008  if (!ev) { GRN_FREE(com); }
1009  return GRN_SUCCESS;
1010 }
1011 
1012 grn_rc
1014  const char *bind_address, int port, grn_msg_handler *func,
1015  struct hostent *he)
1016 {
1017  grn_sock lfd = -1;
1018  grn_com *cs = NULL;
1019  int getaddrinfo_result;
1020  struct addrinfo *bind_address_info = NULL;
1021  struct addrinfo hints;
1022  char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/
1023 
1024  GRN_API_ENTER;
1025  if (!bind_address) {
1026  bind_address = "0.0.0.0";
1027  }
1028  snprintf(port_string, sizeof(port_string), "%d", port);
1029  memset(&hints, 0, sizeof(struct addrinfo));
1030  hints.ai_family = PF_UNSPEC;
1031  hints.ai_socktype = SOCK_STREAM;
1032 #ifdef AI_NUMERICSERV
1033  hints.ai_flags = AI_NUMERICSERV;
1034 #endif
1035  getaddrinfo_result = getaddrinfo(bind_address, port_string,
1036  &hints, &bind_address_info);
1037  if (getaddrinfo_result != 0) {
1038  switch (getaddrinfo_result) {
1039 #ifdef EAI_MEMORY
1040  case EAI_MEMORY:
1042  "getaddrinfo: <%s:%s>: %s",
1043  bind_address, port_string, gai_strerror(getaddrinfo_result));
1044  break;
1045 #endif
1046 #ifdef EAI_SYSTEM
1047  case EAI_SYSTEM:
1048  SERR("getaddrinfo");
1049  break;
1050 #endif
1051  default:
1053  "getaddrinfo: <%s:%s>: %s",
1054  bind_address, port_string, gai_strerror(getaddrinfo_result));
1055  break;
1056  }
1057  goto exit;
1058  }
1059  if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
1060  SERR("socket");
1061  goto exit;
1062  }
1063  memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length);
1064  ev->curr_edge_id.port = htons(port);
1065  ev->curr_edge_id.sid = 0;
1066  {
1067  int v = 1;
1068  if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
1069  SERR("setsockopt");
1070  goto exit;
1071  }
1072  if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) {
1073  SERR("setsockopt");
1074  goto exit;
1075  }
1076  }
1077  if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) {
1078  SERR("bind");
1079  goto exit;
1080  }
1081  if (listen(lfd, LISTEN_BACKLOG) < 0) {
1082  SERR("listen");
1083  goto exit;
1084  }
1085  if (ev) {
1086  if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; }
1087  ev->acceptor = cs;
1088  ev->msg_handler = func;
1089  cs->has_sid = 0;
1090  cs->closed = 0;
1091  cs->opaque = NULL;
1092  GRN_COM_QUEUE_INIT(&cs->new_);
1093  } else {
1094  if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; }
1095  cs->fd = lfd;
1096  }
1097  cs->accepting = GRN_TRUE;
1098 exit :
1099  if (!cs && lfd != 1) { grn_sock_close(lfd); }
1100  if (bind_address_info) { freeaddrinfo(bind_address_info); }
1101  GRN_API_RETURN(ctx->rc);
1102 }
1103 
1104 
1106 void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge);
1107 
1108 void
1109 grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge))
1110 {
1111  grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0);
1112  grn_dispatcher = dispatcher;
1113 }
1114 
1115 void
1117 {
1118  grn_hash_close(ctx, grn_edges);
1119 }
1120 
1121 grn_edge *
1122 grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added)
1123 {
1124  if (grn_io_lock(ctx, grn_edges->io, 10000000)) {
1125  return NULL;
1126  } else {
1127  grn_edge *edge;
1128  grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr),
1129  (void **)&edge, added);
1130  grn_io_unlock(grn_edges->io);
1131  if (id) { edge->id = id; }
1132  return edge;
1133  }
1134 }
1135 
1136 void
1138 {
1139  if (!grn_io_lock(ctx, grn_edges->io, 10000000)) {
1140  grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL);
1141  grn_io_unlock(grn_edges->io);
1142  }
1143 }
1144 
1145 grn_edge *
1147 {
1148  int added;
1149  grn_edge *edge = grn_edges_add(ctx, addr, &added);
1150  if (added) {
1151  grn_ctx_init(&edge->ctx, 0);
1152  GRN_COM_QUEUE_INIT(&edge->recv_new);
1153  GRN_COM_QUEUE_INIT(&edge->send_old);
1154  edge->com = NULL;
1155  edge->stat = 0 /*EDGE_IDLE*/;
1156  edge->flags = GRN_EDGE_COMMUNICATOR;
1157  }
1158  return edge;
1159 }
1160 
1161 void
1163 {
1164  grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
1165  grn_dispatcher(ctx, edge);
1166 }