Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
groonga_suggest_httpd.c
Go to the documentation of this file.
1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2010-2013 Brazil
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License version 2.1 as published by the Free Software Foundation.
7 
8  This library is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  Lesser General Public License for more details.
12 
13  You should have received a copy of the GNU Lesser General Public
14  License along with this library; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 /* groonga origin headers */
19 #include <str.h>
20 
21 #include <stdio.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <sys/time.h>
26 #include <time.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <err.h>
30 
31 #include <fcntl.h>
32 #include <sys/queue.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <netinet/in.h>
36 #include <sys/resource.h>
37 
38 #include "zmq_compatible.h"
39 #include <event.h>
40 #include <evhttp.h>
41 #include <msgpack.h>
42 #include <groonga.h>
43 #include <pthread.h>
44 
45 #include "util.h"
46 
47 #define DEFAULT_PORT 8080
48 #define DEFAULT_MAX_THREADS 8
49 
50 #define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
51 
52 #define LISTEN_BACKLOG 756
53 #define MIN_MAX_FDS 2048
54 #define MAX_THREADS 128 /* max 256 */
55 
56 typedef enum {
61 } run_mode;
62 
63 #define RUN_MODE_MASK 0x007f
64 #define RUN_MODE_ENABLE_MAX_FD_CHECK 0x0080
65 
66 
67 typedef struct {
70  void *zmq_sock;
73  pthread_t thd;
74  uint32_t thread_id;
75  struct event_base *base;
76  struct evhttp *httpd;
77  struct event pulse;
78  const char *log_base_path;
79  FILE *log_file;
80  uint32_t log_count;
82 } thd_data;
83 
84 typedef struct {
85  const char *db_path;
86  const char *recv_endpoint;
87  pthread_t thd;
88  void *zmq_ctx;
90 
91 #define CMD_BUF_SIZE 1024
92 
93 static thd_data threads[MAX_THREADS];
94 static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
95 static uint32_t max_threads;
96 static volatile sig_atomic_t loop = 1;
97 static grn_obj *db;
98 static uint32_t n_lines_per_log_file = 1000000;
99 
100 static int
101 suggest_result(grn_ctx *ctx,
102  struct evbuffer *res_buf, const char *types, const char *query,
103  const char *target_name, int frequency_threshold,
104  double conditional_probability_threshold, int limit,
105  grn_obj *cmd_buf, grn_obj *pass_through_parameters)
106 {
107  if (target_name && types && query) {
108  GRN_BULK_REWIND(cmd_buf);
109  GRN_TEXT_PUTS(ctx, cmd_buf, "/d/suggest?table=item_");
110  grn_text_urlenc(ctx, cmd_buf, target_name, strlen(target_name));
111  GRN_TEXT_PUTS(ctx, cmd_buf, "&column=kana&types=");
112  grn_text_urlenc(ctx, cmd_buf, types, strlen(types));
113  GRN_TEXT_PUTS(ctx, cmd_buf, "&query=");
114  grn_text_urlenc(ctx, cmd_buf, query, strlen(query));
115  GRN_TEXT_PUTS(ctx, cmd_buf, "&frequency_threshold=");
116  grn_text_itoa(ctx, cmd_buf, frequency_threshold);
117  GRN_TEXT_PUTS(ctx, cmd_buf, "&conditional_probability_threshold=");
118  grn_text_ftoa(ctx, cmd_buf, conditional_probability_threshold);
119  GRN_TEXT_PUTS(ctx, cmd_buf, "&limit=");
120  grn_text_itoa(ctx, cmd_buf, limit);
121  if (GRN_TEXT_LEN(pass_through_parameters) > 0) {
122  GRN_TEXT_PUTS(ctx, cmd_buf, "&");
123  GRN_TEXT_PUT(ctx, cmd_buf,
124  GRN_TEXT_VALUE(pass_through_parameters),
125  GRN_TEXT_LEN(pass_through_parameters));
126  }
127  {
128  char *res;
129  int flags;
130  unsigned int res_len;
131 
132  grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), 0);
133  grn_ctx_recv(ctx, &res, &res_len, &flags);
134 
135  evbuffer_add(res_buf, res, res_len);
136  return res_len;
137  }
138  } else {
139  evbuffer_add(res_buf, "{}", 2);
140  return 2;
141  }
142 }
143 
144 static void
145 log_send(struct evkeyvalq *output_headers, struct evbuffer *res_buf,
146  thd_data *thd, struct evkeyvalq *get_args)
147 {
148  uint64_t millisec;
149  int frequency_threshold, limit;
150  double conditional_probability_threshold;
151  const char *callback, *types, *query, *client_id, *target_name,
152  *learn_target_name;
153 
155  parse_keyval(thd->ctx, get_args, &query, &types, &client_id, &target_name,
156  &learn_target_name, &callback, &millisec, &frequency_threshold,
157  &conditional_probability_threshold, &limit,
158  &(thd->pass_through_parameters));
159 
160  /* send data to learn client */
161  if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
162  char c;
163  size_t l;
164  msgpack_packer pk;
165  msgpack_sbuffer sbuf;
166  int cnt, submit_flag = 0;
167 
168  msgpack_sbuffer_init(&sbuf);
169  msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
170 
171  cnt = 4;
172  if (types && !strcmp(types, "submit")) {
173  cnt++;
174  types = NULL;
175  submit_flag = 1;
176  }
177  msgpack_pack_map(&pk, cnt);
178 
179  c = 'i';
180  msgpack_pack_raw(&pk, 1);
181  msgpack_pack_raw_body(&pk, &c, 1);
182  l = strlen(client_id);
183  msgpack_pack_raw(&pk, l);
184  msgpack_pack_raw_body(&pk, client_id, l);
185 
186  c = 'q';
187  msgpack_pack_raw(&pk, 1);
188  msgpack_pack_raw_body(&pk, &c, 1);
189  l = strlen(query);
190  msgpack_pack_raw(&pk, l);
191  msgpack_pack_raw_body(&pk, query, l);
192 
193  c = 's';
194  msgpack_pack_raw(&pk, 1);
195  msgpack_pack_raw_body(&pk, &c, 1);
196  msgpack_pack_uint64(&pk, millisec);
197 
198  c = 'l';
199  msgpack_pack_raw(&pk, 1);
200  msgpack_pack_raw_body(&pk, &c, 1);
201  l = strlen(learn_target_name);
202  msgpack_pack_raw(&pk, l);
203  msgpack_pack_raw_body(&pk, learn_target_name, l);
204 
205  if (submit_flag) {
206  c = 't';
207  msgpack_pack_raw(&pk, 1);
208  msgpack_pack_raw_body(&pk, &c, 1);
209  msgpack_pack_true(&pk);
210  }
211  {
212  zmq_msg_t msg;
213  if (!zmq_msg_init_size(&msg, sbuf.size)) {
214  memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
215  if (zmq_msg_send(&msg, thd->zmq_sock, 0)) {
216  print_error("zmq_msg_send() error");
217  }
218  zmq_msg_close(&msg);
219  }
220  }
221  msgpack_sbuffer_destroy(&sbuf);
222  }
223  /* make result */
224  {
225  int content_length;
226  if (callback) {
227  evhttp_add_header(output_headers,
228  "Content-Type", "text/javascript; charset=UTF-8");
229  content_length = strlen(callback);
230  evbuffer_add(res_buf, callback, content_length);
231  evbuffer_add(res_buf, "(", 1);
232  content_length += suggest_result(thd->ctx,
233  res_buf, types, query, target_name,
234  frequency_threshold,
235  conditional_probability_threshold,
236  limit,
237  &(thd->cmd_buf),
238  &(thd->pass_through_parameters)) + 3;
239  evbuffer_add(res_buf, ");", 2);
240  } else {
241  evhttp_add_header(output_headers,
242  "Content-Type", "application/json; charset=UTF-8");
243  content_length = suggest_result(thd->ctx,
244  res_buf, types, query, target_name,
245  frequency_threshold,
246  conditional_probability_threshold,
247  limit,
248  &(thd->cmd_buf),
249  &(thd->pass_through_parameters));
250  }
251  if (content_length >= 0) {
252  char num_buf[16];
253  snprintf(num_buf, 16, "%d", content_length);
254  evhttp_add_header(output_headers, "Content-Length", num_buf);
255  }
256  }
257 }
258 
259 static void
260 cleanup_httpd_thread(thd_data *thd) {
261  if (thd->log_file) {
262  fclose(thd->log_file);
263  }
264  if (thd->httpd) {
265  evhttp_free(thd->httpd);
266  }
267  if (thd->zmq_sock) {
268  zmq_close(thd->zmq_sock);
269  }
270  grn_obj_unlink(thd->ctx, &(thd->cmd_buf));
272  if (thd->ctx) {
273  grn_ctx_close(thd->ctx);
274  }
275  event_base_free(thd->base);
276 }
277 
278 static void
279 close_log_file(thd_data *thread)
280 {
281  fclose(thread->log_file);
282  thread->log_file = NULL;
284 }
285 
286 static void
287 generic_handler(struct evhttp_request *req, void *arg)
288 {
289  struct evkeyvalq args;
290  thd_data *thd = arg;
291 
292  if (!loop) {
293  event_base_loopexit(thd->base, NULL);
294  return;
295  }
296  if (!req->uri) { return; }
297 
298  evhttp_parse_query(req->uri, &args);
299  {
300  struct evbuffer *res_buf;
301  if (!(res_buf = evbuffer_new())) {
302  err(1, "failed to create response buffer");
303  }
304 
305  evhttp_add_header(req->output_headers, "Connection", "close");
306 
307  log_send(req->output_headers, res_buf, thd, &args);
308  evhttp_send_reply(req, HTTP_OK, "OK", res_buf);
309  evbuffer_free(res_buf);
310  /* logging */
311  {
312  if (thd->log_base_path) {
313  if (thd->log_file && thd->request_reopen_log_file) {
314  close_log_file(thd);
315  }
316  if (!thd->log_file) {
317  time_t n;
318  struct tm *t_st;
319  char p[PATH_MAX + 1];
320 
321  time(&n);
322  t_st = localtime(&n);
323 
324  snprintf(p, PATH_MAX, "%s%04d%02d%02d%02d%02d%02d-%02d",
325  thd->log_base_path,
326  t_st->tm_year + 1900, t_st->tm_mon + 1, t_st->tm_mday,
327  t_st->tm_hour, t_st->tm_min, t_st->tm_sec, thd->thread_id);
328 
329  if (!(thd->log_file = fopen(p, "a"))) {
330  print_error("cannot open log_file %s.", p);
331  } else {
332  thd->log_count = 0;
333  }
334  }
335  if (thd->log_file) {
336  fprintf(thd->log_file, "%s\n", req->uri);
337  thd->log_count++;
338  if (n_lines_per_log_file > 0 &&
339  thd->log_count >= n_lines_per_log_file) {
340  close_log_file(thd);
341  }
342  }
343  }
344  }
345  }
346  evhttp_clear_headers(&args);
347 }
348 
349 static int
350 bind_socket(int port)
351 {
352  int nfd;
353  if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
354  print_error("cannot open socket for http.");
355  return -1;
356  } else {
357  int r, one = 1;
358  struct sockaddr_in addr;
359 
360  r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int));
361  memset(&addr, 0, sizeof(addr));
362  addr.sin_family = AF_INET;
363  addr.sin_addr.s_addr = INADDR_ANY;
364  addr.sin_port = htons(port);
365 
366  if ((r = bind(nfd, (struct sockaddr *)&addr, sizeof(addr))) < 0) {
367  print_error("cannot bind socket for http.");
368  return r;
369  }
370  if ((r = listen(nfd, LISTEN_BACKLOG)) < 0) {
371  print_error("cannot listen socket for http.");
372  return r;
373  }
374  if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) {
375  print_error("cannot fcntl socket for http.");
376  return -1;
377  }
378  return nfd;
379  }
380 }
381 
382 static void
383 signal_handler(int sig)
384 {
385  loop = 0;
386 }
387 
388 static void
389 signal_reopen_log_file(int sig)
390 {
391  uint32_t i;
392 
393  for (i = 0; i < max_threads; i++) {
395  }
396 }
397 
398 void
399 timeout_handler(int fd, short events, void *arg) {
400  thd_data *thd = arg;
401  if (!loop) {
402  event_base_loopexit(thd->base, NULL);
403  } else {
404  struct timeval tv = {1, 0};
405  evtimer_add(&(thd->pulse), &tv);
406  }
407 }
408 
409 static void *
410 dispatch(void *arg)
411 {
412  event_base_dispatch((struct event_base *)arg);
413  return NULL;
414 }
415 
416 static void
417 msgpack2json(msgpack_object *o, grn_ctx *ctx, grn_obj *buf)
418 {
419  switch (o->type) {
420  case MSGPACK_OBJECT_POSITIVE_INTEGER:
421  grn_text_ulltoa(ctx, buf, o->via.u64);
422  break;
423  case MSGPACK_OBJECT_RAW:
424  grn_text_esc(ctx, buf, o->via.raw.ptr, o->via.raw.size);
425  break;
426  case MSGPACK_OBJECT_ARRAY:
427  GRN_TEXT_PUTC(ctx, buf, '[');
428  {
429  int i;
430  for (i = 0; i < o->via.array.size; i++) {
431  msgpack2json(o->via.array.ptr, ctx, buf);
432  }
433  }
434  GRN_TEXT_PUTC(ctx, buf, ']');
435  break;
436  case MSGPACK_OBJECT_DOUBLE:
437  grn_text_ftoa(ctx, buf, o->via.dec);
438  break;
439  default:
440  print_error("cannot handle this msgpack type.");
441  }
442 }
443 
444 static void
445 load_from_learner(msgpack_object *o, grn_ctx *ctx, grn_obj *cmd_buf)
446 {
447  if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) {
448  msgpack_object_kv *kv;
449  kv = &(o->via.map.ptr[0]);
450  if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size == 6 &&
451  !memcmp(kv->key.via.raw.ptr, CONST_STR_LEN("target"))) {
452  if (kv->val.type == MSGPACK_OBJECT_RAW) {
453  int i;
454  GRN_BULK_REWIND(cmd_buf);
455  GRN_TEXT_PUTS(ctx, cmd_buf, "load --table ");
456  GRN_TEXT_PUT(ctx, cmd_buf, kv->val.via.raw.ptr, kv->val.via.raw.size);
457  grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
459  if (kv->val.via.raw.size > 5) {
460  if (!memcmp(kv->val.via.raw.ptr, CONST_STR_LEN("item_")) ||
461  !memcmp(kv->val.via.raw.ptr, CONST_STR_LEN("pair_"))) {
462  char delim = '{';
463  GRN_BULK_REWIND(cmd_buf);
464  for (i = 1; i < o->via.map.size; i++) {
465  GRN_TEXT_PUTC(ctx, cmd_buf, delim);
466  kv = &(o->via.map.ptr[i]);
467  msgpack2json(&(kv->key), ctx, cmd_buf);
468  GRN_TEXT_PUTC(ctx, cmd_buf, ':');
469  msgpack2json(&(kv->val), ctx, cmd_buf);
470  delim = ',';
471  }
472  GRN_TEXT_PUTC(ctx, cmd_buf, '}');
473  /* printf("msg: %.*s\n", GRN_TEXT_LEN(cmd_buf), GRN_TEXT_VALUE(cmd_buf)); */
474  grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
475  }
476  }
477  grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
478  {
479  char *res;
480  int flags;
481  unsigned int res_len;
482  grn_ctx_recv(ctx, &res, &res_len, &flags);
483  }
484  }
485  }
486  }
487 }
488 
489 static void
490 recv_handler(grn_ctx *ctx, void *zmq_recv_sock, msgpack_zone *mempool, grn_obj *cmd_buf)
491 {
492  zmq_msg_t msg;
493 
494  if (zmq_msg_init(&msg)) {
495  print_error("cannot init zmq message.");
496  } else {
497  if (zmq_msg_recv(&msg, zmq_recv_sock, 0)) {
498  print_error("cannot recv zmq message.");
499  } else {
500  msgpack_object obj;
501  msgpack_unpack_return ret;
502 
503  ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
504  if (MSGPACK_UNPACK_SUCCESS == ret) {
505  load_from_learner(&obj, ctx, cmd_buf);
506  } else {
507  print_error("invalid recv data.");
508  }
509  msgpack_zone_clear(mempool);
510  }
511  zmq_msg_close(&msg);
512  }
513 }
514 
515 static void *
516 recv_from_learner(void *arg)
517 {
518  void *zmq_recv_sock;
519  recv_thd_data *thd = arg;
520 
521  if ((zmq_recv_sock = zmq_socket(thd->zmq_ctx, ZMQ_SUB))) {
522  if (!zmq_connect(zmq_recv_sock, thd->recv_endpoint)) {
523  grn_ctx ctx;
524  if (!grn_ctx_init(&ctx, 0)) {
525  if ((!grn_ctx_use(&ctx, db))) {
526  msgpack_zone *mempool;
527  if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
528  grn_obj cmd_buf;
529  zmq_pollitem_t items[] = {
530  { zmq_recv_sock, 0, ZMQ_POLLIN, 0}
531  };
532  GRN_TEXT_INIT(&cmd_buf, 0);
533  zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
534  while (loop) {
535  zmq_poll(items, 1, 10000);
536  if (items[0].revents & ZMQ_POLLIN) {
537  recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf);
538  }
539  }
540  grn_obj_unlink(&ctx, &cmd_buf);
541  msgpack_zone_free(mempool);
542  } else {
543  print_error("cannot create msgpack zone.");
544  }
545  /* db_close */
546  } else {
547  print_error("error in grn_db_open() on recv thread.");
548  }
549  grn_ctx_fin(&ctx);
550  } else {
551  print_error("error in grn_ctx_init() on recv thread.");
552  }
553  } else {
554  print_error("cannot create recv zmq_socket.");
555  }
556  } else {
557  print_error("cannot connect zmq_socket.");
558  }
559  return NULL;
560 }
561 
562 static int
563 serve_threads(int nthreads, int port, const char *db_path, void *zmq_ctx,
564  const char *send_endpoint, const char *recv_endpoint,
565  const char *log_base_path)
566 {
567  int nfd;
568  uint32_t i;
569  if ((nfd = bind_socket(port)) < 0) {
570  print_error("cannot bind socket. please check port number with netstat.");
571  return -1;
572  }
573 
574  for (i = 0; i < nthreads; i++) {
575  memset(&threads[i], 0, sizeof(threads[i]));
577  if (!(threads[i].base = event_init())) {
578  print_error("error in event_init() on thread %d.", i);
579  } else {
580  if (!(threads[i].httpd = evhttp_new(threads[i].base))) {
581  print_error("error in evhttp_new() on thread %d.", i);
582  } else {
583  int r;
584  if ((r = evhttp_accept_socket(threads[i].httpd, nfd))) {
585  print_error("error in evhttp_accept_socket() on thread %d.", i);
586  } else {
587  if (send_endpoint) {
588  if (!(threads[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
589  print_error("cannot create zmq_socket.");
590  } else if (zmq_connect(threads[i].zmq_sock, send_endpoint)) {
591  print_error("cannot connect zmq_socket.");
592  zmq_close(threads[i].zmq_sock);
593  threads[i].zmq_sock = NULL;
594  } else {
595  uint64_t hwm = 1;
596  zmq_setsockopt(threads[i].zmq_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
597  }
598  } else {
599  threads[i].zmq_sock = NULL;
600  }
601  if (!(threads[i].ctx = grn_ctx_open(0))) {
602  print_error("error in grn_ctx_open() on thread %d.", i);
603  } else if (grn_ctx_use(threads[i].ctx, db)) {
604  print_error("error in grn_db_open() on thread %d.", i);
605  } else {
606  GRN_TEXT_INIT(&(threads[i].cmd_buf), 0);
607  GRN_TEXT_INIT(&(threads[i].pass_through_parameters), 0);
608  threads[i].log_base_path = log_base_path;
609  threads[i].thread_id = i;
610  evhttp_set_gencb(threads[i].httpd, generic_handler, &threads[i]);
611  evhttp_set_timeout(threads[i].httpd, 10);
612  {
613  struct timeval tv = {1, 0};
614  evtimer_set(&(threads[i].pulse), timeout_handler, &threads[i]);
615  evtimer_add(&(threads[i].pulse), &tv);
616  }
617  if ((r = pthread_create(&(threads[i].thd), NULL, dispatch, threads[i].base))) {
618  print_error("error in pthread_create() on thread %d.", i);
619  }
620  }
621  }
622  }
623  }
624  }
625 
626  /* recv thread from learner */
627  if (recv_endpoint) {
628  recv_thd_data rthd;
629  rthd.db_path = db_path;
630  rthd.recv_endpoint = recv_endpoint;
631  rthd.zmq_ctx = zmq_ctx;
632 
633  if (pthread_create(&(rthd.thd), NULL, recv_from_learner, &rthd)) {
634  print_error("error in pthread_create() on thread %d.", i);
635  }
636  if (pthread_join(rthd.thd, NULL)) {
637  print_error("error in pthread_join() on thread %d.", i);
638  }
639  } else {
640  while (loop) { sleep(1); }
641  }
642 
643  /* join all httpd thread */
644  for (i = 0; i < nthreads; i++) {
645  if (threads[i].thd) {
646  if (pthread_join(threads[i].thd, NULL)) {
647  print_error("error in pthread_join() on thread %d.", i);
648  }
649  }
650  cleanup_httpd_thread(&(threads[i]));
651  }
652  return 0;
653 }
654 
655 static uint32_t
656 get_core_number(void)
657 {
658 #ifdef ACTUALLY_GET_CORE_NUMBER
659 #ifdef _SC_NPROCESSORS_CONF
660  return sysconf(_SC_NPROCESSORS_CONF);
661 #else /* _SC_NPROCESSORS_CONF */
662  int n_processors;
663  size_t length = sizeof(n_processors);
664  int mib[] = {CTL_HW, HW_NCPU};
665  if (sysctl(mib, sizeof(mib) / sizeof(mib[0]),
666  &n_processors, &length, NULL, 0) == 0 &&
667  length == sizeof(n_processors) &&
668  0 < n_processors) {
669  return n_processors;
670  } else {
671  return 1;
672  }
673 #endif /* _SC_NPROCESSORS_CONF */
674 #endif /* ACTUALLY_GET_CORE_NUMBER */
675  return 0;
676 }
677 
678 static void
679 usage(FILE *output)
680 {
681  fprintf(
682  output,
683  "Usage: groonga-suggest-httpd [options...] db_path\n"
684  "db_path:\n"
685  " specify groonga database path which is used for suggestion.\n"
686  "\n"
687  "options:\n"
688  " -p, --port <port number> : http server port number\n"
689  " (default: %d)\n"
690  /*
691  " --address <ip/hostname> : server address to listen\n"
692  " (default: %s)\n"
693  */
694  " -c <thread number> : number of server threads\n"
695  " (deprecated. use --n-threads)\n"
696  " -t, --n-threads <thread number> : number of server threads\n"
697  " (default: %d)\n"
698  " -s, --send-endpoint <send endpoint> : send endpoint\n"
699  " (ex. tcp://example.com:1234)\n"
700  " -r, --receive-endpoint <receive endpoint> : receive endpoint\n"
701  " (ex. tcp://example.com:1235)\n"
702  " -l, --log-base-path <path prefix> : log path prefix\n"
703  " --n-lines-per-log-file <lines number> : number of lines in a log file\n"
704  " use 0 for disabling this\n"
705  " (default: %d)\n"
706  " -d, --daemon : daemonize\n"
707  " --disable-max-fd-check : disable max FD check on start\n"
708  " -h, --help : show this message\n",
709  DEFAULT_PORT, default_max_threads, n_lines_per_log_file);
710 }
711 
712 int
713 main(int argc, char **argv)
714 {
715  int port_no = DEFAULT_PORT;
716  const char *max_threads_string = NULL, *port_string = NULL;
717  const char *address;
718  const char *send_endpoint = NULL, *recv_endpoint = NULL, *log_base_path = NULL;
719  const char *n_lines_per_log_file_string = NULL;
720  int n_processed_args, flags = RUN_MODE_ENABLE_MAX_FD_CHECK;
721  run_mode mode = run_mode_none;
722 
723  if (!(default_max_threads = get_core_number())) {
724  default_max_threads = DEFAULT_MAX_THREADS;
725  }
726 
727  /* parse options */
728  {
729  static grn_str_getopt_opt opts[] = {
730  {'c', NULL, NULL, 0, GETOPT_OP_NONE}, /* deprecated */
731  {'t', "n-threads", NULL, 0, GETOPT_OP_NONE},
732  {'h', "help", NULL, run_mode_usage, GETOPT_OP_UPDATE},
733  {'p', "port", NULL, 0, GETOPT_OP_NONE},
734  {'\0', "bind-address", NULL, 0, GETOPT_OP_NONE}, /* not supported yet */
735  {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE},
736  {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE},
737  {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE},
738  {'\0', "n-lines-per-log-file", NULL, 0, GETOPT_OP_NONE},
739  {'d', "daemon", NULL, run_mode_daemon, GETOPT_OP_UPDATE},
740  {'\0', "disable-max-fd-check", NULL, RUN_MODE_ENABLE_MAX_FD_CHECK,
741  GETOPT_OP_OFF},
742  {'\0', NULL, NULL, 0, 0}
743  };
744  opts[0].arg = &max_threads_string;
745  opts[1].arg = &max_threads_string;
746  opts[3].arg = &port_string;
747  opts[4].arg = &address;
748  opts[5].arg = &send_endpoint;
749  opts[6].arg = &recv_endpoint;
750  opts[7].arg = &log_base_path;
751  opts[8].arg = &n_lines_per_log_file_string;
752 
753  n_processed_args = grn_str_getopt(argc, argv, opts, &flags);
754  }
755 
756  /* main */
757  mode = (flags & RUN_MODE_MASK);
758  if (n_processed_args < 0 ||
759  (argc - n_processed_args) != 1 ||
760  mode == run_mode_error) {
761  usage(stderr);
762  return EXIT_FAILURE;
763  } else if (mode == run_mode_usage) {
764  usage(stdout);
765  return EXIT_SUCCESS;
766  } else {
767  grn_ctx ctx;
768  void *zmq_ctx;
769 
770  if (max_threads_string) {
771  max_threads = atoi(max_threads_string);
772  if (max_threads > MAX_THREADS) {
773  print_error("too many threads. limit to %d.", MAX_THREADS);
774  max_threads = MAX_THREADS;
775  }
776  } else {
777  max_threads = default_max_threads;
778  }
779 
780  if (port_string) {
781  port_no = atoi(port_string);
782  }
783 
784  if (flags & RUN_MODE_ENABLE_MAX_FD_CHECK) {
785  /* check environment */
786  struct rlimit rlim;
787  if (!getrlimit(RLIMIT_NOFILE, &rlim)) {
788  if (rlim.rlim_max < MIN_MAX_FDS) {
789  print_error("too small max fds. %d required.", MIN_MAX_FDS);
790  return -1;
791  }
792  rlim.rlim_cur = rlim.rlim_cur;
793  setrlimit(RLIMIT_NOFILE, &rlim);
794  }
795  }
796 
797  if (n_lines_per_log_file_string) {
798  int64_t n_lines;
799  n_lines = grn_atoll(n_lines_per_log_file_string,
800  n_lines_per_log_file_string + strlen(n_lines_per_log_file_string),
801  NULL);
802  if (n_lines < 0) {
803  print_error("--n-lines-per-log-file must be >= 0: <%s>",
804  n_lines_per_log_file_string);
805  return(EXIT_FAILURE);
806  }
807  if (n_lines > UINT32_MAX) {
808  print_error("--n-lines-per-log-file must be <= %ld: <%s>",
809  UINT32_MAX, n_lines_per_log_file_string);
810  return(EXIT_FAILURE);
811  }
812  n_lines_per_log_file = (uint32_t)n_lines;
813  }
814 
815  if (mode == run_mode_daemon) {
816  daemonize();
817  }
818 
819  grn_init();
820  grn_ctx_init(&ctx, 0);
821  if ((db = grn_db_open(&ctx, argv[n_processed_args]))) {
822  if ((zmq_ctx = zmq_init(1))) {
823  signal(SIGTERM, signal_handler);
824  signal(SIGINT, signal_handler);
825  signal(SIGQUIT, signal_handler);
826  signal(SIGUSR1, signal_reopen_log_file);
827 
828  serve_threads(max_threads, port_no, argv[n_processed_args], zmq_ctx,
829  send_endpoint, recv_endpoint, log_base_path);
830  zmq_term(zmq_ctx);
831  } else {
832  print_error("cannot create zmq context.");
833  }
834  grn_obj_close(&ctx, db);
835  } else {
836  print_error("cannot open db.");
837  }
838  grn_ctx_fin(&ctx);
839  grn_fin();
840  }
841  return 0;
842 }