24 #include <sys/types.h>
32 #include <sys/queue.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <netinet/in.h>
36 #include <sys/resource.h>
47 #define DEFAULT_PORT 8080
48 #define DEFAULT_MAX_THREADS 8
50 #define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
52 #define LISTEN_BACKLOG 756
53 #define MIN_MAX_FDS 2048
54 #define MAX_THREADS 128
63 #define RUN_MODE_MASK 0x007f
64 #define RUN_MODE_ENABLE_MAX_FD_CHECK 0x0080
91 #define CMD_BUF_SIZE 1024
95 static uint32_t max_threads;
96 static volatile sig_atomic_t loop = 1;
98 static uint32_t n_lines_per_log_file = 1000000;
102 struct evbuffer *res_buf,
const char *types,
const char *query,
103 const char *target_name,
int frequency_threshold,
104 double conditional_probability_threshold,
int limit,
107 if (target_name && types && query) {
117 GRN_TEXT_PUTS(ctx, cmd_buf,
"&conditional_probability_threshold=");
118 grn_text_ftoa(ctx, cmd_buf, conditional_probability_threshold);
130 unsigned int res_len;
135 evbuffer_add(res_buf, res, res_len);
139 evbuffer_add(res_buf,
"{}", 2);
145 log_send(
struct evkeyvalq *output_headers,
struct evbuffer *res_buf,
146 thd_data *thd,
struct evkeyvalq *get_args)
149 int frequency_threshold, limit;
150 double conditional_probability_threshold;
151 const char *callback, *types, *query, *client_id, *target_name,
155 parse_keyval(thd->
ctx, get_args, &query, &types, &client_id, &target_name,
156 &learn_target_name, &callback, &millisec, &frequency_threshold,
157 &conditional_probability_threshold, &limit,
161 if (thd->
zmq_sock && millisec && client_id && query && learn_target_name) {
165 msgpack_sbuffer sbuf;
166 int cnt, submit_flag = 0;
168 msgpack_sbuffer_init(&sbuf);
169 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
172 if (types && !strcmp(types,
"submit")) {
177 msgpack_pack_map(&pk, cnt);
180 msgpack_pack_raw(&pk, 1);
181 msgpack_pack_raw_body(&pk, &c, 1);
182 l = strlen(client_id);
183 msgpack_pack_raw(&pk, l);
184 msgpack_pack_raw_body(&pk, client_id, l);
187 msgpack_pack_raw(&pk, 1);
188 msgpack_pack_raw_body(&pk, &c, 1);
190 msgpack_pack_raw(&pk, l);
191 msgpack_pack_raw_body(&pk, query, l);
194 msgpack_pack_raw(&pk, 1);
195 msgpack_pack_raw_body(&pk, &c, 1);
196 msgpack_pack_uint64(&pk, millisec);
199 msgpack_pack_raw(&pk, 1);
200 msgpack_pack_raw_body(&pk, &c, 1);
201 l = strlen(learn_target_name);
202 msgpack_pack_raw(&pk, l);
203 msgpack_pack_raw_body(&pk, learn_target_name, l);
207 msgpack_pack_raw(&pk, 1);
208 msgpack_pack_raw_body(&pk, &c, 1);
209 msgpack_pack_true(&pk);
213 if (!zmq_msg_init_size(&msg, sbuf.size)) {
214 memcpy((
void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
215 if (zmq_msg_send(&msg, thd->
zmq_sock, 0)) {
221 msgpack_sbuffer_destroy(&sbuf);
227 evhttp_add_header(output_headers,
228 "Content-Type",
"text/javascript; charset=UTF-8");
229 content_length = strlen(callback);
230 evbuffer_add(res_buf, callback, content_length);
231 evbuffer_add(res_buf,
"(", 1);
232 content_length += suggest_result(thd->
ctx,
233 res_buf, types, query, target_name,
235 conditional_probability_threshold,
239 evbuffer_add(res_buf,
");", 2);
241 evhttp_add_header(output_headers,
242 "Content-Type",
"application/json; charset=UTF-8");
243 content_length = suggest_result(thd->
ctx,
244 res_buf, types, query, target_name,
246 conditional_probability_threshold,
251 if (content_length >= 0) {
253 snprintf(num_buf, 16,
"%d", content_length);
254 evhttp_add_header(output_headers,
"Content-Length", num_buf);
260 cleanup_httpd_thread(
thd_data *thd) {
265 evhttp_free(thd->
httpd);
275 event_base_free(thd->
base);
287 generic_handler(
struct evhttp_request *req,
void *arg)
289 struct evkeyvalq args;
293 event_base_loopexit(thd->base, NULL);
296 if (!req->uri) {
return; }
298 evhttp_parse_query(req->uri, &args);
300 struct evbuffer *res_buf;
301 if (!(res_buf = evbuffer_new())) {
302 err(1,
"failed to create response buffer");
305 evhttp_add_header(req->output_headers,
"Connection",
"close");
307 log_send(req->output_headers, res_buf, thd, &args);
308 evhttp_send_reply(req, HTTP_OK,
"OK", res_buf);
309 evbuffer_free(res_buf);
312 if (thd->log_base_path) {
313 if (thd->log_file && thd->request_reopen_log_file) {
316 if (!thd->log_file) {
322 t_st = localtime(&n);
324 snprintf(p,
PATH_MAX,
"%s%04d%02d%02d%02d%02d%02d-%02d",
326 t_st->tm_year + 1900, t_st->tm_mon + 1, t_st->tm_mday,
327 t_st->tm_hour, t_st->tm_min, t_st->tm_sec, thd->thread_id);
329 if (!(thd->log_file = fopen(p,
"a"))) {
336 fprintf(thd->log_file,
"%s\n", req->uri);
338 if (n_lines_per_log_file > 0 &&
339 thd->log_count >= n_lines_per_log_file) {
346 evhttp_clear_headers(&args);
350 bind_socket(
int port)
353 if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
358 struct sockaddr_in addr;
360 r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (
char *)&one,
sizeof(
int));
361 memset(&addr, 0,
sizeof(addr));
362 addr.sin_family = AF_INET;
363 addr.sin_addr.s_addr = INADDR_ANY;
364 addr.sin_port = htons(port);
366 if ((r = bind(nfd, (
struct sockaddr *)&addr,
sizeof(addr))) < 0) {
374 if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) {
383 signal_handler(
int sig)
389 signal_reopen_log_file(
int sig)
393 for (i = 0; i < max_threads; i++) {
402 event_base_loopexit(thd->
base, NULL);
404 struct timeval tv = {1, 0};
405 evtimer_add(&(thd->
pulse), &tv);
412 event_base_dispatch((
struct event_base *)arg);
420 case MSGPACK_OBJECT_POSITIVE_INTEGER:
423 case MSGPACK_OBJECT_RAW:
424 grn_text_esc(ctx, buf, o->via.raw.ptr, o->via.raw.size);
426 case MSGPACK_OBJECT_ARRAY:
430 for (i = 0; i < o->via.array.size; i++) {
431 msgpack2json(o->via.array.ptr, ctx, buf);
436 case MSGPACK_OBJECT_DOUBLE:
445 load_from_learner(msgpack_object *o,
grn_ctx *ctx,
grn_obj *cmd_buf)
447 if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) {
448 msgpack_object_kv *kv;
449 kv = &(o->via.map.ptr[0]);
450 if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size == 6 &&
452 if (kv->val.type == MSGPACK_OBJECT_RAW) {
456 GRN_TEXT_PUT(ctx, cmd_buf, kv->val.via.raw.ptr, kv->val.via.raw.size);
459 if (kv->val.via.raw.size > 5) {
464 for (i = 1; i < o->via.map.size; i++) {
466 kv = &(o->via.map.ptr[
i]);
467 msgpack2json(&(kv->key), ctx, cmd_buf);
469 msgpack2json(&(kv->val), ctx, cmd_buf);
481 unsigned int res_len;
490 recv_handler(
grn_ctx *ctx,
void *zmq_recv_sock, msgpack_zone *mempool,
grn_obj *cmd_buf)
494 if (zmq_msg_init(&msg)) {
497 if (zmq_msg_recv(&msg, zmq_recv_sock, 0)) {
501 msgpack_unpack_return ret;
503 ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
504 if (MSGPACK_UNPACK_SUCCESS == ret) {
505 load_from_learner(&obj, ctx, cmd_buf);
509 msgpack_zone_clear(mempool);
516 recv_from_learner(
void *arg)
521 if ((zmq_recv_sock = zmq_socket(thd->
zmq_ctx, ZMQ_SUB))) {
526 msgpack_zone *mempool;
527 if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
529 zmq_pollitem_t items[] = {
530 { zmq_recv_sock, 0, ZMQ_POLLIN, 0}
533 zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE,
"", 0);
535 zmq_poll(items, 1, 10000);
536 if (items[0].revents & ZMQ_POLLIN) {
537 recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf);
541 msgpack_zone_free(mempool);
547 print_error(
"error in grn_db_open() on recv thread.");
551 print_error(
"error in grn_ctx_init() on recv thread.");
563 serve_threads(
int nthreads,
int port,
const char *db_path,
void *zmq_ctx,
564 const char *send_endpoint,
const char *recv_endpoint,
565 const char *log_base_path)
569 if ((nfd = bind_socket(port)) < 0) {
570 print_error(
"cannot bind socket. please check port number with netstat.");
574 for (i = 0; i < nthreads; i++) {
575 memset(&threads[i], 0,
sizeof(threads[i]));
577 if (!(threads[i].base = event_init())) {
578 print_error(
"error in event_init() on thread %d.", i);
580 if (!(threads[i].httpd = evhttp_new(threads[i].base))) {
581 print_error(
"error in evhttp_new() on thread %d.", i);
584 if ((r = evhttp_accept_socket(threads[i].httpd, nfd))) {
585 print_error(
"error in evhttp_accept_socket() on thread %d.", i);
588 if (!(threads[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
590 }
else if (zmq_connect(threads[i].zmq_sock, send_endpoint)) {
592 zmq_close(threads[i].zmq_sock);
596 zmq_setsockopt(threads[i].zmq_sock,
ZMQ_SNDHWM, &hwm,
sizeof(uint64_t));
602 print_error(
"error in grn_ctx_open() on thread %d.", i);
604 print_error(
"error in grn_db_open() on thread %d.", i);
610 evhttp_set_gencb(threads[i].httpd, generic_handler, &threads[i]);
611 evhttp_set_timeout(threads[i].httpd, 10);
613 struct timeval tv = {1, 0};
615 evtimer_add(&(threads[i].pulse), &tv);
617 if ((r = pthread_create(&(threads[i].thd), NULL, dispatch, threads[i].base))) {
618 print_error(
"error in pthread_create() on thread %d.", i);
633 if (pthread_create(&(rthd.
thd), NULL, recv_from_learner, &rthd)) {
634 print_error(
"error in pthread_create() on thread %d.", i);
636 if (pthread_join(rthd.
thd, NULL)) {
637 print_error(
"error in pthread_join() on thread %d.", i);
640 while (loop) { sleep(1); }
644 for (i = 0; i < nthreads; i++) {
645 if (threads[i].thd) {
646 if (pthread_join(threads[i].thd, NULL)) {
647 print_error(
"error in pthread_join() on thread %d.", i);
650 cleanup_httpd_thread(&(threads[i]));
656 get_core_number(
void)
658 #ifdef ACTUALLY_GET_CORE_NUMBER
659 #ifdef _SC_NPROCESSORS_CONF
660 return sysconf(_SC_NPROCESSORS_CONF);
663 size_t length =
sizeof(n_processors);
664 int mib[] = {CTL_HW, HW_NCPU};
665 if (sysctl(mib,
sizeof(mib) /
sizeof(mib[0]),
666 &n_processors, &length, NULL, 0) == 0 &&
667 length ==
sizeof(n_processors) &&
683 "Usage: groonga-suggest-httpd [options...] db_path\n"
685 " specify groonga database path which is used for suggestion.\n"
688 " -p, --port <port number> : http server port number\n"
694 " -c <thread number> : number of server threads\n"
695 " (deprecated. use --n-threads)\n"
696 " -t, --n-threads <thread number> : number of server threads\n"
698 " -s, --send-endpoint <send endpoint> : send endpoint\n"
699 " (ex. tcp://example.com:1234)\n"
700 " -r, --receive-endpoint <receive endpoint> : receive endpoint\n"
701 " (ex. tcp://example.com:1235)\n"
702 " -l, --log-base-path <path prefix> : log path prefix\n"
703 " --n-lines-per-log-file <lines number> : number of lines in a log file\n"
704 " use 0 for disabling this\n"
706 " -d, --daemon : daemonize\n"
707 " --disable-max-fd-check : disable max FD check on start\n"
708 " -h, --help : show this message\n",
709 DEFAULT_PORT, default_max_threads, n_lines_per_log_file);
716 const char *max_threads_string = NULL, *port_string = NULL;
718 const char *send_endpoint = NULL, *recv_endpoint = NULL, *log_base_path = NULL;
719 const char *n_lines_per_log_file_string = NULL;
723 if (!(default_max_threads = get_core_number())) {
742 {
'\0', NULL, NULL, 0, 0}
744 opts[0].
arg = &max_threads_string;
745 opts[1].
arg = &max_threads_string;
746 opts[3].
arg = &port_string;
747 opts[4].
arg = &address;
748 opts[5].
arg = &send_endpoint;
749 opts[6].
arg = &recv_endpoint;
750 opts[7].
arg = &log_base_path;
751 opts[8].
arg = &n_lines_per_log_file_string;
758 if (n_processed_args < 0 ||
759 (argc - n_processed_args) != 1 ||
770 if (max_threads_string) {
771 max_threads = atoi(max_threads_string);
777 max_threads = default_max_threads;
781 port_no = atoi(port_string);
787 if (!getrlimit(RLIMIT_NOFILE, &rlim)) {
792 rlim.rlim_cur = rlim.rlim_cur;
793 setrlimit(RLIMIT_NOFILE, &rlim);
797 if (n_lines_per_log_file_string) {
799 n_lines =
grn_atoll(n_lines_per_log_file_string,
800 n_lines_per_log_file_string + strlen(n_lines_per_log_file_string),
803 print_error(
"--n-lines-per-log-file must be >= 0: <%s>",
804 n_lines_per_log_file_string);
805 return(EXIT_FAILURE);
808 print_error(
"--n-lines-per-log-file must be <= %ld: <%s>",
810 return(EXIT_FAILURE);
812 n_lines_per_log_file = (uint32_t)n_lines;
821 if ((db =
grn_db_open(&ctx, argv[n_processed_args]))) {
822 if ((zmq_ctx = zmq_init(1))) {
823 signal(SIGTERM, signal_handler);
824 signal(SIGINT, signal_handler);
825 signal(SIGQUIT, signal_handler);
826 signal(SIGUSR1, signal_reopen_log_file);
828 serve_threads(max_threads, port_no, argv[n_processed_args], zmq_ctx,
829 send_endpoint, recv_endpoint, log_base_path);