Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
groonga_suggest_learner.c
Go to the documentation of this file.
1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2010-2013 Brazil
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License version 2.1 as published by the Free Software Foundation.
7 
8  This library is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  Lesser General Public License for more details.
12 
13  You should have received a copy of the GNU Lesser General Public
14  License along with this library; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 #include "zmq_compatible.h"
18 #include <stdio.h>
19 #include <signal.h>
20 #include <unistd.h>
21 #include <msgpack.h>
22 #include <pthread.h>
23 #include <groonga.h>
24 #include <inttypes.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <dirent.h>
28 
29 #include "util.h"
30 
31 #include <evhttp.h>
32 
/* Default ZeroMQ endpoints; main() uses them when -r / -s are not given. */
33 #define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
34 #define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
/* Pause passed to usleep() after each record is published by send_handler(). */
35 #define SEND_WAIT 1000 /* 0.001sec */
36 
/*
 * Expands to TWO comma-separated arguments: the string itself and its length
 * without the trailing NUL. The length comes from sizeof, so the argument has
 * to be a string literal (or a char array), not a char pointer.
 */
37 #define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
38 
/*
 * Arguments handed from main() to the sender thread (send_to_httpd()).
 * NOTE(review): the closing line of this typedef was missing from the
 * listing; the type name is taken from its uses in send_to_httpd() and main().
 */
typedef struct {
  const char *db_path;       /* database path opened by the sender thread */
  const char *send_endpoint; /* endpoint the sender's PUB socket binds to */
  pthread_t thd;             /* thread handle filled in by pthread_create() */
  void *zmq_ctx;             /* ZeroMQ context created in main() */
} send_thd_data;
45 
/*
 * Run flag: starts at 1, set to 0 by signal_handler(). It is polled by
 * recv_event_loop(), send_to_httpd() and the cursor loops in send_handler().
 */
46 static volatile sig_atomic_t loop = 1;
47 
48 static void
49 load_to_groonga(grn_ctx *ctx,
50  grn_obj *buf,
51  const char *query, uint32_t query_len,
52  const char *client_id, uint32_t client_id_len,
53  const char *learn_target_name, uint32_t learn_target_name_len,
54  uint64_t millisec,
55  int submit)
56 {
57  GRN_BULK_REWIND(buf);
58  GRN_TEXT_PUTS(ctx, buf, "load --table event_");
59  GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
60  GRN_TEXT_PUTS(ctx, buf, " --each 'suggest_preparer(_id,type,item,sequence,time,pair_");
61  GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
62  GRN_TEXT_PUTS(ctx, buf, ")'");
65 
66  GRN_BULK_REWIND(buf);
67  GRN_TEXT_PUTS(ctx, buf, "{\"item\":");
68  grn_text_esc(ctx, buf, query, query_len);
69  GRN_TEXT_PUTS(ctx, buf, ",\"sequence\":");
70  grn_text_esc(ctx, buf, client_id, client_id_len);
71  GRN_TEXT_PUTS(ctx, buf, ",\"time\":");
72  grn_text_ftoa(ctx, buf, (double)millisec / 1000);
73  if (submit) {
74  GRN_TEXT_PUTS(ctx, buf, ",\"type\":\"submit\"}");
75  } else {
76  GRN_TEXT_PUTS(ctx, buf, "}");
77  }
78  /* printf("%.*s\n", GRN_TEXT_LEN(buf), GRN_TEXT_VALUE(buf)); */
80 
81  grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
82 
83  {
84  char *res;
85  int flags;
86  unsigned int res_len;
87  grn_ctx_recv(ctx, &res, &res_len, &flags);
88  }
89 }
90 
91 void
93  grn_obj *buf,
94  const char *query, uint32_t query_len,
95  const char *client_id, uint32_t client_id_len,
96  const char *learn_target_names,
97  uint32_t learn_target_names_len,
98  uint64_t millisec,
99  int submit)
100 {
101  if (millisec && query && client_id && learn_target_names) {
102  unsigned int tn_len;
103  const char *tn, *tnp, *tne;
104  tn = tnp = learn_target_names;
105  tne = learn_target_names + learn_target_names_len;
106  while (tnp <= tne) {
107  if (tnp == tne || *tnp == '|') {
108  tn_len = tnp - tn;
109 
110  /*
111  printf("sec: %" PRIu64 " query %.*s client_id: %.*s target: %.*s\n",
112  millisec,
113  query_len, query,
114  client_id_len, client_id,
115  tn_len, tn);
116  */
117  load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
118  tn, tn_len, millisec, submit);
119 
120  tn = ++tnp;
121  } else {
122  tnp++;
123  }
124  }
125  }
126 }
127 
/*
 * Packs the key of record `id` in `ref_table` as a msgpack raw value.
 * Relies on `ctx`, `ref_table` and `pk` being in scope at the expansion site.
 */
128 #define PACK_KEY_FROM_ID(id) do { \
129  int _k_len; \
130  char _k_buf[GRN_TABLE_MAX_KEY_SIZE]; \
131  _k_len = grn_table_get_key(ctx, ref_table, (id), _k_buf, GRN_TABLE_MAX_KEY_SIZE); \
132  msgpack_pack_raw(&pk, _k_len); \
133  msgpack_pack_raw_body(&pk, _k_buf, _k_len); \
134 } while (0)
135 
/*
 * Packs one map entry: the key is the literal column name, the value is the
 * content of column `col_<col_name>` for record `rec_id`.
 *
 * Relies on `ctx`, `pk`, `rec_id`, `ref_table` and a `grn_obj *col_<col_name>`
 * being in scope at the expansion site.
 *
 * Value encoding, chosen from the fetched object's type/domain:
 *   ShortText bulk -> raw, Int32 -> int32, UInt32 -> uint32,
 *   Time -> double (value divided by GRN_TIME_USEC_PER_SEC),
 *   any other bulk domain -> treated as a record id, packed as its key in
 *   ref_table; uvector -> array of such keys; anything else -> nil plus an
 *   error message.
 *
 * NOTE(review): the first switch has no default, so `_v` is left
 * uninitialised when the column is neither GRN_COLUMN_FIX_SIZE nor
 * GRN_COLUMN_VAR_SIZE, and col_<col_name> is dereferenced without a NULL
 * check - confirm callers only pass real fixed/variable size columns.
 */
136 #define PACK_MAP_ITEM(col_name) do { \
137  grn_obj _v; \
138  msgpack_pack_raw(&pk, sizeof(#col_name) - 1); \
139  msgpack_pack_raw_body(&pk, CONST_STR_LEN(#col_name)); \
140  switch (col_##col_name->header.type) { \
141  case GRN_COLUMN_FIX_SIZE: \
142  GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
143  break; \
144  case GRN_COLUMN_VAR_SIZE: \
145  if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \
146  GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \
147  } else { \
148  GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
149  } \
150  break; \
151  } \
152  grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \
153  \
154  switch (_v.header.type) { \
155  case GRN_BULK: \
156  switch (_v.header.domain) { \
157  case GRN_DB_SHORT_TEXT: \
158  msgpack_pack_raw(&pk, GRN_TEXT_LEN(&_v)); \
159  msgpack_pack_raw_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \
160  break; \
161  case GRN_DB_INT32: \
162  msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \
163  break; \
164  case GRN_DB_UINT32: \
165  msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \
166  break; \
167  case GRN_DB_TIME: \
168  msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \
169  break; \
170  default: /* ref. to ShortText key */ \
171  PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \
172  } \
173  break; \
174  case GRN_UVECTOR: /* ref.s to ShortText key */ \
175  { \
176  grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \
177  msgpack_pack_array(&pk, _idve - _idv); \
178  for (; _idv < _idve; _idv++) { \
179  PACK_KEY_FROM_ID(*_idv); \
180  } \
181  } \
182  break; \
183  default: \
184  print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \
185  msgpack_pack_nil(&pk); \
186  break; \
187  } \
188  grn_obj_close(ctx, &_v); \
189 } while (0)
190 
191 static int
192 zmq_send_to_httpd(void *zmq_send_sock, void *data, size_t size)
193 {
194  zmq_msg_t msg;
195  if (!zmq_msg_init_size(&msg, size)) {
196  memcpy((void *)zmq_msg_data(&msg), data, size);
197  if (zmq_msg_send(&msg, zmq_send_sock, 0)) {
198  print_error("zmq_send() error");
199  return -1;
200  }
201  zmq_msg_close(&msg);
202  } else {
203  print_error("zmq_msg_init_size() error");
204  }
205  return 0;
206 }
207 
208 static void
209 send_handler(void *zmq_send_sock, grn_ctx *ctx)
210 {
211  grn_table_cursor *cur;
212  if ((cur = grn_table_cursor_open(ctx, grn_ctx_db(ctx), NULL, 0, NULL, 0,
213  0, -1, 0))) {
214  grn_id table_id;
215  while (loop && (table_id = grn_table_cursor_next(ctx, cur)) != GRN_ID_NIL) {
216  grn_obj *table;
217  if ((table = grn_ctx_at(ctx, table_id))) {
218  int name_len;
219  char name_buf[GRN_TABLE_MAX_KEY_SIZE];
220 
221  name_len = grn_obj_name(ctx, table, name_buf,
223 
224  if (name_len > 5) {
225  if (table->header.type == GRN_TABLE_PAT_KEY &&
226  !memcmp(name_buf, CONST_STR_LEN("item_"))) {
227  /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
228  grn_obj *ref_table;
229  grn_table_cursor *tc;
230  grn_obj *col_last, *col_kana, *col_freq, *col_freq2,
231  *col_buzz, *col_boost;
232 
233  col_kana = grn_obj_column(ctx, table, CONST_STR_LEN("kana"));
234  col_freq = grn_obj_column(ctx, table, CONST_STR_LEN("freq"));
235  col_last = grn_obj_column(ctx, table, CONST_STR_LEN("last"));
236  col_boost = grn_obj_column(ctx, table, CONST_STR_LEN("boost"));
237  col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
238  col_buzz = grn_obj_column(ctx, table, CONST_STR_LEN("buzz"));
239 
240  ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_kana));
241 
242  if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
243  0, 0, -1, 0))) {
244  grn_id rec_id;
245  while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
246  != GRN_ID_NIL) {
247  char *key;
248  size_t key_len;
249  msgpack_packer pk;
250  msgpack_sbuffer sbuf;
251 
252  msgpack_sbuffer_init(&sbuf);
253  msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
254 
255  msgpack_pack_map(&pk, 8);
256 
257  /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
258  msgpack_pack_raw(&pk, 6);
259  msgpack_pack_raw_body(&pk, CONST_STR_LEN("target"));
260  msgpack_pack_raw(&pk, name_len);
261  msgpack_pack_raw_body(&pk, name_buf, name_len);
262 
263  msgpack_pack_raw(&pk, 4);
264  msgpack_pack_raw_body(&pk, CONST_STR_LEN("_key"));
265  key_len = grn_table_cursor_get_key(ctx, tc, (void **)&key);
266  msgpack_pack_raw(&pk, key_len);
267  msgpack_pack_raw_body(&pk, key, key_len);
268 
269  PACK_MAP_ITEM(last);
270  PACK_MAP_ITEM(kana);
271  PACK_MAP_ITEM(freq);
272  PACK_MAP_ITEM(freq2);
273  PACK_MAP_ITEM(buzz);
274  PACK_MAP_ITEM(boost);
275 
276  zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
277 
278  usleep(SEND_WAIT);
279 
280  msgpack_sbuffer_destroy(&sbuf);
281  }
282  grn_table_cursor_close(ctx, tc);
283  }
284  } else if (table->header.type == GRN_TABLE_HASH_KEY &&
285  !memcmp(name_buf, CONST_STR_LEN("pair_"))) {
286  grn_obj *ref_table;
287  grn_table_cursor *tc;
288  grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2;
289 
290  col_pre = grn_obj_column(ctx, table, CONST_STR_LEN("pre"));
291  col_post = grn_obj_column(ctx, table, CONST_STR_LEN("post"));
292  col_freq0 = grn_obj_column(ctx, table, CONST_STR_LEN("freq0"));
293  col_freq1 = grn_obj_column(ctx, table, CONST_STR_LEN("freq1"));
294  col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
295 
296  ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_pre));
297 
298  if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
299  0, 0, -1, 0))) {
300  grn_id rec_id;
301  while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
302  != GRN_ID_NIL) {
303  uint64_t *key;
304  msgpack_packer pk;
305  msgpack_sbuffer sbuf;
306 
307  /* skip freq0 == 0 && freq1 == 0 && freq2 == 0 */
308  {
309  grn_obj f;
310  grn_obj_get_value(ctx, col_freq0, rec_id, &f);
311  if (!GRN_INT32_VALUE(&f)) {
312  grn_obj_get_value(ctx, col_freq1, rec_id, &f);
313  if (!GRN_INT32_VALUE(&f)) {
314  grn_obj_get_value(ctx, col_freq2, rec_id, &f);
315  if (!GRN_INT32_VALUE(&f)) { continue; }
316  }
317  }
318  }
319 
320  /* make pair_* message */
321  msgpack_sbuffer_init(&sbuf);
322  msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
323 
324  msgpack_pack_map(&pk, 7);
325  /* ["_key","UInt64"],["pre","item_all"],["post","item_all"],["freq2","Int32"],["freq1","Int32"],["freq0","Int32"] */
326 
327  msgpack_pack_raw(&pk, 6);
328  msgpack_pack_raw_body(&pk, CONST_STR_LEN("target"));
329  msgpack_pack_raw(&pk, name_len);
330  msgpack_pack_raw_body(&pk, name_buf, name_len);
331 
332  msgpack_pack_raw(&pk, 4);
333  msgpack_pack_raw_body(&pk, CONST_STR_LEN("_key"));
334  grn_table_cursor_get_key(ctx, tc, (void **)&key);
335  msgpack_pack_uint64(&pk, *key);
336 
337  PACK_MAP_ITEM(pre);
338  PACK_MAP_ITEM(post);
339  PACK_MAP_ITEM(freq0);
340  PACK_MAP_ITEM(freq1);
341  PACK_MAP_ITEM(freq2);
342 
343  zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
344 
345  usleep(SEND_WAIT);
346 
347  msgpack_sbuffer_destroy(&sbuf);
348  }
349  grn_table_cursor_close(ctx, tc);
350  }
351  }
352  }
353  grn_obj_unlink(ctx, table);
354  }
355  }
356  grn_table_cursor_close(ctx, cur);
357  }
358 }
359 
360 static void *
361 send_to_httpd(void *arg)
362 {
363  send_thd_data *thd = arg;
364  void *zmq_send_sock;
365  if ((zmq_send_sock = zmq_socket(thd->zmq_ctx, ZMQ_PUB))) {
366  if (!zmq_bind(zmq_send_sock, thd->send_endpoint)) {
367  grn_ctx ctx;
368  if (!(grn_ctx_init(&ctx, 0))) {
369  grn_obj *db;
370  if ((db = grn_db_open(&ctx, thd->db_path))) {
371  uint64_t hwm = 1;
372  zmq_setsockopt(zmq_send_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
373  while (loop) {
374  send_handler(zmq_send_sock, &ctx);
375  }
376  grn_obj_close(&ctx, db);
377  } else {
378  print_error("error in grn_db_open() on send thread.");
379  }
380  grn_ctx_fin(&ctx);
381  } else {
382  print_error("error in grn_ctx_init() on send thread.");
383  }
384  } else {
385  print_error("cannot bind zmq_socket.");
386  }
387  } else {
388  print_error("cannot create zmq_socket.");
389  }
390  return NULL;
391 }
392 
393 static void
394 handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
395 {
396  int submit_flag = 0;
397  uint64_t millisec = 0;
398  const char *query = NULL,
399  *client_id = NULL, *learn_target_names = NULL;
400  uint32_t query_len = 0, client_id_len = 0, learn_target_names_len = 0;
401  if (obj->type == MSGPACK_OBJECT_MAP) {
402  int i;
403  for (i = 0; i < obj->via.map.size; i++) {
404  msgpack_object_kv *kv;
405  kv = &(obj->via.map.ptr[i]);
406  if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size) {
407  switch (kv->key.via.raw.ptr[0]) {
408  case 'i':
409  if (kv->val.type == MSGPACK_OBJECT_RAW) {
410  client_id_len = kv->val.via.raw.size;
411  client_id = kv->val.via.raw.ptr;
412  }
413  break;
414  case 'q':
415  if (kv->val.type == MSGPACK_OBJECT_RAW) {
416  query_len = kv->val.via.raw.size;
417  query = kv->val.via.raw.ptr;
418  }
419  break;
420  case 'l':
421  if (kv->val.type == MSGPACK_OBJECT_RAW) {
422  learn_target_names_len = kv->val.via.raw.size;
423  learn_target_names = kv->val.via.raw.ptr;
424  }
425  break;
426  case 's':
427  if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
428  millisec = kv->val.via.u64;
429  }
430  break;
431  case 't':
432  if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) {
433  submit_flag = (kv->val.via.boolean ? 1 : 0);
434  }
435  break;
436  default:
437  break;
438  }
439  }
440  }
441  load_to_multi_targets(ctx, buf, query, query_len,
442  client_id, client_id_len,
443  learn_target_names, learn_target_names_len,
444  millisec, submit_flag);
445  }
446 }
447 
448 static void
449 recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
450 {
451  grn_obj buf;
452  zmq_pollitem_t items[] = {
453  { zmq_sock, 0, ZMQ_POLLIN, 0}
454  };
455  GRN_TEXT_INIT(&buf, 0);
456  while (loop) {
457  zmq_poll(items, 1, 10000);
458  if (items[0].revents & ZMQ_POLLIN) { /* always true */
459  zmq_msg_t msg;
460  if (zmq_msg_init(&msg)) {
461  print_error("cannot init zmq message.");
462  } else {
463  if (zmq_msg_recv(&msg, zmq_sock, 0)) {
464  print_error("cannot recv zmq message.");
465  } else {
466  msgpack_object obj;
467  msgpack_unpack_return ret;
468  ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
469  if (MSGPACK_UNPACK_SUCCESS == ret) {
470  /* msgpack_object_print(stdout, obj); */
471  handle_msg(&obj, ctx, &buf);
472  }
473  msgpack_zone_clear(mempool);
474  }
475  zmq_msg_close(&msg);
476  }
477  }
478  }
479  grn_obj_unlink(ctx, &buf);
480 }
481 
/*
 * One open log file plus the fields parsed from its current line; the files
 * being replayed are chained through `next`.
 *
 * NOTE(review): the struct header, the `learn_target_name` and `next`
 * members and the typedef were missing from the listing. The members are
 * restored from their uses in free_log_line_data(), read_log_line() and
 * sort_log_file_list(); the struct tag is an assumption - confirm against
 * the upstream source.
 */
struct _suggest_log_file {
  FILE *fp;
  char *path;
  uint64_t line;
  /* datas from one line */
  int submit;
  char *query;
  uint64_t millisec;
  char *client_id;
  char *learn_target_name;
  /* link list */
  struct _suggest_log_file *next;
};
typedef struct _suggest_log_file suggest_log_file;
496 
/* Debug helper, compiled out by #if 0: prints fp, millisec and next of every list entry. */
497 #if 0
498 static void
499 print_log_file_list(suggest_log_file *list)
500 {
501  while (list) {
502  printf("fp:%p millisec:%" PRIu64 " next:%p\n",
503  list->fp, list->millisec, list->next);
504  list = list->next;
505  }
506 }
507 #endif
508 
509 static void
510 free_log_line_data(suggest_log_file *l)
511 {
512  if (l->query) {
513  free(l->query);
514  l->query = NULL;
515  }
516  if (l->client_id) {
517  free(l->client_id);
518  l->client_id = NULL;
519  }
520  if (l->learn_target_name) {
521  free(l->learn_target_name);
522  l->learn_target_name = NULL;
523  }
524 }
525 
526 #define MAX_LOG_LENGTH 0x2000
527 
528 static void
529 read_log_line(suggest_log_file **list)
530 {
531  suggest_log_file *t = *list;
532  char line_buf[MAX_LOG_LENGTH];
533  while (1) {
534  free_log_line_data(t);
535  if (fgets(line_buf, MAX_LOG_LENGTH, t->fp)) {
536  char *eol;
537  t->line++;
538  if ((eol = strrchr(line_buf, '\n'))) {
539  const char *query, *types, *client_id, *learn_target_name;
540  struct evkeyvalq get_args;
541  *eol = '\0';
542  evhttp_parse_query(line_buf, &get_args);
543  parse_keyval(NULL,
544  &get_args, &query, &types, &client_id, NULL,
545  &learn_target_name, NULL, &(t->millisec), NULL, NULL, NULL,
546  NULL);
547  if (query && client_id && learn_target_name && t->millisec) {
548  t->query = evhttp_decode_uri(query);
549  t->submit = (types && !strcmp(types, "submit"));
550  t->client_id = evhttp_decode_uri(client_id);
551  t->learn_target_name = evhttp_decode_uri(learn_target_name);
552  evhttp_clear_headers(&get_args);
553  break;
554  }
555  print_error("invalid line path:%s line:%" PRIu64,
556  t->path, t->line);
557  evhttp_clear_headers(&get_args);
558  } else {
559  /* read until new line */
560  while (1) {
561  int c = fgetc(t->fp);
562  if (c == '\n' || c == EOF) { break; }
563  }
564  }
565  } else {
566  /* terminate reading log */
567  fclose(t->fp);
568  free(t->path);
569  *list = t->next;
570  free(t);
571  break;
572  }
573  }
574 }
575 
576 /* re-sorting by list->millisec asc with moving a head item. */
577 static void
578 sort_log_file_list(suggest_log_file **list)
579 {
580  suggest_log_file *p, *target;
581  target = *list;
582  if (!target || !target->next || target->millisec < target->next->millisec) {
583  return;
584  }
585  *list = target->next;
586  for (p = *list; p; p = p->next) {
587  if (!p->next || target->millisec > p->next->millisec) {
588  target->next = p->next;
589  p->next = target;
590  return;
591  }
592  }
593 }
594 
595 #define PATH_SEPARATOR '/'
596 
597 static suggest_log_file *
598 gather_log_file(const char *dir_path, unsigned int dir_path_len)
599 {
600  DIR *dir;
601  struct dirent *dirent;
602  char path[PATH_MAX + 1];
603  suggest_log_file *list = NULL;
604  if (!(dir = opendir(dir_path))) {
605  print_error("cannot open log directory.");
606  return NULL;
607  }
608  memcpy(path, dir_path, dir_path_len);
609  path[dir_path_len] = PATH_SEPARATOR;
610  while ((dirent = readdir(dir))) {
611  struct stat fstat;
612  unsigned int d_namlen, path_len;
613  if (*(dirent->d_name) == '.' && (
614  dirent->d_name[1] == '\0' ||
615  (dirent->d_name[1] == '.' && dirent->d_name[2] == '\0'))) {
616  continue;
617  }
618  d_namlen = strlen(dirent->d_name);
619  path_len = dir_path_len + 1 + d_namlen;
620  if (dir_path_len + d_namlen >= PATH_MAX) { continue; }
621  memcpy(path + dir_path_len + 1, dirent->d_name, d_namlen);
622  path[path_len] = '\0';
623  lstat(path, &fstat);
624  if (S_ISDIR(fstat.st_mode)) {
625  gather_log_file(path, path_len);
626  } else {
627  suggest_log_file *p = calloc(1, sizeof(suggest_log_file));
628  if (!(p->fp = fopen(path, "r"))) {
629  free(p);
630  } else {
631  if (list) {
632  p->next = list;
633  }
634  p->path = strdup(path);
635  list = p;
636  read_log_line(&list);
637  sort_log_file_list(&list);
638  }
639  }
640  /* print_log_file_list(list); */
641  }
642  return list;
643 }
644 
645 static void
646 load_log(grn_ctx *ctx, const char *log_dir_name)
647 {
648  grn_obj buf;
649  suggest_log_file *list;
650  GRN_TEXT_INIT(&buf, 0);
651  list = gather_log_file(log_dir_name, strlen(log_dir_name));
652  while (list) {
653  /*
654  printf("file:%s line:%" PRIu64 " query:%s millisec:%" PRIu64 "\n",
655  list->path, list->line, list->query, list->millisec);
656  */
657  load_to_multi_targets(ctx, &buf,
658  list->query, strlen(list->query),
659  list->client_id, strlen(list->client_id),
660  list->learn_target_name, strlen(list->learn_target_name),
661  list->millisec,
662  list->submit);
663  read_log_line(&list);
664  sort_log_file_list(&list);
665  }
666  grn_obj_close(ctx, &buf);
667 }
668 
669 static void
670 usage(FILE *output)
671 {
672  fprintf(output,
673  "Usage: groonga-suggest-learner [options...] db_path\n"
674  "options:\n"
675  " -r <recv endpoint>: recv endpoint (default: %s)\n"
676  " -s <send endpoint>: send endpoint (default: %s)\n"
677  " -l <log directory>: load from log files made on webserver.\n"
678  " -d : daemonize\n",
680 }
681 
682 static void
683 signal_handler(int sig)
684 {
685  loop = 0;
686 }
687 
688 int
689 main(int argc, char **argv)
690 {
691  int daemon = 0;
692  const char *recv_endpoint = DEFAULT_RECV_ENDPOINT,
693  *send_endpoint = DEFAULT_SEND_ENDPOINT,
694  *load_logfile_name = NULL;
695 
696  /* parse options */
697  {
698  int ch;
699 
700  while ((ch = getopt(argc, argv, "r:s:dl:")) != -1) {
701  switch(ch) {
702  case 'r':
703  recv_endpoint = optarg;
704  break;
705  case 's':
706  send_endpoint = optarg;
707  break;
708  case 'd':
709  daemon = 1;
710  break;
711  case 'l':
712  load_logfile_name = optarg;
713  break;
714  }
715  }
716  argc -= optind; argv += optind;
717  }
718 
719  /* main */
720  if (argc != 1) {
721  usage(stderr);
722  } else {
723  grn_ctx *ctx;
724  msgpack_zone *mempool;
725 
726  if (daemon) {
727  daemonize();
728  }
729 
730  grn_init();
731 
732  ctx = grn_ctx_open(0);
733  if (!(grn_db_open(ctx, argv[0]))) {
734  print_error("cannot open database.");
735  } else {
736  if (load_logfile_name) {
737  /* loading log mode */
738  load_log(ctx, load_logfile_name);
739  } else {
740  /* zeromq/msgpack recv mode */
741  if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
742  print_error("cannot create msgpack zone.");
743  } else {
744  void *zmq_ctx, *zmq_recv_sock;
745  if (!(zmq_ctx = zmq_init(1))) {
746  print_error("cannot create zmq context.");
747  } else {
748  if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
749  print_error("cannot create zmq_socket.");
750  } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
751  print_error("cannot bind zmq_socket.");
752  } else {
753  send_thd_data thd;
754 
755  signal(SIGTERM, signal_handler);
756  signal(SIGINT, signal_handler);
757  signal(SIGQUIT, signal_handler);
758 
759  zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
760  thd.db_path = argv[0];
761  thd.send_endpoint = send_endpoint;
762  thd.zmq_ctx = zmq_ctx;
763 
764  if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
765  print_error("error in pthread_create() for sending datas.");
766  }
767  recv_event_loop(mempool, zmq_recv_sock, ctx);
768  if (pthread_join(thd.thd, NULL)) {
769  print_error("error in pthread_join() for waiting completion of sending data.");
770  }
771  }
772  zmq_term(zmq_ctx);
773  }
774  msgpack_zone_free(mempool);
775  }
776  }
777  }
778  grn_obj_close(ctx, grn_ctx_db(ctx));
779  grn_ctx_fin(ctx);
780  grn_fin();
781  }
782  return 0;
783 }