Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
grnslap.c
Go to the documentation of this file.
1 /* -*- c-basic-offset: 2 -*- */
2 /*
3  Copyright(C) 2009-2012 Brazil
4 
5  This library is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Lesser General Public
7  License version 2.1 as published by the Free Software Foundation.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "lib/com.h"
20 #include "lib/ctx_impl.h"
21 #include <string.h>
22 #include <stdio.h>
23 #ifdef HAVE_SYS_WAIT_H
24 #include <sys/wait.h>
25 #endif /* HAVE_SYS_WAIT_H */
26 #ifdef HAVE_NETINET_IN_H
27 #include <netinet/in.h>
28 #endif /* HAVE_NETINET_IN_H */
29 
/* Destination used by main() when no dest argument is given. */
30 #define DEFAULT_PORT 10041
31 #define DEFAULT_HOST "localhost"
/* Initial values of max_con and max_tp; main() overrides them from -m and -t. */
32 #define DEFAULT_MAX_CONCURRENCY 10
33 #define DEFAULT_MAX_THROUGHPUT 10000
/* Capacity of the dests[] array, i.e. the most dest arguments main() accepts. */
34 #define MAX_DEST 256
35 
/*
 * One benchmark target.  main() fills an array of these from the command
 * line and do_client() cycles through them when opening sessions.
 */
typedef struct {
  const char *host; /* host name; parse_dest() stores NULL here on failure */
  uint16_t port;    /* port number to connect to */
} grn_slap_dest;
40 
/*
 * Protocol selector: first character of the -P argument.  do_client() sends
 * over HTTP when it is 'h' or 'H' and uses grn_com_send() for anything else.
 */
41 static int proto = 'g';
/* Set to 1 by -v; msg_handler() then prints GQTP replies whose size is not 2. */
42 static int verbose = 0;
/* Number of valid entries in dests[]. */
43 static int dest_cnt = 0;
/* Destinations parsed from the command line (or the single default one). */
44 static grn_slap_dest dests[MAX_DEST];
/* Upper bound on queries in flight (nsent - nrecv) enforced by do_client(). */
45 static int max_con = DEFAULT_MAX_CONCURRENCY;
/*
 * Throughput limit: do_client() keeps sending only while
 * nrecv < max_tp * (whole seconds elapsed since the run began).
 */
46 static int max_tp = DEFAULT_MAX_THROUGHPUT;
47 
48 #include <stdarg.h>
49 static void
50 lprint(grn_ctx *ctx, const char *fmt, ...)
51 {
52  char buf[1024];
53  grn_timeval tv;
54  int len;
55  va_list argp;
56  grn_timeval_now(ctx, &tv);
57  grn_timeval2str(ctx, &tv, buf);
58  len = strlen(buf);
59  buf[len++] = '|';
60  va_start(argp, fmt);
61  vsnprintf(buf + len, 1023 - len, fmt, argp);
62  va_end(argp);
63  buf[1023] = '\0';
64  puts(buf);
65 }
66 
67 static void
68 parse_dest(char *deststr, grn_slap_dest *dest)
69 {
70  int p;
71  char *d;
72  if ((d = strchr(deststr, ':'))) {
73  if ((p = atoi(d + 1))) {
74  *d = '\0';
75  dest->host = deststr;
76  dest->port = p;
77  return;
78  }
79  }
80  dest->host = NULL;
81  dest->port = 0;
82 }
83 
84 static void
85 usage(void)
86 {
87  fprintf(stderr,
88  "Usage: grnslap [options...] [dest...]\n"
89  "options:\n"
90  " -P <protocol>: http or gqtp (default: gqtp)\n"
91  " -m <max concurrency>: number of max concurrency (default: %d)\n"
92  "dest: hostname:port number (default: \"%s:%d\")\n",
94 }
95 
/*
 * Size in bytes (0x1000000 = 16 MiB) that do_client() reserves for its input
 * buffer and passes to fgets(), so it bounds the length of one query line.
 */
96 #define BUFSIZE 0x1000000
97 
98 typedef struct _session session;
99 
100 struct _session {
103  struct timeval tv;
105  int stat;
106  int query_id;
107  int n_query;
109 };
110 
/* Event object: initialized in do_client(), polled by receiver(), and passed to grn_com_copen(). */
111 static grn_com_event ev;
/* Queue of sessions available for reuse: msg_handler() enqueues, session_alloc() dequeues. */
112 static grn_com_queue fsessions;
/* Hash of all session records, keyed by the connection's socket (com->fd). */
113 static grn_hash *sessions;
/* Set to 1 by do_client() once stdin is exhausted; lets receiver() stop when nsent == nrecv. */
114 static int done = 0;
/* Number of queries sent so far (incremented by do_client()). */
115 static int nsent = 0;
/* Number of replies counted so far (incremented by msg_handler()). */
116 static int nrecv = 0;
/* Smallest and largest per-query elapsed time seen by msg_handler(), in microseconds. */
117 static int etime_min = INT32_MAX;
118 static int etime_max = 0;
/* Sum of elapsed times; msg_handler() only adds to it for GQTP replies whose size is 2. */
119 static int64_t etime_amount = 0;
120 
121 static session *
122 session_open(grn_ctx *ctx, grn_slap_dest *dest)
123 {
124  grn_id id;
125  session *s;
126  grn_com *com;
127  if (!(com = grn_com_copen(ctx, &ev, dest->host, dest->port))) { return NULL; }
128  id = grn_hash_add(ctx, sessions, &com->fd, sizeof(grn_sock), (void **)&s, NULL);
129  com->opaque = s;
130  s->com = com;
131  s->id = id;
132  s->stat = 1;
133  return s;
134 }
135 
136 static void
137 session_close(grn_ctx *ctx, session *s)
138 {
139  if (!s->stat) { return; }
140  grn_com_close(ctx, s->com);
141  s->stat = 0;
142  grn_hash_delete_by_id(ctx, sessions, s->id, NULL);
143 }
144 
145 static session *
146 session_alloc(grn_ctx *ctx, grn_slap_dest *dest)
147 {
148  session *s;
149  while ((s = (session *)grn_com_queue_deque(ctx, &fsessions))) {
150  if (s->n_query < 1000000 && !s->com->closed) { return s; }
151  //session_close(ctx, s);
152  }
153  return session_open(ctx, dest);
154 }
155 
156 static void
157 msg_handler(grn_ctx *ctx, grn_obj *msg)
158 {
159  uint32_t etime;
160  struct timeval tv;
161  grn_msg *m = (grn_msg *)msg;
162  grn_com *com = ((grn_msg *)msg)->u.peer;
163  session *s = com->opaque;
164  s->stat = 3;
165  gettimeofday(&tv, NULL);
166  etime = (tv.tv_sec - s->tv.tv_sec) * 1000000 + (tv.tv_usec - s->tv.tv_usec);
167  if (etime > etime_max) { etime_max = etime; }
168  if (etime < etime_min) { etime_min = etime; }
169  if (ctx->rc) { m->header.proto = 0; }
170  switch (m->header.proto) {
171  case GRN_COM_PROTO_GQTP :
172  if (GRN_BULK_VSIZE(msg) == 2) {
173  etime_amount += etime;
174  } else {
175  if (verbose) {
176  GRN_TEXT_PUTC(ctx, msg, '\0');
177  lprint(ctx, "%8d(%4d) %8d : %s", s->query_id, s->n_sessions, etime, GRN_BULK_HEAD(msg));
178  }
179  }
180  if ((m->header.flags & GRN_CTX_TAIL)) {
181  grn_com_queue_enque(ctx, &fsessions, (grn_com_queue_entry *)s);
182  nrecv++;
183  }
184  break;
185  case GRN_COM_PROTO_HTTP :
186  nrecv++;
187  /* lprint(ctx, "recv: %d, %d", (int)GRN_BULK_VSIZE(msg), nrecv); */
188  grn_com_close_(ctx, com);
189  grn_com_queue_enque(ctx, &fsessions, (grn_com_queue_entry *)s);
190  break;
191  default :
192  grn_com_close_(ctx, com);
193  grn_com_queue_enque(ctx, &fsessions, (grn_com_queue_entry *)s);
194  break;
195  }
196  grn_msg_close(ctx, msg);
197 }
198 
199 static void * CALLBACK
200 receiver(void *arg)
201 {
202  grn_ctx ctx_, *ctx = &ctx_;
203  grn_ctx_init(ctx, 0);
204  while (!grn_com_event_poll(ctx, &ev, 100)) {
205  if (nsent == nrecv && done) { break; }
206  /*
207  {
208  session *s;
209  GRN_HASH_EACH(ctx, sessions, id, NULL, NULL, &s, {
210  printf("id=%d: fd=%d stat=%d q=%d n=%d\n", s->id, s->com->fd, s->stat, s->query_id, s->n_query);
211  });
212  }
213  */
214  }
215  grn_ctx_fin(ctx);
216  return NULL;
217 }
218 
219 static int
220 do_client()
221 {
222  int rc = -1;
223  grn_obj text;
224  grn_thread thread;
225  struct timeval tvb, tve;
226  grn_com_header sheader;
227  grn_ctx ctx_, *ctx = &ctx_;
228  grn_ctx_init(ctx, 0);
229  GRN_COM_QUEUE_INIT(&fsessions);
230  sessions = grn_hash_create(ctx, NULL, sizeof(grn_sock), sizeof(session), 0);
231  sheader.proto = GRN_COM_PROTO_GQTP;
232  sheader.qtype = 0;
233  sheader.keylen = 0;
234  sheader.level = 0;
235  sheader.flags = 0;
236  sheader.status = 0;
237  sheader.opaque = 0;
238  sheader.cas = 0;
239  GRN_TEXT_INIT(&text, 0);
240  rc = grn_bulk_reserve(ctx, &text, BUFSIZE);
241  if (!rc) {
242  char *buf = GRN_TEXT_VALUE(&text);
243  if (!grn_com_event_init(ctx, &ev, 1000, sizeof(grn_com))) {
244  ev.msg_handler = msg_handler;
245  if (!THREAD_CREATE(thread, receiver, NULL)) {
246  int cnt = 0;
247  gettimeofday(&tvb, NULL);
248  lprint(ctx, "begin: procotol=%c max_concurrency=%d max_tp=%d", proto, max_con, max_tp);
249  while (fgets(buf, BUFSIZE, stdin)) {
250  uint32_t size = strlen(buf) - 1;
251  session *s = session_alloc(ctx, dests + (cnt++ % dest_cnt));
252  if (s) {
253  gettimeofday(&s->tv, NULL);
254  s->n_query++;
255  s->query_id = ++nsent;
256  s->n_sessions = (nsent - nrecv);
257  switch (proto) {
258  case 'H' :
259  case 'h' :
260  if (grn_com_send_http(ctx, s->com, buf, size, 0)) {
261  fprintf(stderr, "grn_com_send_http failed\n");
262  }
263  s->stat = 2;
264  /*
265  lprint(ctx, "sent %04d %04d %d",
266  s->n_query, s->query_id, s->com->fd);
267  */
268  break;
269  default :
270  if (grn_com_send(ctx, s->com, &sheader, buf, size, 0)) {
271  fprintf(stderr, "grn_com_send failed\n");
272  }
273  break;
274  }
275  } else {
276  fprintf(stderr, "grn_com_copen failed\n");
277  }
278  for (;;) {
279  gettimeofday(&tve, NULL);
280  if ((nrecv < max_tp * (tve.tv_sec - tvb.tv_sec)) &&
281  (nsent - nrecv) < max_con) { break; }
282  /* lprint(ctx, "s:%d r:%d", nsent, nrecv); */
283  grn_nanosleep(1000000);
284  }
285  if (!(nsent % 1000)) { lprint(ctx, " : %d", nsent); }
286  }
287  done = 1;
288  if (THREAD_JOIN(thread)) {
289  fprintf(stderr, "THREAD_JOIN failed\n");
290  }
291  gettimeofday(&tve, NULL);
292  {
293  double qps;
294  uint64_t etime = (tve.tv_sec - tvb.tv_sec);
295  etime *= 1000000;
296  etime += (tve.tv_usec - tvb.tv_usec);
297  qps = (double)nsent * 1000000 / etime;
298  lprint(ctx, "end : n=%d min=%d max=%d avg=%d qps=%f etime=%d.%06d", nsent, etime_min, etime_max, (int)(etime_amount / nsent), qps, etime / 1000000, etime % 1000000);
299  }
300  {
301  session *s;
302  GRN_HASH_EACH(ctx, sessions, id, NULL, NULL, &s, {
303  session_close(ctx, s);
304  });
305  }
306  rc = 0;
307  } else {
308  fprintf(stderr, "THREAD_CREATE failed\n");
309  }
310  grn_com_event_fin(ctx, &ev);
311  } else {
312  fprintf(stderr, "grn_com_event_init failed\n");
313  }
314  }
315  grn_obj_unlink(ctx, &text);
316  grn_hash_close(ctx, sessions);
317  grn_ctx_fin(ctx);
318  return rc;
319 }
320 
/*
 * Bit flags that grn_str_getopt() accumulates into main()'s `flags`.
 * They are combined with | and tested with &, so each must be its own bit.
 */
enum {
  flag_usage = 1,  /* -h given, or the arguments were invalid: print usage */
  flag_verbose = 2 /* -v given: enable verbose output */
};
325 
326 int
327 main(int argc, char **argv)
328 {
329  const char *protostr = NULL, *maxconstr = NULL, *maxtpstr = NULL;
330  int r, i, flags = 0;
331  static grn_str_getopt_opt opts[] = {
332  {'P', NULL, NULL, 0, GETOPT_OP_NONE},
333  {'m', NULL, NULL, 0, GETOPT_OP_NONE},
334  {'t', NULL, NULL, 0, GETOPT_OP_NONE},
335  {'h', NULL, NULL, flag_usage, GETOPT_OP_ON},
336  {'v', NULL, NULL, flag_verbose, GETOPT_OP_ON},
337  {'\0', NULL, NULL, 0, 0}
338  };
339  opts[0].arg = &protostr;
340  opts[1].arg = &maxconstr;
341  opts[2].arg = &maxtpstr;
342  i = grn_str_getopt(argc, argv, opts, &flags);
343  if (protostr) { proto = *protostr; }
344  if (maxconstr) { max_con = atoi(maxconstr); }
345  if (maxtpstr) { max_tp = atoi(maxtpstr); }
346  if (flags & flag_verbose) { verbose = 1; }
347 
348  if (argc <= i) {
349  dests[0].host = DEFAULT_HOST;
350  dests[0].port = DEFAULT_PORT;
351  dest_cnt = 1;
352  } else if (i > 0 && argc <= (i + MAX_DEST)){
353  for (dest_cnt = 0; i < argc; i++) {
354  parse_dest(argv[i], &dests[dest_cnt]);
355  if (dests[dest_cnt].host) {
356  dest_cnt++;
357  }
358  }
359  if (!dest_cnt) { flags |= flag_usage; }
360  } else {
361  /* too much dests */
362  flags |= flag_usage;
363  }
364 
365  if (grn_init()) { return -1; }
366  if (flags & flag_usage) {
367  usage(); r = -1;
368  } else {
369  r = do_client();
370  }
371  grn_fin();
372  return r;
373 }