26 #include <sys/types.h>
/* Default ZeroMQ endpoints for the receiving and the sending side.
 * Presumably these are the defaults the usage text prints for -r and -s;
 * confirm at the usage() call site. */
33 #define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
34 #define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
/* NOTE(review): the unit of SEND_WAIT is not visible here; confirm it at
 * the place where it is consumed. */
35 #define SEND_WAIT 1000
/* Expands to TWO comma-separated arguments: x itself, then sizeof(x) - 1
 * (the length without the terminating NUL), or 0 when x evaluates to false.
 * Because it relies on sizeof it is only meaningful for string literals and
 * char arrays, never for a plain char pointer. x is not parenthesized. */
37 #define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
/* Process-wide run flag, typed so that it can be written from a signal
 * handler. Presumably cleared by signal_handler to stop the loops; confirm. */
46 static volatile sig_atomic_t loop = 1;
/* Each string argument is passed as a (pointer, byte length) pair. */
51 const char *query, uint32_t query_len,
52 const char *client_id, uint32_t client_id_len,
53 const char *learn_target_name, uint32_t learn_target_name_len,
/* The target name is appended to the text being built in buf verbatim:
 * once here, and once more as the suffix that follows pair_ inside the
 * --each expression.
 * NOTE(review): no escaping or validation of learn_target_name is visible
 * at this point. Confirm it is validated upstream before it is spliced
 * into the command text. */
59 GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
60 GRN_TEXT_PUTS(ctx, buf,
" --each 'suggest_preparer(_id,type,item,sequence,time,pair_");
61 GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
/* learn_target_names holds one or more target names in a single buffer of
 * learn_target_names_len bytes; the names are separated by the pipe
 * character. load_to_groonga() is called with one name at a time. */
94 const char *query, uint32_t query_len,
95 const char *client_id, uint32_t client_id_len,
96 const char *learn_target_names,
97 uint32_t learn_target_names_len,
/* Nothing is loaded unless millisec is non-zero and all three string
 * pointers are non-NULL. */
101 if (millisec && query && client_id && learn_target_names) {
/* tnp is the scan cursor and tne is one past the last byte of the buffer.
 * tn is what gets passed on as the name, so it presumably marks the start
 * of the current name; confirm against the lines that advance it. */
103 const char *tn, *tnp, *tne;
104 tn = tnp = learn_target_names;
105 tne = learn_target_names + learn_target_names_len;
/* A name ends at a separator or at the end of the buffer. The end test is
 * evaluated first, so the byte at tne is never read. */
107 if (tnp == tne || *tnp ==
'|') {
117 load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
118 tn, tn_len, millisec, submit);
/*
 * PACK_KEY_FROM_ID(id): fetch the key of record `id` from ref_table into a
 * local buffer of GRN_TABLE_MAX_KEY_SIZE bytes and pack it as a msgpack raw
 * value (length header followed by the key bytes).
 *
 * Relies on ctx, ref_table and pk being in scope where the macro is
 * expanded. `id` is used once, inside parentheses.
 */
128 #define PACK_KEY_FROM_ID(id) do { \
130 char _k_buf[GRN_TABLE_MAX_KEY_SIZE]; \
131 _k_len = grn_table_get_key(ctx, ref_table, (id), _k_buf, GRN_TABLE_MAX_KEY_SIZE); \
132 msgpack_pack_raw(&pk, _k_len); \
133 msgpack_pack_raw_body(&pk, _k_buf, _k_len); \
/*
 * PACK_MAP_ITEM(col_name): pack one map entry whose key is the column name
 * (the stringized macro argument, packed as raw bytes) and whose value is
 * the content of column col_<col_name> for record rec_id.
 *
 * Steps visible in the expansion:
 *  - _v is initialized according to the column type: fixed size, variable
 *    size, or a vector when the column flags say GRN_OBJ_COLUMN_VECTOR; the
 *    column range is used as the value domain.
 *  - the value is read with grn_obj_get_value().
 *  - the value is packed depending on its type and domain: short text as
 *    raw bytes, 32-bit signed and unsigned integers as such, time as a
 *    double obtained by dividing by GRN_TIME_USEC_PER_SEC, a record
 *    reference as the referenced key (PACK_KEY_FROM_ID), and a list of
 *    record ids as an array of keys.
 *  - an unsupported type is reported through print_error() and packed as
 *    nil, so the entry still has a value.
 *  - _v is closed at the end.
 *
 * Relies on ctx, pk, rec_id and col_<col_name> being in scope where the
 * macro is expanded.
 */
136 #define PACK_MAP_ITEM(col_name) do { \
138 msgpack_pack_raw(&pk, sizeof(#col_name) - 1); \
139 msgpack_pack_raw_body(&pk, CONST_STR_LEN(#col_name)); \
140 switch (col_##col_name->header.type) { \
141 case GRN_COLUMN_FIX_SIZE: \
142 GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
144 case GRN_COLUMN_VAR_SIZE: \
145 if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \
146 GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \
148 GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
152 grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \
154 switch (_v.header.type) { \
156 switch (_v.header.domain) { \
157 case GRN_DB_SHORT_TEXT: \
158 msgpack_pack_raw(&pk, GRN_TEXT_LEN(&_v)); \
159 msgpack_pack_raw_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \
162 msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \
164 case GRN_DB_UINT32: \
165 msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \
168 msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \
171 PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \
176 grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \
177 msgpack_pack_array(&pk, _idve - _idv); \
178 for (; _idv < _idve; _idv++) { \
179 PACK_KEY_FROM_ID(*_idv); \
184 print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \
185 msgpack_pack_nil(&pk); \
188 grn_obj_close(ctx, &_v); \
/*
 * Copy `size` bytes from `data` into a newly sized ZeroMQ message and send
 * that message on zmq_send_sock.
 */
192 zmq_send_to_httpd(
void *zmq_send_sock,
void *data,
size_t size)
/* zmq_msg_init_size() returns 0 on success, so this branch is the success
 * path. */
195 if (!zmq_msg_init_size(&msg, size)) {
196 memcpy((
void *)zmq_msg_data(&msg), data, size);
/* NOTE(review): zmq_msg_send() returns the number of bytes sent on success
 * and -1 on failure, so this condition is also true after a successful
 * non-empty send. If this branch is meant to be the error path, the result
 * should be compared against -1. Confirm against the branch body. */
197 if (zmq_msg_send(&msg, zmq_send_sock, 0)) {
/*
 * Serialize records as msgpack maps and publish each map as one message
 * through zmq_send_to_httpd().
 *
 * Two record shapes are visible: one packed as an 8-entry map whose key is
 * sent as raw bytes, and one packed as a 7-entry map whose key is sent as
 * an unsigned 64-bit integer.
 */
209 send_handler(
void *zmq_send_sock,
grn_ctx *ctx)
/* Columns used for the first record shape. */
230 grn_obj *col_last, *col_kana, *col_freq, *col_freq2,
231 *col_buzz, *col_boost;
250 msgpack_sbuffer sbuf;
/* The packer appends to sbuf through msgpack_sbuffer_write. */
252 msgpack_sbuffer_init(&sbuf);
253 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
/* NOTE(review): the entry count given here has to equal the number of
 * key/value pairs packed afterwards; re-check it whenever a column is added
 * or removed. */
255 msgpack_pack_map(&pk, 8);
258 msgpack_pack_raw(&pk, 6);
260 msgpack_pack_raw(&pk, name_len);
261 msgpack_pack_raw_body(&pk, name_buf, name_len);
263 msgpack_pack_raw(&pk, 4);
266 msgpack_pack_raw(&pk, key_len);
267 msgpack_pack_raw_body(&pk, key, key_len);
/* The whole serialized map goes out as a single message. The result of
 * zmq_send_to_httpd() is not inspected here. */
276 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
280 msgpack_sbuffer_destroy(&sbuf);
/* Columns used for the second record shape. */
288 grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2;
305 msgpack_sbuffer sbuf;
321 msgpack_sbuffer_init(&sbuf);
322 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
/* NOTE(review): same rule as above, the count must match the pairs packed. */
324 msgpack_pack_map(&pk, 7);
327 msgpack_pack_raw(&pk, 6);
329 msgpack_pack_raw(&pk, name_len);
330 msgpack_pack_raw_body(&pk, name_buf, name_len);
332 msgpack_pack_raw(&pk, 4);
/* For this shape the key is packed as an unsigned 64-bit integer instead
 * of raw bytes. */
335 msgpack_pack_uint64(&pk, *key);
343 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
347 msgpack_sbuffer_destroy(&sbuf);
/*
 * Start routine that main hands to pthread_create(). It opens a ZMQ_PUB
 * socket on the ZeroMQ context stored in the thread data and runs
 * send_handler() on that socket with a grn_ctx of its own.
 */
361 send_to_httpd(
void *arg)
365 if ((zmq_send_sock = zmq_socket(thd->
zmq_ctx, ZMQ_PUB))) {
/* NOTE(review): libzmq 3.x and later document ZMQ_SNDHWM as an `int`
 * option. With those versions an option length of sizeof(uint64_t) is
 * rejected with EINVAL, and the result of this call is not checked, so the
 * high water mark may silently stay at its default. Confirm the type of
 * hwm against the libzmq version in use. */
372 zmq_setsockopt(zmq_send_sock,
ZMQ_SNDHWM, &hwm,
sizeof(uint64_t));
374 send_handler(zmq_send_sock, &ctx);
/* Failure reports for opening the database and for initializing the
 * context on this thread. */
378 print_error(
"error in grn_db_open() on send thread.");
382 print_error(
"error in grn_ctx_init() on send thread.");
/* Fields extracted from the message. Each string is kept as a
 * (pointer, length) view into the unpacked msgpack object; nothing is
 * copied, so the views are only valid while that object is alive. */
397 uint64_t millisec = 0;
398 const char *query = NULL,
399 *client_id = NULL, *learn_target_names = NULL;
400 uint32_t query_len = 0, client_id_len = 0, learn_target_names_len = 0;
/* Only a msgpack map is interpreted. */
401 if (obj->type == MSGPACK_OBJECT_MAP) {
403 for (i = 0; i < obj->via.map.size; i++) {
404 msgpack_object_kv *kv;
405 kv = &(obj->via.map.ptr[
i]);
/* A key must be a non-empty raw string. The field is selected by the FIRST
 * byte of the key.
 * NOTE(review): no comparison of the remaining key bytes is visible, so
 * any key starting with the same byte would select the same field.
 * Confirm that this is intended. */
406 if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size) {
407 switch (kv->key.via.raw.ptr[0]) {
/* String fields are taken only when the value is raw. */
409 if (kv->val.type == MSGPACK_OBJECT_RAW) {
410 client_id_len = kv->val.via.raw.size;
411 client_id = kv->val.via.raw.ptr;
415 if (kv->val.type == MSGPACK_OBJECT_RAW) {
416 query_len = kv->val.via.raw.size;
417 query = kv->val.via.raw.ptr;
421 if (kv->val.type == MSGPACK_OBJECT_RAW) {
422 learn_target_names_len = kv->val.via.raw.size;
423 learn_target_names = kv->val.via.raw.ptr;
/* The timestamp is taken only from a non-negative integer value. */
427 if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
428 millisec = kv->val.via.u64;
/* The submit flag is normalized to 0 or 1. */
432 if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) {
433 submit_flag = (kv->val.via.boolean ? 1 : 0);
/* The collected fields are forwarded together with their lengths. Fields
 * that were absent keep their initial NULL or 0 value. */
442 client_id, client_id_len,
443 learn_target_names, learn_target_names_len,
444 millisec, submit_flag);
/*
 * Poll zmq_sock for incoming messages, unpack each received message as
 * msgpack using `mempool` as the allocation zone, and pass the resulting
 * object to handle_msg().
 */
449 recv_event_loop(msgpack_zone *mempool,
void *zmq_sock,
grn_ctx *ctx)
452 zmq_pollitem_t items[] = {
453 { zmq_sock, 0, ZMQ_POLLIN, 0}
/* NOTE(review): the result of zmq_poll() is not checked, so a poll error is
 * indistinguishable from a timeout here. The unit of the timeout depends on
 * the libzmq major version (microseconds in 2.x, milliseconds in 3.x and
 * later); confirm which one this builds against. */
457 zmq_poll(items, 1, 10000);
458 if (items[0].revents & ZMQ_POLLIN) {
/* zmq_msg_init() returns 0 on success; a non-zero result takes this
 * branch. */
460 if (zmq_msg_init(&msg)) {
/* NOTE(review): zmq_msg_recv() returns the number of bytes received on
 * success and -1 on failure, so this condition is also true for every
 * successfully received non-empty message. If this branch is meant to be
 * the error path, the result should be compared against -1. Confirm
 * against the branch body. */
463 if (zmq_msg_recv(&msg, zmq_sock, 0)) {
467 msgpack_unpack_return ret;
468 ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
469 if (MSGPACK_UNPACK_SUCCESS == ret) {
471 handle_msg(&obj, ctx, &buf);
/* Drop what the unpacker allocated in the zone so that the zone can be
 * reused for the next message. */
473 msgpack_zone_clear(mempool);
502 printf(
"fp:%p millisec:%" PRIu64
" next:%p\n",
526 #define MAX_LOG_LENGTH 0x2000
/* Presumably releases what was stored in t for the previously read line;
 * confirm in free_log_line_data(). */
534 free_log_line_data(t);
/* Only a buffer that contains a newline is parsed. strrchr() locates the
 * LAST newline in line_buf. */
538 if ((eol = strrchr(line_buf,
'\n'))) {
540 struct evkeyvalq get_args;
/* The line is handed to the libevent query parser, which fills get_args
 * with key/value pairs. */
542 evhttp_parse_query(line_buf, &get_args);
/* query, types, client_id, learn_target_name and the timestamp are looked
 * up from the parsed pairs; the NULL slots are fields that are not wanted
 * here. */
544 &get_args, &query, &types, &client_id, NULL,
545 &learn_target_name, NULL, &(t->
millisec), NULL, NULL, NULL,
/* A line is accepted only when query, client_id and learn_target_name are
 * present and the timestamp is non-zero. */
547 if (query && client_id && learn_target_name && t->
millisec) {
/* evhttp_decode_uri() returns a newly allocated string.
 * NOTE(review): no NULL check on the decoded results is visible; confirm
 * that users of t->query and t->client_id tolerate NULL. */
548 t->
query = evhttp_decode_uri(query);
/* submit is true only when a types value exists and equals submit. */
549 t->
submit = (types && !strcmp(types,
"submit"));
550 t->
client_id = evhttp_decode_uri(client_id);
/* get_args is cleared on both visible paths. */
552 evhttp_clear_headers(&get_args);
557 evhttp_clear_headers(&get_args);
/* Consume characters from the file one at a time until the end of the
 * current line or the end of the file is reached. */
561 int c = fgetc(t->
fp);
562 if (c ==
'\n' || c == EOF) {
break; }
585 *list = target->
next;
586 for (p = *list; p; p = p->
next) {
595 #define PATH_SEPARATOR '/'
/*
 * Walk the directory dir_path: sub-directories are visited recursively and
 * other entries are opened for reading. dir_path_len is the length of
 * dir_path in bytes. load_log() stores the return value as its list.
 */
598 gather_log_file(
const char *dir_path,
unsigned int dir_path_len)
601 struct dirent *dirent;
604 if (!(dir = opendir(dir_path))) {
/* NOTE(review): dir_path_len bytes are copied into path before any length
 * check is visible. Confirm that path is large enough for the longest
 * dir_path a caller can pass, including the recursive calls below. */
608 memcpy(path, dir_path, dir_path_len);
610 while ((dirent = readdir(dir))) {
612 unsigned int d_namlen, path_len;
/* Skip the entries for the directory itself and for its parent. */
613 if (*(dirent->d_name) ==
'.' && (
614 dirent->d_name[1] ==
'\0' ||
615 (dirent->d_name[1] ==
'.' && dirent->d_name[2] ==
'\0'))) {
618 d_namlen = strlen(dirent->d_name);
/* One extra byte sits between the directory part and the entry name. */
619 path_len = dir_path_len + 1 + d_namlen;
/* NOTE(review): the test uses dir_path_len + d_namlen, while the
 * terminator below is written at index dir_path_len + 1 + d_namlen. The
 * largest index that passes this test is PATH_MAX, so path needs at least
 * PATH_MAX + 1 bytes. Its declaration is not visible here; confirm. */
620 if (dir_path_len + d_namlen >=
PATH_MAX) {
continue; }
621 memcpy(path + dir_path_len + 1, dirent->d_name, d_namlen);
622 path[path_len] =
'\0';
624 if (S_ISDIR(fstat.st_mode)) {
/* NOTE(review): the value returned by the recursive call is discarded,
 * while load_log() treats the return value of this function as the list.
 * Unless the result is threaded through state that is not visible here,
 * files found in sub-directories are never added to the list. Confirm. */
625 gather_log_file(path, path_len);
/* fopen() returns NULL on failure, which takes this branch. */
628 if (!(p->
fp = fopen(path,
"r"))) {
/* read_log_line() and sort_log_file_list() are then applied to the list. */
636 read_log_line(&list);
637 sort_log_file_list(&list);
/*
 * Load learning data from the log files found under log_dir_name.
 * log_dir_name must be a NUL-terminated string, since its length is taken
 * with strlen().
 */
646 load_log(
grn_ctx *ctx,
const char *log_dir_name)
/* The list of log files comes from gather_log_file(). */
651 list = gather_log_file(log_dir_name, strlen(log_dir_name));
/* read_log_line() followed by sort_log_file_list() is applied to the list;
 * presumably this advances to the next line and restores the order.
 * Confirm in those two functions. */
663 read_log_line(&list);
664 sort_log_file_list(&list);
673 "Usage: groonga-suggest-learner [options...] db_path\n"
675 " -r <recv endpoint>: recv endpoint (default: %s)\n"
676 " -s <send endpoint>: send endpoint (default: %s)\n"
677 " -l <log directory>: load from log files made on webserver.\n"
/* Handler that main installs for SIGTERM, SIGINT and SIGQUIT. `sig` is the
 * number of the delivered signal. */
683 signal_handler(
int sig)
/* Stays NULL unless a log directory is given on the command line. */
694 *load_logfile_name = NULL;
/* Option string: r, s and l each take an argument; d takes none. */
700 while ((ch = getopt(argc, argv,
"r:s:dl:")) != -1) {
703 recv_endpoint = optarg;
706 send_endpoint = optarg;
712 load_logfile_name = optarg;
/* Step past the parsed options so that argv[0] becomes the first
 * positional argument. */
716 argc -= optind; argv += optind;
724 msgpack_zone *mempool;
/* Log files are loaded only when a log directory was requested. */
736 if (load_logfile_name) {
738 load_log(ctx, load_logfile_name);
/* Allocation zone that the receive loop uses for unpacking messages. */
741 if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
744 void *zmq_ctx, *zmq_recv_sock;
/* ZeroMQ context created with an argument of 1 (the number of I/O threads
 * in the zmq_init API). */
745 if (!(zmq_ctx = zmq_init(1))) {
/* The receiving side is a SUB socket that is bound to recv_endpoint.
 * zmq_bind() returns non-zero on failure. */
748 if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
750 }
else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
/* The three termination signals share one handler. */
755 signal(SIGTERM, signal_handler);
756 signal(SIGINT, signal_handler);
757 signal(SIGQUIT, signal_handler);
/* A zero-length prefix subscribes to every message. The result of this
 * call is not checked. */
759 zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE,
"", 0);
/* The sender runs send_to_httpd() on its own thread. pthread_create() and
 * pthread_join() return non-zero on failure. */
764 if (pthread_create(&(thd.
thd), NULL, send_to_httpd, &thd)) {
765 print_error(
"error in pthread_create() for sending datas.");
/* The receive loop runs on the calling thread. */
767 recv_event_loop(mempool, zmq_recv_sock, ctx);
768 if (pthread_join(thd.
thd, NULL)) {
769 print_error(
"error in pthread_join() for waiting completion of sending data.");
774 msgpack_zone_free(mempool);