25 # include <ws2tcpip.h>
27 # ifdef HAVE_SYS_SOCKET_H
28 # include <sys/socket.h>
30 # ifdef HAVE_NETINET_IN_H
31 # include <netinet/in.h>
33 # ifdef HAVE_NETINET_TCP_H
34 # include <netinet/tcp.h>
45 #define PF_INET AF_INET
50 # define SOL_TCP IPPROTO_TCP
64 #ifndef USE_MSG_NOSIGNAL
68 # define MSG_NOSIGNAL 0
75 CRITICAL_SECTION_ENTER(q->
cs);
79 CRITICAL_SECTION_LEAVE(q->
cs);
106 CRITICAL_SECTION_ENTER(q->
cs);
111 CRITICAL_SECTION_LEAVE(q->
cs);
135 if (msg->
ctx != ctx) {
174 uint16_t status, uint32_t key_size, uint8_t extra_size)
177 header->
status = htons(status);
178 header->
keylen = htons(key_size);
179 header->
level = extra_size;
191 switch (header->
proto) {
196 if (ret == -1) {
SERR(
"send"); }
210 header->
flags = flags;
211 header->
status = htons((uint16_t)ctx->
rc);
252 if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
257 #ifndef USE_MSG_NOSIGNAL
258 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
280 MUTEX_INIT(ev->
mutex);
286 if ((ev->epfd = epoll_create(max_nevents)) != -1) {
289 SERR(
"epoll_create");
296 if ((ev->kqfd = kqueue()) != -1) {
351 struct epoll_event e;
352 memset(&e, 0,
sizeof(
struct epoll_event));
354 e.events = (__uint32_t) events;
355 if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
365 EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
366 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
377 if (com) { *com = c; }
393 if (com) { *com = c; }
394 if (c->
events != events) {
396 struct epoll_event e;
397 memset(&e, 0,
sizeof(
struct epoll_event));
399 e.events = (__uint32_t) events;
400 if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
409 EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
410 if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
432 struct epoll_event e;
433 memset(&e, 0,
sizeof(
struct epoll_event));
436 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
444 EV_SET(&e, (fd), c->
events, EV_DELETE, 0, 0, NULL);
445 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
458 #define LISTEN_BACKLOG 0x1000
472 SERR(
"listen - start accept");
487 if (listen(com->
fd, 0) == 0) {
490 SERR(
"listen - disable accept");
505 if (errno == EMFILE) {
551 tv.tv_sec = timeout / 1000;
552 tv.tv_usec = (timeout % 1000) * 1000;
561 if (*pfd > nfds) { nfds = *pfd; }
563 nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
571 if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); }
575 struct epoll_event *ep;
587 tv.tv_sec = timeout / 1000;
588 tv.tv_nsec = (timeout % 1000) * 1000;
597 struct pollfd *ep = ev->
events;
608 nevents = poll(ev->
events, nfd, timeout);
621 for (ep = ev->
events; nevents; ep++) {
628 struct epoll_event e;
630 memset(&e, 0,
sizeof(
struct epoll_event));
632 e.events = ep->events;
633 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) {
SERR(
"epoll_ctl"); }
637 if ((ep->events &
GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
646 EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
647 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
SERR(
"kevent"); }
651 if ((ep->filter ==
GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
654 if (!(ep->events & ep->revents)) {
continue; }
661 if ((ep->revents &
GRN_COM_POLLIN)) { grn_com_receiver(ctx, com); }
696 grn_com_header *header,
const char *body, uint32_t size,
int flags)
701 header->
size = htonl(size);
702 GRN_LOG(ctx,
GRN_LOG_INFO,
"send (%d,%x,%d,%02x,%02x,%04x)", size, header->
flags, header->
proto, header->
qtype, header->
level, header->
status);
708 wsabufs[0].buf = (
char *)header;
710 wsabufs[1].buf = (
char *)body;
711 wsabufs[1].len = size;
712 if (WSASend(cs->
fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
717 struct iovec msg_iov[2];
721 msg.msg_iov = msg_iov;
723 msg.msg_control = NULL;
724 msg.msg_controllen = 0;
726 msg_iov[0].iov_base = header;
728 msg_iov[1].iov_base = (
char *)body;
729 msg_iov[1].iov_len = size;
736 if ((ret = send(cs->
fd, (
const void *)header, whole_size,
MSG_NOSIGNAL|flags)) == -1) {
741 if (ret != whole_size) {
743 cs->
fd, (
long long int)ret, (
unsigned long long int)whole_size);
752 scan_delimiter(
const char *p,
const char *e)
758 if (p[0] ==
'\r') {
return p + 4; }
else { p += 2; }
761 }
else { p += p[3] ==
'\r' ? 1 : 4; }
797 if ((p = scan_delimiter(p - (o > 3 ? 3 : o), p + ret))) {
815 if (header->
qtype ==
'H') {
838 if ((ret = recv(com->
fd, p, rest, 0)) < 0) {
849 if (header->
proto < 0x80) {
850 return grn_com_recv_text(ctx, com, header, buf, ret);
852 rest -= ret, p += ret;
860 GRN_LOG(ctx,
GRN_LOG_INFO,
"recv (%d,%x,%d,%02x,%02x,%04x)", ntohl(header->
size), header->
flags, header->
proto, header->
qtype, header->
level, header->
status);
862 uint8_t proto = header->
proto;
863 size_t value_size = ntohl(header->
size);
874 for (rest = value_size; rest;) {
875 if ((ret = recv(com->
fd,
GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
911 struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
912 char port_string[16];
913 int getaddrinfo_result;
915 memset(&hints, 0,
sizeof(hints));
916 hints.ai_family = AF_UNSPEC;
917 hints.ai_socktype = SOCK_STREAM;
918 #ifdef AI_NUMERICSERV
919 hints.ai_flags = AI_NUMERICSERV;
921 snprintf(port_string,
sizeof(port_string),
"%d", port);
923 getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
924 if (getaddrinfo_result != 0) {
925 switch (getaddrinfo_result) {
929 dest, port_string, gai_strerror(getaddrinfo_result));
939 dest, port_string, gai_strerror(getaddrinfo_result));
945 for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
946 addrinfo_ptr = addrinfo_ptr->ai_next) {
947 static const int value = 1;
948 fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype,
949 addrinfo_ptr->ai_protocol);
952 }
else if (setsockopt(fd, 6, TCP_NODELAY, &value,
sizeof(value))) {
955 }
else if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen)) {
963 freeaddrinfo(addrinfo_list);
989 if (shutdown(fd, SHUT_RDWR) == -1) { }
1019 int getaddrinfo_result;
1020 struct addrinfo *bind_address_info = NULL;
1021 struct addrinfo hints;
1022 char port_string[6];
1025 if (!bind_address) {
1026 bind_address =
"0.0.0.0";
1028 snprintf(port_string,
sizeof(port_string),
"%d", port);
1029 memset(&hints, 0,
sizeof(
struct addrinfo));
1030 hints.ai_family = PF_UNSPEC;
1031 hints.ai_socktype = SOCK_STREAM;
1032 #ifdef AI_NUMERICSERV
1033 hints.ai_flags = AI_NUMERICSERV;
1035 getaddrinfo_result = getaddrinfo(bind_address, port_string,
1036 &hints, &bind_address_info);
1037 if (getaddrinfo_result != 0) {
1038 switch (getaddrinfo_result) {
1042 "getaddrinfo: <%s:%s>: %s",
1043 bind_address, port_string, gai_strerror(getaddrinfo_result));
1048 SERR(
"getaddrinfo");
1053 "getaddrinfo: <%s:%s>: %s",
1054 bind_address, port_string, gai_strerror(getaddrinfo_result));
1059 if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
1068 if (setsockopt(lfd,
SOL_TCP, TCP_NODELAY, (
void *) &v,
sizeof(
int)) == -1) {
1072 if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (
void *) &v,
sizeof(
int)) == -1) {
1077 if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) {
1100 if (bind_address_info) { freeaddrinfo(bind_address_info); }
1129 (
void **)&edge, added);
1131 if (
id) { edge->
id = id; }