MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
evrpc.c
1 /*
2  * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. The name of the author may not be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30 
31 #ifdef WIN32
32 #define WIN32_LEAN_AND_MEAN
33 #include <winsock2.h>
34 #include <windows.h>
35 #undef WIN32_LEAN_AND_MEAN
36 #endif
37 
38 #include <sys/types.h>
39 #ifndef WIN32
40 #include <sys/socket.h>
41 #endif
42 #ifdef HAVE_SYS_TIME_H
43 #include <sys/time.h>
44 #endif
45 #include <sys/queue.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #ifndef WIN32
49 #include <unistd.h>
50 #endif
51 #ifndef HAVE_TAILQFOREACH
52 #include <event-internal.h>
53 #endif
54 
55 #include <errno.h>
56 #include <signal.h>
57 #include <string.h>
58 #include <assert.h>
59 
60 #include "event.h"
61 #include "evrpc.h"
62 #include "evrpc-internal.h"
63 #include "evhttp.h"
64 #include "evutil.h"
65 #include "log.h"
66 
67 struct evrpc_base *
68 evrpc_init(struct evhttp *http_server)
69 {
70  struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
71  if (base == NULL)
72  return (NULL);
73 
74  /* we rely on the tagging sub system */
75  evtag_init();
76 
77  TAILQ_INIT(&base->registered_rpcs);
78  TAILQ_INIT(&base->input_hooks);
79  TAILQ_INIT(&base->output_hooks);
80  base->http_server = http_server;
81 
82  return (base);
83 }
84 
85 void
86 evrpc_free(struct evrpc_base *base)
87 {
88  struct evrpc *rpc;
89  struct evrpc_hook *hook;
90 
91  while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
92  assert(evrpc_unregister_rpc(base, rpc->uri));
93  }
94  while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
95  assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
96  }
97  while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
98  assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
99  }
100  free(base);
101 }
102 
103 void *
104 evrpc_add_hook(void *vbase,
105  enum EVRPC_HOOK_TYPE hook_type,
106  int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
107  void *cb_arg)
108 {
109  struct _evrpc_hooks *base = vbase;
110  struct evrpc_hook_list *head = NULL;
111  struct evrpc_hook *hook = NULL;
112  switch (hook_type) {
113  case EVRPC_INPUT:
114  head = &base->in_hooks;
115  break;
116  case EVRPC_OUTPUT:
117  head = &base->out_hooks;
118  break;
119  default:
120  assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
121  }
122 
123  hook = calloc(1, sizeof(struct evrpc_hook));
124  assert(hook != NULL);
125 
126  hook->process = cb;
127  hook->process_arg = cb_arg;
128  TAILQ_INSERT_TAIL(head, hook, next);
129 
130  return (hook);
131 }
132 
133 static int
134 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
135 {
136  struct evrpc_hook *hook = NULL;
137  TAILQ_FOREACH(hook, head, next) {
138  if (hook == handle) {
139  TAILQ_REMOVE(head, hook, next);
140  free(hook);
141  return (1);
142  }
143  }
144 
145  return (0);
146 }
147 
148 /*
149  * remove the hook specified by the handle
150  */
151 
152 int
153 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
154 {
155  struct _evrpc_hooks *base = vbase;
156  struct evrpc_hook_list *head = NULL;
157  switch (hook_type) {
158  case EVRPC_INPUT:
159  head = &base->in_hooks;
160  break;
161  case EVRPC_OUTPUT:
162  head = &base->out_hooks;
163  break;
164  default:
165  assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
166  }
167 
168  return (evrpc_remove_hook_internal(head, handle));
169 }
170 
171 static int
172 evrpc_process_hooks(struct evrpc_hook_list *head,
173  struct evhttp_request *req, struct evbuffer *evbuf)
174 {
175  struct evrpc_hook *hook;
176  TAILQ_FOREACH(hook, head, next) {
177  if (hook->process(req, evbuf, hook->process_arg) == -1)
178  return (-1);
179  }
180 
181  return (0);
182 }
183 
184 static void evrpc_pool_schedule(struct evrpc_pool *pool);
185 static void evrpc_request_cb(struct evhttp_request *, void *);
186 void evrpc_request_done(struct evrpc_req_generic*);
187 
188 /*
189  * Registers a new RPC with the HTTP server. The evrpc object is expected
190  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
191  * calls this function.
192  */
193 
194 static char *
195 evrpc_construct_uri(const char *uri)
196 {
197  char *constructed_uri;
198  int constructed_uri_len;
199 
200  constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
201  if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
202  event_err(1, "%s: failed to register rpc at %s",
203  __func__, uri);
204  memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
205  memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
206  constructed_uri[constructed_uri_len - 1] = '\0';
207 
208  return (constructed_uri);
209 }
210 
211 int
212 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
213  void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
214 {
215  char *constructed_uri = evrpc_construct_uri(rpc->uri);
216 
217  rpc->base = base;
218  rpc->cb = cb;
219  rpc->cb_arg = cb_arg;
220 
221  TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
222 
223  evhttp_set_cb(base->http_server,
224  constructed_uri,
225  evrpc_request_cb,
226  rpc);
227 
228  free(constructed_uri);
229 
230  return (0);
231 }
232 
233 int
234 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
235 {
236  char *registered_uri = NULL;
237  struct evrpc *rpc;
238 
239  /* find the right rpc; linear search might be slow */
240  TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
241  if (strcmp(rpc->uri, name) == 0)
242  break;
243  }
244  if (rpc == NULL) {
245  /* We did not find an RPC with this name */
246  return (-1);
247  }
248  TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
249 
250  free((char *)rpc->uri);
251  free(rpc);
252 
253  registered_uri = evrpc_construct_uri(name);
254 
255  /* remove the http server callback */
256  assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
257 
258  free(registered_uri);
259  return (0);
260 }
261 
262 static void
263 evrpc_request_cb(struct evhttp_request *req, void *arg)
264 {
265  struct evrpc *rpc = arg;
266  struct evrpc_req_generic *rpc_state = NULL;
267 
268  /* let's verify the outside parameters */
269  if (req->type != EVHTTP_REQ_POST ||
270  EVBUFFER_LENGTH(req->input_buffer) <= 0)
271  goto error;
272 
273  /*
274  * we might want to allow hooks to suspend the processing,
275  * but at the moment, we assume that they just act as simple
276  * filters.
277  */
278  if (evrpc_process_hooks(&rpc->base->input_hooks,
279  req, req->input_buffer) == -1)
280  goto error;
281 
282  rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
283  if (rpc_state == NULL)
284  goto error;
285 
286  /* let's check that we can parse the request */
287  rpc_state->request = rpc->request_new();
288  if (rpc_state->request == NULL)
289  goto error;
290 
291  rpc_state->rpc = rpc;
292 
293  if (rpc->request_unmarshal(
294  rpc_state->request, req->input_buffer) == -1) {
295  /* we failed to parse the request; that's a bummer */
296  goto error;
297  }
298 
299  /* at this point, we have a well formed request, prepare the reply */
300 
301  rpc_state->reply = rpc->reply_new();
302  if (rpc_state->reply == NULL)
303  goto error;
304 
305  rpc_state->http_req = req;
306  rpc_state->done = evrpc_request_done;
307 
308  /* give the rpc to the user; they can deal with it */
309  rpc->cb(rpc_state, rpc->cb_arg);
310 
311  return;
312 
313 error:
314  evrpc_reqstate_free(rpc_state);
315  evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
316  return;
317 }
318 
319 void
320 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
321 {
322  /* clean up all memory */
323  if (rpc_state != NULL) {
324  struct evrpc *rpc = rpc_state->rpc;
325 
326  if (rpc_state->request != NULL)
327  rpc->request_free(rpc_state->request);
328  if (rpc_state->reply != NULL)
329  rpc->reply_free(rpc_state->reply);
330  free(rpc_state);
331  }
332 }
333 
334 void
335 evrpc_request_done(struct evrpc_req_generic* rpc_state)
336 {
337  struct evhttp_request *req = rpc_state->http_req;
338  struct evrpc *rpc = rpc_state->rpc;
339  struct evbuffer* data = NULL;
340 
341  if (rpc->reply_complete(rpc_state->reply) == -1) {
342  /* the reply was not completely filled in. error out */
343  goto error;
344  }
345 
346  if ((data = evbuffer_new()) == NULL) {
347  /* out of memory */
348  goto error;
349  }
350 
351  /* serialize the reply */
352  rpc->reply_marshal(data, rpc_state->reply);
353 
354  /* do hook based tweaks to the request */
355  if (evrpc_process_hooks(&rpc->base->output_hooks,
356  req, data) == -1)
357  goto error;
358 
359  /* on success, we are going to transmit marshaled binary data */
360  if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
361  evhttp_add_header(req->output_headers,
362  "Content-Type", "application/octet-stream");
363  }
364 
365  evhttp_send_reply(req, HTTP_OK, "OK", data);
366 
367  evbuffer_free(data);
368 
369  evrpc_reqstate_free(rpc_state);
370 
371  return;
372 
373 error:
374  if (data != NULL)
375  evbuffer_free(data);
376  evrpc_reqstate_free(rpc_state);
377  evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
378  return;
379 }
380 
381 /* Client implementation of RPC site */
382 
383 static int evrpc_schedule_request(struct evhttp_connection *connection,
384  struct evrpc_request_wrapper *ctx);
385 
386 struct evrpc_pool *
387 evrpc_pool_new(struct event_base *base)
388 {
389  struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
390  if (pool == NULL)
391  return (NULL);
392 
393  TAILQ_INIT(&pool->connections);
394  TAILQ_INIT(&pool->requests);
395 
396  TAILQ_INIT(&pool->input_hooks);
397  TAILQ_INIT(&pool->output_hooks);
398 
399  pool->base = base;
400  pool->timeout = -1;
401 
402  return (pool);
403 }
404 
405 static void
406 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
407 {
408  free(request->name);
409  free(request);
410 }
411 
412 void
413 evrpc_pool_free(struct evrpc_pool *pool)
414 {
415  struct evhttp_connection *connection;
417  struct evrpc_hook *hook;
418 
419  while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
420  TAILQ_REMOVE(&pool->requests, request, next);
421  /* if this gets more complicated we need our own function */
422  evrpc_request_wrapper_free(request);
423  }
424 
425  while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
426  TAILQ_REMOVE(&pool->connections, connection, next);
427  evhttp_connection_free(connection);
428  }
429 
430  while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
431  assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
432  }
433 
434  while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
435  assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
436  }
437 
438  free(pool);
439 }
440 
441 /*
442  * Add a connection to the RPC pool. A request scheduled on the pool
443  * may use any available connection.
444  */
445 
446 void
447 evrpc_pool_add_connection(struct evrpc_pool *pool,
448  struct evhttp_connection *connection) {
449  assert(connection->http_server == NULL);
450  TAILQ_INSERT_TAIL(&pool->connections, connection, next);
451 
452  /*
453  * associate an event base with this connection
454  */
455  if (pool->base != NULL)
456  evhttp_connection_set_base(connection, pool->base);
457 
458  /*
459  * unless a timeout was specifically set for a connection,
460  * the connection inherits the timeout from the pool.
461  */
462  if (connection->timeout == -1)
463  connection->timeout = pool->timeout;
464 
465  /*
466  * if we have any requests pending, schedule them with the new
467  * connections.
468  */
469 
470  if (TAILQ_FIRST(&pool->requests) != NULL) {
471  struct evrpc_request_wrapper *request =
472  TAILQ_FIRST(&pool->requests);
473  TAILQ_REMOVE(&pool->requests, request, next);
474  evrpc_schedule_request(connection, request);
475  }
476 }
477 
478 void
479 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
480 {
481  struct evhttp_connection *evcon;
482  TAILQ_FOREACH(evcon, &pool->connections, next) {
483  evcon->timeout = timeout_in_secs;
484  }
485  pool->timeout = timeout_in_secs;
486 }
487 
488 
489 static void evrpc_reply_done(struct evhttp_request *, void *);
490 static void evrpc_request_timeout(int, short, void *);
491 
492 /*
493  * Finds a connection object associated with the pool that is currently
494  * idle and can be used to make a request.
495  */
496 static struct evhttp_connection *
497 evrpc_pool_find_connection(struct evrpc_pool *pool)
498 {
499  struct evhttp_connection *connection;
500  TAILQ_FOREACH(connection, &pool->connections, next) {
501  if (TAILQ_FIRST(&connection->requests) == NULL)
502  return (connection);
503  }
504 
505  return (NULL);
506 }
507 
508 /*
509  * We assume that the ctx is no longer queued on the pool.
510  */
511 static int
512 evrpc_schedule_request(struct evhttp_connection *connection,
513  struct evrpc_request_wrapper *ctx)
514 {
515  struct evhttp_request *req = NULL;
516  struct evrpc_pool *pool = ctx->pool;
517  struct evrpc_status status;
518  char *uri = NULL;
519  int res = 0;
520 
521  if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
522  goto error;
523 
524  /* serialize the request data into the output buffer */
525  ctx->request_marshal(req->output_buffer, ctx->request);
526 
527  uri = evrpc_construct_uri(ctx->name);
528  if (uri == NULL)
529  goto error;
530 
531  /* we need to know the connection that we might have to abort */
532  ctx->evcon = connection;
533 
534  /* apply hooks to the outgoing request */
535  if (evrpc_process_hooks(&pool->output_hooks,
536  req, req->output_buffer) == -1)
537  goto error;
538 
539  if (pool->timeout > 0) {
540  /*
541  * a timeout after which the whole rpc is going to be aborted.
542  */
543  struct timeval tv;
544  evutil_timerclear(&tv);
545  tv.tv_sec = pool->timeout;
546  evtimer_add(&ctx->ev_timeout, &tv);
547  }
548 
549  /* start the request over the connection */
550  res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
551  free(uri);
552 
553  if (res == -1)
554  goto error;
555 
556  return (0);
557 
558 error:
559  memset(&status, 0, sizeof(status));
560  status.error = EVRPC_STATUS_ERR_UNSTARTED;
561  (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
562  evrpc_request_wrapper_free(ctx);
563  return (-1);
564 }
565 
566 int
567 evrpc_make_request(struct evrpc_request_wrapper *ctx)
568 {
569  struct evrpc_pool *pool = ctx->pool;
570 
571  /* initialize the event structure for this rpc */
572  evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
573  if (pool->base != NULL)
574  event_base_set(pool->base, &ctx->ev_timeout);
575 
576  /* we better have some available connections on the pool */
577  assert(TAILQ_FIRST(&pool->connections) != NULL);
578 
579  /*
580  * if no connection is available, we queue the request on the pool,
581  * the next time a connection is empty, the rpc will be send on that.
582  */
583  TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
584 
585  evrpc_pool_schedule(pool);
586 
587  return (0);
588 }
589 
590 static void
591 evrpc_reply_done(struct evhttp_request *req, void *arg)
592 {
593  struct evrpc_request_wrapper *ctx = arg;
594  struct evrpc_pool *pool = ctx->pool;
595  struct evrpc_status status;
596  int res = -1;
597 
598  /* cancel any timeout we might have scheduled */
599  event_del(&ctx->ev_timeout);
600 
601  memset(&status, 0, sizeof(status));
602  status.http_req = req;
603 
604  /* we need to get the reply now */
605  if (req != NULL) {
606  /* apply hooks to the incoming request */
607  if (evrpc_process_hooks(&pool->input_hooks,
608  req, req->input_buffer) == -1) {
609  status.error = EVRPC_STATUS_ERR_HOOKABORTED;
610  res = -1;
611  } else {
612  res = ctx->reply_unmarshal(ctx->reply,
613  req->input_buffer);
614  if (res == -1) {
615  status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
616  }
617  }
618  } else {
619  status.error = EVRPC_STATUS_ERR_TIMEOUT;
620  }
621 
622  if (res == -1) {
623  /* clear everything that we might have written previously */
624  ctx->reply_clear(ctx->reply);
625  }
626 
627  (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
628 
629  evrpc_request_wrapper_free(ctx);
630 
631  /* the http layer owns the request structure */
632 
633  /* see if we can schedule another request */
634  evrpc_pool_schedule(pool);
635 }
636 
637 static void
638 evrpc_pool_schedule(struct evrpc_pool *pool)
639 {
640  struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
641  struct evhttp_connection *evcon;
642 
643  /* if no requests are pending, we have no work */
644  if (ctx == NULL)
645  return;
646 
647  if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
648  TAILQ_REMOVE(&pool->requests, ctx, next);
649  evrpc_schedule_request(evcon, ctx);
650  }
651 }
652 
653 static void
654 evrpc_request_timeout(int fd, short what, void *arg)
655 {
656  struct evrpc_request_wrapper *ctx = arg;
657  struct evhttp_connection *evcon = ctx->evcon;
658  assert(evcon != NULL);
659 
660  evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
661 }