MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
memcached.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  * memcached - memory caching daemon
4  *
5  * http://www.danga.com/memcached/
6  *
7  * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8  *
9  * Use and distribution licensed under the BSD license. See
10  * the LICENSE file for full text.
11  *
12  * Authors:
13  * Anatoly Vorobey <mellon@pobox.com>
14  * Brad Fitzpatrick <brad@danga.com>
15  */
16 #include "config.h"
17 #include "config_static.h"
18 #include "memcached.h"
19 #include "memcached/extension_loggers.h"
20 #include "utilities/engine_loader.h"
21 
22 #include <signal.h>
23 #include <getopt.h>
24 #include <fcntl.h>
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <time.h>
30 #include <assert.h>
31 #include <limits.h>
32 #include <ctype.h>
33 #include <stdarg.h>
34 #include <stddef.h>
35 
36 #include "memcached_mysql.h"
37 
38 #define INNODB_MEMCACHED
39 
40 static inline void item_set_cas(const void *cookie, item *it, uint64_t cas) {
41  settings.engine.v1->item_set_cas(settings.engine.v0, cookie, it, cas);
42 }
43 
/*
 * Per-thread statistics helpers.
 *
 * The *_GUTS macros perform the raw counter increments. They are building
 * blocks for STATS_INCR1, which declares the `thread_stats` pointer they
 * operate on and holds thread_stats->mutex while they run. Do not expand
 * them on their own.
 *
 * SLAB_GUTS indexes the per-slab-class counters with `info.clsid`. Every
 * macro built on it (SLAB_INCR, SLAB_TWO, STATS_HIT) therefore requires an
 * item_info variable named `info` to be in scope at the expansion site.
 */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->slab_stats[info.clsid].slab_op++;

/* Bump only the per-thread counter named thread_op. */
#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->thread_op++;

/* Bump two per-thread counters: slab_op and thread_op. */
#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->slab_op++; \
    (thread_stats)->thread_op++;

/* Bump the per-slab counter slab_op (needs `info`) and the thread counter. */
#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)

/*
 * Core helper: look up this connection's thread_stats slot, run GUTS on it
 * while holding its mutex, then pass the key to TK() together with the
 * topkeys tracker, slab_op and current_time (presumably to account the key
 * access in the top-keys stats; TODO confirm against TK's definition).
 *
 * Kept as a braced block (not do/while(0)) to stay source-compatible with
 * existing call sites.
 */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
    struct independent_stats *independent_stats = get_independent_stats(conn); \
    struct thread_stats *thread_stats = \
        &independent_stats->thread_stats[(conn)->thread->index]; \
    topkeys_t *topkeys = independent_stats->topkeys; \
    pthread_mutex_lock(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    pthread_mutex_unlock(&thread_stats->mutex); \
    TK(topkeys, slab_op, key, nkey, current_time); \
}

/* Increment one per-thread counter. */
#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

/* Increment one per-slab-class counter (requires `info` in scope). */
#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

/* Increment two per-thread counters. */
#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

/* Increment a per-slab counter and a per-thread counter (requires `info`). */
#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Count a hit for command `op`: <op>_hits per slab and cmd_<op> per thread. */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Count a miss for command `op`: <op>_misses and cmd_<op> per thread. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)

/* Increment one per-thread counter without any key tracking. */
#define STATS_NOKEY(conn, op) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    pthread_mutex_lock(&thread_stats->mutex); \
    thread_stats->op++; \
    pthread_mutex_unlock(&thread_stats->mutex); \
}

/* Increment two per-thread counters without any key tracking. */
#define STATS_NOKEY2(conn, op1, op2) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    pthread_mutex_lock(&thread_stats->mutex); \
    thread_stats->op1++; \
    thread_stats->op2++; \
    pthread_mutex_unlock(&thread_stats->mutex); \
}

/* Add `amt` to one per-thread counter; amt is parenthesized so compound
 * expressions are added as a whole. */
#define STATS_ADD(conn, op, amt) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    pthread_mutex_lock(&thread_stats->mutex); \
    thread_stats->op += (amt); \
    pthread_mutex_unlock(&thread_stats->mutex); \
}
112 
113 volatile sig_atomic_t memcached_shutdown;
114 
115 /*
116  * We keep the current time of day in a global variable that's updated by a
117  * timer event. This saves us a bunch of time() system calls (we really only
118  * need to get the time once a second, whereas there can be tens of thousands
119  * of requests a second) and allows us to use server-start-relative timestamps
120  * rather than absolute UNIX timestamps, a space savings on systems where
121  * sizeof(time_t) > sizeof(unsigned int).
122  */
123 volatile rel_time_t current_time;
124 
125 /*
126  * forward declarations
127  */
128 static SOCKET new_socket(struct addrinfo *ai);
129 static int try_read_command(conn *c);
130 static inline struct independent_stats *get_independent_stats(conn *c);
131 static inline struct thread_stats *get_thread_stats(conn *c);
132 static void register_callback(ENGINE_HANDLE *eh,
133  ENGINE_EVENT_TYPE type,
134  EVENT_CALLBACK cb, const void *cb_data);
135 enum try_read_result {
136  READ_DATA_RECEIVED,
137  READ_NO_DATA_RECEIVED,
138  READ_ERROR,
139  READ_MEMORY_ERROR
140 };
141 
142 static enum try_read_result try_read_network(conn *c);
143 static enum try_read_result try_read_udp(conn *c);
144 
145 /* stats */
146 static void stats_init(void);
147 static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
148 static void process_stat_settings(ADD_STAT add_stats, void *c);
149 
150 
151 /* defaults */
152 static void settings_init(void);
153 
154 /* event handling, network IO */
155 static void event_handler(const int fd, const short which, void *arg);
156 static void complete_nread(conn *c);
157 static char *process_command(conn *c, char *command);
158 static void write_and_free(conn *c, char *buf, int bytes);
159 static int ensure_iov_space(conn *c);
160 static int add_iov(conn *c, const void *buf, int len);
161 static int add_msghdr(conn *c);
162 
163 
164 /* time handling */
165 static void set_current_time(void); /* update the global variable holding
166  global 32-bit seconds-since-start time
167  (to avoid 64 bit time_t) */
168 
170 struct stats stats;
171 struct settings settings;
172 static time_t process_started; /* when the process was started */
173 
175 static conn *listen_conn = NULL;
176 static int udp_socket[100];
177 static int num_udp_socket;
178 static struct event_base *main_base;
179 static struct independent_stats *default_independent_stats;
180 
181 static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];
182 
183 enum transmit_result {
184  TRANSMIT_COMPLETE,
185  TRANSMIT_INCOMPLETE,
186  TRANSMIT_SOFT_ERROR,
187  TRANSMIT_HARD_ERROR
188 };
189 
190 static enum transmit_result transmit(conn *c);
191 
/*
 * Number of seconds in 30 days. realtime() treats expiration values up to
 * this as deltas from "now" and larger values as absolute UNIX timestamps.
 * Parenthesized so the expansion stays correct inside larger expressions
 * (e.g. division).
 */
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)
193 
194 // Perform all callbacks of a given type for the given connection.
195 static void perform_callbacks(ENGINE_EVENT_TYPE type,
196  const void *data,
197  const void *c) {
198  for (struct engine_event_handler *h = engine_event_handlers[type];
199  h; h = h->next) {
200  h->cb(c, type, data, h->cb_data);
201  }
202 }
203 
204 /*
205  * given time value that's either unix time or delta from current unix time,
206  * return unix time. Use the fact that delta can't exceed one month
207  * (and real time value can't be that low).
208  */
209 static rel_time_t realtime(const time_t exptime) {
210  /* no. of seconds in 30 days - largest possible delta exptime */
211 
212  if (exptime == 0) return 0; /* 0 means never expire */
213 
214  if (exptime > REALTIME_MAXDELTA) {
215  /* if item expiration is at/before the server started, give it an
216  expiration time of 1 second after the server started.
217  (because 0 means don't expire). without this, we'd
218  underflow and wrap around to some large value way in the
219  future, effectively making items expiring in the past
220  really expiring never */
221  if (exptime <= process_started)
222  return (rel_time_t)1;
223  return (rel_time_t)(exptime - process_started);
224  } else {
225  return (rel_time_t)(exptime + current_time);
226  }
227 }
228 
232 static time_t abstime(const rel_time_t exptime)
233 {
234  return process_started + exptime;
235 }
236 
237 static void stats_init(void) {
238  stats.daemon_conns = 0;
239  stats.rejected_conns = 0;
240  stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
241 
242  stats_prefix_init();
243 }
244 
245 static void stats_reset(const void *cookie) {
246  struct conn *conn = (struct conn*)cookie;
247  STATS_LOCK();
248  stats.rejected_conns = 0;
249  stats.total_conns = 0;
250  stats_prefix_clear();
251  STATS_UNLOCK();
252  threadlocal_stats_reset(get_independent_stats(conn)->thread_stats);
253  settings.engine.v1->reset_stats(settings.engine.v0, cookie);
254 }
255 
256 static void settings_init(void) {
257  settings.use_cas = true;
258  settings.access = 0700;
259  settings.port = 11211;
260  settings.udpport = 11211;
261  /* By default this string should be NULL for getaddrinfo() */
262  settings.inter = NULL;
263  settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
264  settings.maxconns = 1000; /* to limit connections-related memory to about 5MB */
265  settings.verbose = 0;
266  settings.oldest_live = 0;
267  settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
268  settings.socketpath = NULL; /* by default, not using a unix socket */
269  settings.factor = 1.25;
270  settings.chunk_size = 48; /* space for a modest key and value */
271  settings.num_threads = 4; /* N workers */
272  settings.num_threads_per_udp = 0;
273  settings.prefix_delimiter = ':';
274  settings.detail_enabled = 0;
275  settings.allow_detailed = true;
276  settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
277  settings.backlog = 1024;
278  settings.binding_protocol = negotiating_prot;
279  settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
280  settings.topkeys = 0;
281  settings.require_sasl = false;
282  settings.extensions.logger = get_stderr_logger();
283 }
284 
285 /*
286  * Adds a message header to a connection.
287  *
288  * Returns 0 on success, -1 on out-of-memory.
289  */
290 static int add_msghdr(conn *c)
291 {
292  struct msghdr *msg;
293 
294  assert(c != NULL);
295 
296  if (c->msgsize == c->msgused) {
297  msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
298  if (! msg)
299  return -1;
300  c->msglist = msg;
301  c->msgsize *= 2;
302  }
303 
304  msg = c->msglist + c->msgused;
305 
306  /* this wipes msg_iovlen, msg_control, msg_controllen, and
307  msg_flags, the last 3 of which aren't defined on solaris: */
308  memset(msg, 0, sizeof(struct msghdr));
309 
310  msg->msg_iov = &c->iov[c->iovused];
311 
312  if (c->request_addr_size > 0) {
313  msg->msg_name = &c->request_addr;
314  msg->msg_namelen = c->request_addr_size;
315  }
316 
317  c->msgbytes = 0;
318  c->msgused++;
319 
320  if (IS_UDP(c->transport)) {
321  /* Leave room for the UDP header, which we'll fill in later. */
322  return add_iov(c, NULL, UDP_HEADER_SIZE);
323  }
324 
325  return 0;
326 }
327 
328 static const char *prot_text(enum protocol prot) {
329  char *rv = "unknown";
330  switch(prot) {
331  case ascii_prot:
332  rv = "ascii";
333  break;
334  case binary_prot:
335  rv = "binary";
336  break;
337  case negotiating_prot:
338  rv = "auto-negotiate";
339  break;
340  }
341  return rv;
342 }
343 
/*
 * Shared listen-throttling state. The accessors and disable_listen() read and
 * write these fields only while holding `mutex`.
 */
struct {
    pthread_mutex_t mutex;   /* guards every field below */
    bool disabled;           /* true once disable_listen() has run */
    ssize_t count;           /* set to 10 by disable_listen(); NOTE(review):
                                presumably a countdown before re-enabling */
    uint64_t num_disable;    /* how many times listening has been disabled */
} listen_state;
350 
351 static bool is_listen_disabled(void) {
352  bool ret;
353  pthread_mutex_lock(&listen_state.mutex);
354  ret = listen_state.disabled;
355  pthread_mutex_unlock(&listen_state.mutex);
356  return ret;
357 }
358 
359 static uint64_t get_listen_disabled_num(void) {
360  uint64_t ret;
361  pthread_mutex_lock(&listen_state.mutex);
362  ret = listen_state.num_disable;
363  pthread_mutex_unlock(&listen_state.mutex);
364  return ret;
365 }
366 
367 static void disable_listen(void) {
368  pthread_mutex_lock(&listen_state.mutex);
369  listen_state.disabled = true;
370  listen_state.count = 10;
371  ++listen_state.num_disable;
372  pthread_mutex_unlock(&listen_state.mutex);
373 
374  conn *next;
375  for (next = listen_conn; next; next = next->next) {
376  update_event(next, 0);
377  if (listen(next->sfd, 1) != 0) {
378  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
379  "listen() failed",
380  strerror(errno));
381  }
382  }
383 }
384 
385 void safe_close(SOCKET sfd) {
386  if (sfd != INVALID_SOCKET) {
387  int rval;
388  while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
389  (errno == EINTR || errno == EAGAIN)) {
390  /* go ahead and retry */
391  }
392 
393  if (rval == SOCKET_ERROR) {
394  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
395  "Failed to close socket %d (%s)!!\n", (int)sfd,
396  strerror(errno));
397  } else {
398  STATS_LOCK();
399  stats.curr_conns--;
400  STATS_UNLOCK();
401 
402  if (is_listen_disabled()) {
403  notify_dispatcher();
404  }
405  }
406  }
407 }
408 
409 /*
410  * Free list management for connections.
411  */
412 cache_t *conn_cache; /* suffix cache */
413 
426 static bool conn_reset_buffersize(conn *c) {
427  bool ret = true;
428 
429  if (c->rsize != DATA_BUFFER_SIZE) {
430  void *ptr = malloc(DATA_BUFFER_SIZE);
431  if (ptr != NULL) {
432  free(c->rbuf);
433  c->rbuf = ptr;
434  c->rsize = DATA_BUFFER_SIZE;
435  } else {
436  ret = false;
437  }
438  }
439 
440  if (c->wsize != DATA_BUFFER_SIZE) {
441  void *ptr = malloc(DATA_BUFFER_SIZE);
442  if (ptr != NULL) {
443  free(c->wbuf);
444  c->wbuf = ptr;
445  c->wsize = DATA_BUFFER_SIZE;
446  } else {
447  ret = false;
448  }
449  }
450 
451  if (c->isize != ITEM_LIST_INITIAL) {
452  void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
453  if (ptr != NULL) {
454  free(c->ilist);
455  c->ilist = ptr;
456  c->isize = ITEM_LIST_INITIAL;
457  } else {
458  ret = false;
459  }
460  }
461 
462  if (c->suffixsize != SUFFIX_LIST_INITIAL) {
463  void *ptr = malloc(sizeof(char *) * SUFFIX_LIST_INITIAL);
464  if (ptr != NULL) {
465  free(c->suffixlist);
466  c->suffixlist = ptr;
467  c->suffixsize = SUFFIX_LIST_INITIAL;
468  } else {
469  ret = false;
470  }
471  }
472 
473  if (c->iovsize != IOV_LIST_INITIAL) {
474  void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
475  if (ptr != NULL) {
476  free(c->iov);
477  c->iov = ptr;
478  c->iovsize = IOV_LIST_INITIAL;
479  } else {
480  ret = false;
481  }
482  }
483 
484  if (c->msgsize != MSG_LIST_INITIAL) {
485  void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
486  if (ptr != NULL) {
487  free(c->msglist);
488  c->msglist = ptr;
489  c->msgsize = MSG_LIST_INITIAL;
490  } else {
491  ret = false;
492  }
493  }
494 
495  return ret;
496 }
497 
507 static int conn_constructor(void *buffer, void *unused1, int unused2) {
508  (void)unused1; (void)unused2;
509 
510  conn *c = buffer;
511  memset(c, 0, sizeof(*c));
512  MEMCACHED_CONN_CREATE(c);
513 
514  if (!conn_reset_buffersize(c)) {
515  free(c->rbuf);
516  free(c->wbuf);
517  free(c->ilist);
518  free(c->suffixlist);
519  free(c->iov);
520  free(c->msglist);
521  settings.extensions.logger->log(EXTENSION_LOG_WARNING,
522  NULL,
523  "Failed to allocate buffers for connection\n");
524  return 1;
525  }
526 
527  STATS_LOCK();
528  stats.conn_structs++;
529  STATS_UNLOCK();
530 
531  return 0;
532 }
533 
540 static void conn_destructor(void *buffer, void *unused) {
541  (void)unused;
542  conn *c = buffer;
543  free(c->rbuf);
544  free(c->wbuf);
545  free(c->ilist);
546  free(c->suffixlist);
547  free(c->iov);
548  free(c->msglist);
549 
550  STATS_LOCK();
551  stats.conn_structs--;
552  STATS_UNLOCK();
553 }
554 
555 conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
556  const int event_flags,
557  const int read_buffer_size, enum network_transport transport,
558  struct event_base *base, struct timeval *timeout) {
559  conn *c = cache_alloc(conn_cache);
560  if (c == NULL) {
561  return NULL;
562  }
563 
564  assert(c->thread == NULL);
565 
566  if (c->rsize < read_buffer_size) {
567  void *mem = malloc(read_buffer_size);
568  if (mem) {
569  c->rsize = read_buffer_size;
570  free(c->rbuf);
571  c->rbuf = mem;
572  } else {
573  assert(c->thread == NULL);
574  cache_free(conn_cache, c);
575  return NULL;
576  }
577  }
578 
579  c->transport = transport;
580  c->protocol = settings.binding_protocol;
581 
582  /* unix socket mode doesn't need this, so zeroed out. but why
583  * is this done for every command? presumably for UDP
584  * mode. */
585  if (!settings.socketpath) {
586  c->request_addr_size = sizeof(c->request_addr);
587  } else {
588  c->request_addr_size = 0;
589  }
590 
591  if (settings.verbose > 1) {
592  if (init_state == conn_listening) {
593  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
594  "<%d server listening (%s)\n", sfd,
595  prot_text(c->protocol));
596  } else if (IS_UDP(transport)) {
597  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
598  "<%d server listening (udp)\n", sfd);
599  } else if (c->protocol == negotiating_prot) {
600  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
601  "<%d new auto-negotiating client connection\n",
602  sfd);
603  } else if (c->protocol == ascii_prot) {
604  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
605  "<%d new ascii client connection.\n", sfd);
606  } else if (c->protocol == binary_prot) {
607  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
608  "<%d new binary client connection.\n", sfd);
609  } else {
610  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
611  "<%d new unknown (%d) client connection\n",
612  sfd, c->protocol);
613  assert(false);
614  }
615  }
616 
617  c->sfd = sfd;
618  c->state = init_state;
619  c->rlbytes = 0;
620  c->cmd = -1;
621  c->ascii_cmd = NULL;
622  c->rbytes = c->wbytes = 0;
623  c->wcurr = c->wbuf;
624  c->rcurr = c->rbuf;
625  c->ritem = 0;
626  c->icurr = c->ilist;
627  c->suffixcurr = c->suffixlist;
628  c->ileft = 0;
629  c->suffixleft = 0;
630  c->iovused = 0;
631  c->msgcurr = 0;
632  c->msgused = 0;
633  c->next = NULL;
634  c->list_state = 0;
635 
636  c->write_and_go = init_state;
637  c->write_and_free = 0;
638  c->item = 0;
639 
640  c->noreply = false;
641 
642  event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
643  event_base_set(base, &c->event);
644  c->ev_flags = event_flags;
645 
646  if (!register_event(c, timeout)) {
647  assert(c->thread == NULL);
648  cache_free(conn_cache, c);
649  return NULL;
650  }
651 
652  STATS_LOCK();
653  stats.total_conns++;
654  STATS_UNLOCK();
655 
656  c->aiostat = ENGINE_SUCCESS;
657  c->ewouldblock = false;
658  c->refcount = 1;
659 
660  MEMCACHED_CONN_ALLOCATE(c->sfd);
661 
662  perform_callbacks(ON_CONNECT, NULL, c);
663 
664  return c;
665 }
666 
667 static void conn_cleanup(conn *c) {
668  assert(c != NULL);
669 
670  if (c->item) {
671  settings.engine.v1->release(settings.engine.v0, c, c->item);
672  c->item = 0;
673  }
674 
675  if (c->ileft != 0) {
676  for (; c->ileft > 0; c->ileft--,c->icurr++) {
677  settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
678  }
679  }
680 
681  if (c->suffixleft != 0) {
682  for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
683  cache_free(c->thread->suffix_cache, *(c->suffixcurr));
684  }
685  }
686 
687  if (c->write_and_free) {
688  free(c->write_and_free);
689  c->write_and_free = 0;
690  }
691 
692  if (c->sasl_conn) {
693  sasl_dispose(&c->sasl_conn);
694  c->sasl_conn = NULL;
695  }
696 
697  if (c->engine_storage) {
698  settings.engine.v1->clean_engine(settings.engine.v0, c,
699  c->engine_storage);
700  }
701 
702  c->engine_storage = NULL;
703  c->tap_iterator = NULL;
704  c->thread = NULL;
705  assert(c->next == NULL);
706  c->ascii_cmd = NULL;
707  c->sfd = INVALID_SOCKET;
708  c->tap_nack_mode = false;
709 }
710 
711 void conn_close(conn *c) {
712  assert(c != NULL);
713  assert(c->sfd == INVALID_SOCKET);
714 
715  if (c->ascii_cmd != NULL) {
716  c->ascii_cmd->abort(c->ascii_cmd, c);
717  }
718 
719  assert(c->thread);
720  LOCK_THREAD(c->thread);
721  /* remove from pending-io list */
722  if (settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
723  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
724  "Current connection was in the pending-io list.. Nuking it\n");
725  }
726  c->thread->pending_io = list_remove(c->thread->pending_io, c);
727  c->thread->pending_close = list_remove(c->thread->pending_close, c);
728  UNLOCK_THREAD(c->thread);
729 
730  conn_cleanup(c);
731 
732  /*
733  * The contract with the object cache is that we should return the
734  * object in a constructed state. Reset the buffers to the default
735  * size
736  */
737  conn_reset_buffersize(c);
738  assert(c->thread == NULL);
739  cache_free(conn_cache, c);
740 }
741 
742 /*
743  * Shrinks a connection's buffers if they're too big. This prevents
744  * periodic large "get" requests from permanently chewing lots of server
745  * memory.
746  *
747  * This should only be called in between requests since it can wipe output
748  * buffers!
749  */
750 static void conn_shrink(conn *c) {
751  assert(c != NULL);
752 
753  if (IS_UDP(c->transport))
754  return;
755 
756  if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
757  char *newbuf;
758 
759  if (c->rcurr != c->rbuf)
760  memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
761 
762  newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
763 
764  if (newbuf) {
765  c->rbuf = newbuf;
766  c->rsize = DATA_BUFFER_SIZE;
767  }
768  /* TODO check other branch... */
769  c->rcurr = c->rbuf;
770  }
771 
772  if (c->isize > ITEM_LIST_HIGHWAT) {
773  item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
774  if (newbuf) {
775  c->ilist = newbuf;
776  c->isize = ITEM_LIST_INITIAL;
777  }
778  /* TODO check error condition? */
779  }
780 
781  if (c->msgsize > MSG_LIST_HIGHWAT) {
782  struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
783  if (newbuf) {
784  c->msglist = newbuf;
785  c->msgsize = MSG_LIST_INITIAL;
786  }
787  /* TODO check error condition? */
788  }
789 
790  if (c->iovsize > IOV_LIST_HIGHWAT) {
791  struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
792  if (newbuf) {
793  c->iov = newbuf;
794  c->iovsize = IOV_LIST_INITIAL;
795  }
796  /* TODO check return value */
797  }
798 }
799 
803 const char *state_text(STATE_FUNC state) {
804  if (state == conn_listening) {
805  return "conn_listening";
806  } else if (state == conn_new_cmd) {
807  return "conn_new_cmd";
808  } else if (state == conn_waiting) {
809  return "conn_waiting";
810  } else if (state == conn_read) {
811  return "conn_read";
812  } else if (state == conn_parse_cmd) {
813  return "conn_parse_cmd";
814  } else if (state == conn_write) {
815  return "conn_write";
816  } else if (state == conn_nread) {
817  return "conn_nread";
818  } else if (state == conn_swallow) {
819  return "conn_swallow";
820  } else if (state == conn_closing) {
821  return "conn_closing";
822  } else if (state == conn_mwrite) {
823  return "conn_mwrite";
824  } else if (state == conn_ship_log) {
825  return "conn_ship_log";
826  } else if (state == conn_add_tap_client) {
827  return "conn_add_tap_client";
828  } else if (state == conn_setup_tap_stream) {
829  return "conn_setup_tap_stream";
830  } else if (state == conn_pending_close) {
831  return "conn_pending_close";
832  } else if (state == conn_immediate_close) {
833  return "conn_immediate_close";
834  } else {
835  return "Unknown";
836  }
837 }
838 
839 /*
840  * Sets a connection's current state in the state machine. Any special
841  * processing that needs to happen on certain state transitions can
842  * happen here.
843  */
844 void conn_set_state(conn *c, STATE_FUNC state) {
845  assert(c != NULL);
846 
847  if (state != c->state) {
848  /*
849  * The connections in the "tap thread" behaves differently than
850  * normal connections because they operate in a full duplex mode.
851  * New messages may appear from both sides, so we can't block on
852  * read from the nework / engine
853  */
854  if (c->thread == tap_thread) {
855  if (state == conn_waiting) {
856  c->which = EV_WRITE;
857  state = conn_ship_log;
858  }
859  }
860 
861  if (settings.verbose > 2 || c->state == conn_closing
862  || c->state == conn_add_tap_client) {
863  settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
864  "%d: going from %s to %s\n",
865  c->sfd, state_text(c->state),
866  state_text(state));
867  }
868 
869  c->state = state;
870 
871  if (state == conn_write || state == conn_mwrite) {
872  MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
873  }
874  }
875 }
876 
877 /*
878  * Ensures that there is room for another struct iovec in a connection's
879  * iov list.
880  *
881  * Returns 0 on success, -1 on out-of-memory.
882  */
883 static int ensure_iov_space(conn *c) {
884  assert(c != NULL);
885 
886  if (c->iovused >= c->iovsize) {
887  int i, iovnum;
888  struct iovec *new_iov = (struct iovec *)realloc(c->iov,
889  (c->iovsize * 2) * sizeof(struct iovec));
890  if (! new_iov)
891  return -1;
892  c->iov = new_iov;
893  c->iovsize *= 2;
894 
895  /* Point all the msghdr structures at the new list. */
896  for (i = 0, iovnum = 0; i < c->msgused; i++) {
897  c->msglist[i].msg_iov = &c->iov[iovnum];
898  iovnum += c->msglist[i].msg_iovlen;
899  }
900  }
901 
902  return 0;
903 }
904 
905 
906 /*
907  * Adds data to the list of pending data that will be written out to a
908  * connection.
909  *
910  * Returns 0 on success, -1 on out-of-memory.
911  */
912 
913 static int add_iov(conn *c, const void *buf, int len) {
914  struct msghdr *m;
915  int leftover;
916  bool limit_to_mtu;
917 
918  assert(c != NULL);
919 
920  do {
921  m = &c->msglist[c->msgused - 1];
922 
923  /*
924  * Limit UDP packets, and the first payloads of TCP replies, to
925  * UDP_MAX_PAYLOAD_SIZE bytes.
926  */
927  limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
928 
929  /* We may need to start a new msghdr if this one is full. */
930  if (m->msg_iovlen == IOV_MAX ||
931  (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
932  add_msghdr(c);
933  m = &c->msglist[c->msgused - 1];
934  }
935 
936  if (ensure_iov_space(c) != 0)
937  return -1;
938 
939  /* If the fragment is too big to fit in the datagram, split it up */
940  if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
941  leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
942  len -= leftover;
943  } else {
944  leftover = 0;
945  }
946 
947  m = &c->msglist[c->msgused - 1];
948  m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
949  m->msg_iov[m->msg_iovlen].iov_len = len;
950 
951  c->msgbytes += len;
952  c->iovused++;
953  m->msg_iovlen++;
954 
955  buf = ((char *)buf) + len;
956  len = leftover;
957  } while (leftover > 0);
958 
959  return 0;
960 }
961 
962 
963 /*
964  * Constructs a set of UDP headers and attaches them to the outgoing messages.
965  */
966 static int build_udp_headers(conn *c) {
967  int i;
968  unsigned char *hdr;
969 
970  assert(c != NULL);
971 
972  if (c->msgused > c->hdrsize) {
973  void *new_hdrbuf;
974  if (c->hdrbuf)
975  new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
976  else
977  new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
978  if (! new_hdrbuf)
979  return -1;
980  c->hdrbuf = (unsigned char *)new_hdrbuf;
981  c->hdrsize = c->msgused * 2;
982  }
983 
984  hdr = c->hdrbuf;
985  for (i = 0; i < c->msgused; i++) {
986  c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
987  c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
988  *hdr++ = c->request_id / 256;
989  *hdr++ = c->request_id % 256;
990  *hdr++ = i / 256;
991  *hdr++ = i % 256;
992  *hdr++ = c->msgused / 256;
993  *hdr++ = c->msgused % 256;
994  *hdr++ = 0;
995  *hdr++ = 0;
996  assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
997  }
998 
999  return 0;
1000 }
1001 
1002 
1003 static void out_string(conn *c, const char *str) {
1004  size_t len;
1005 
1006  assert(c != NULL);
1007 
1008  if (c->noreply) {
1009  if (settings.verbose > 1) {
1010  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1011  ">%d NOREPLY %s\n", c->sfd, str);
1012  }
1013  c->noreply = false;
1014  if (c->sbytes > 0) {
1015  conn_set_state(c, conn_swallow);
1016  } else {
1017  conn_set_state(c, conn_new_cmd);
1018  }
1019  return;
1020  }
1021 
1022  if (settings.verbose > 1) {
1023  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1024  ">%d %s\n", c->sfd, str);
1025  }
1026 
1027  /* Nuke a partial output... */
1028  c->msgcurr = 0;
1029  c->msgused = 0;
1030  c->iovused = 0;
1031  add_msghdr(c);
1032 
1033  len = strlen(str);
1034  if ((len + 2) > c->wsize) {
1035  /* ought to be always enough. just fail for simplicity */
1036  str = "SERVER_ERROR output line too long";
1037  len = strlen(str);
1038  }
1039 
1040  memcpy(c->wbuf, str, len);
1041  memcpy(c->wbuf + len, "\r\n", 2);
1042  c->wbytes = len + 2;
1043  c->wcurr = c->wbuf;
1044 
1045  conn_set_state(c, conn_write);
1046 
1047  if (c->sbytes > 0) {
1048  c->write_and_go = conn_swallow;
1049  } else {
1050  c->write_and_go = conn_new_cmd;
1051  }
1052 
1053  return;
1054 }
1055 
1056 /*
1057  * we get here after reading the value in set/add/replace commands. The command
1058  * has been stored in c->cmd, and the item is ready in c->item.
1059  */
1060 static void complete_update_ascii(conn *c) {
1061  assert(c != NULL);
1062 
1063  item *it = c->item;
1064  item_info info = { .nvalue = 1 };
1065  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
1066  settings.engine.v1->release(settings.engine.v0, c, it);
1067  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1068  "%d: Failed to get item info\n",
1069  c->sfd);
1070  out_string(c, "SERVER_ERROR failed to get item details");
1071  return;
1072  }
1073 
1074  c->sbytes = 2; // swallow \r\n
1075  ENGINE_ERROR_CODE ret = c->aiostat;
1076  c->aiostat = ENGINE_SUCCESS;
1077  if (ret == ENGINE_SUCCESS) {
1078  ret = settings.engine.v1->store(settings.engine.v0, c, it, &c->cas,
1079  c->store_op, 0);
1080  }
1081 
1082 #ifdef ENABLE_DTRACE
1083  switch (c->store_op) {
1084  case OPERATION_ADD:
1085  MEMCACHED_COMMAND_ADD(c->sfd, info.key, info.nkey,
1086  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1087  break;
1088  case OPERATION_REPLACE:
1089  MEMCACHED_COMMAND_REPLACE(c->sfd, info.key, info.nkey,
1090  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1091  break;
1092  case OPERATION_APPEND:
1093  MEMCACHED_COMMAND_APPEND(c->sfd, info.key, info.nkey,
1094  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1095  break;
1096  case OPERATION_PREPEND:
1097  MEMCACHED_COMMAND_PREPEND(c->sfd, info.key, info.nkey,
1098  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1099  break;
1100  case OPERATION_SET:
1101  MEMCACHED_COMMAND_SET(c->sfd, info.key, info.nkey,
1102  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1103  break;
1104  case OPERATION_CAS:
1105  MEMCACHED_COMMAND_CAS(c->sfd, info.key, info.nkey, info.nbytes, c->cas);
1106  break;
1107  }
1108 #endif
1109 
1110  switch (ret) {
1111  case ENGINE_SUCCESS:
1112  out_string(c, "STORED");
1113  break;
1114  case ENGINE_KEY_EEXISTS:
1115  out_string(c, "EXISTS");
1116  break;
1117  case ENGINE_KEY_ENOENT:
1118  out_string(c, "NOT_FOUND");
1119  break;
1120  case ENGINE_NOT_STORED:
1121  out_string(c, "NOT_STORED");
1122  break;
1123  case ENGINE_DISCONNECT:
1124  c->state = conn_closing;
1125  break;
1126  case ENGINE_ENOTSUP:
1127  out_string(c, "SERVER_ERROR not supported");
1128  break;
1129  case ENGINE_ENOMEM:
1130  out_string(c, "SERVER_ERROR out of memory");
1131  break;
1132  case ENGINE_TMPFAIL:
1133  out_string(c, "SERVER_ERROR temporary failure");
1134  break;
1135  case ENGINE_EINVAL:
1136  out_string(c, "CLIENT_ERROR invalid arguments");
1137  break;
1138  case ENGINE_E2BIG:
1139  out_string(c, "CLIENT_ERROR value too big");
1140  break;
1141  case ENGINE_EACCESS:
1142  out_string(c, "CLIENT_ERROR access control violation");
1143  break;
1144  case ENGINE_NOT_MY_VBUCKET:
1145  out_string(c, "SERVER_ERROR not my vbucket");
1146  break;
1147  case ENGINE_FAILED:
1148  out_string(c, "SERVER_ERROR failure");
1149  break;
1150  case ENGINE_EWOULDBLOCK:
1151  c->ewouldblock = true;
1152  break;
1153  case ENGINE_WANT_MORE:
1154  assert(false);
1155  c->state = conn_closing;
1156  break;
1157 
1158  default:
1159  out_string(c, "SERVER_ERROR internal");
1160  }
1161 
1162  if (c->store_op == OPERATION_CAS) {
1163  switch (ret) {
1164  case ENGINE_SUCCESS:
1165  SLAB_INCR(c, cas_hits, info.key, info.nkey);
1166  break;
1167  case ENGINE_KEY_EEXISTS:
1168  SLAB_INCR(c, cas_badval, info.key, info.nkey);
1169  break;
1170  case ENGINE_KEY_ENOENT:
1171  STATS_NOKEY(c, cas_misses);
1172  break;
1173  default:
1174  ;
1175  }
1176  } else {
1177  SLAB_INCR(c, cmd_set, info.key, info.nkey);
1178  }
1179 
1180  if (!c->ewouldblock) {
1181  /* release the c->item reference */
1182  settings.engine.v1->release(settings.engine.v0, c, c->item);
1183  c->item = 0;
1184  }
1185 }
1186 
1190 static void* binary_get_request(conn *c) {
1191  char *ret = c->rcurr;
1192  ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1193  c->binary_header.request.extlen);
1194 
1195  assert(ret >= c->rbuf);
1196  return ret;
1197 }
1198 
1202 static char* binary_get_key(conn *c) {
1203  return c->rcurr - (c->binary_header.request.keylen);
1204 }
1205 
/*
 * Format a key for a debug log line as "<dir><client> <prefix> <key>".
 * <dir> is '>' for data from the client and '<' for data to it.
 * Non-graphic key bytes are shown as '.'.
 *
 * The key is truncated so that the result, including its terminating NUL,
 * always fits in destsz bytes.
 *
 * @return number of characters written (excluding the NUL), or -1 if
 *         snprintf fails or even the prefix does not fit.
 */
static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
                                       int client, bool from_client,
                                       const char *prefix,
                                       const char *key,
                                       size_t nkey)
{
    int nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
                      client, prefix);
    /* nw >= destsz means the prefix was truncated; there is no room left. */
    if (nw < 0 || (size_t)nw >= destsz) {
        return -1;
    }

    char *ptr = dest + nw;
    /* Leave one byte for the terminating NUL. */
    size_t room = destsz - (size_t)nw - 1;
    if (nkey > room) {
        nkey = room;
    }

    for (size_t ii = 0; ii < nkey; ++ii) {
        /* <ctype.h> functions require a value representable as unsigned char. */
        unsigned char ch = (unsigned char)key[ii];
        *ptr++ = isgraph(ch) ? (char)ch : '.';
    }

    *ptr = '\0';
    return ptr - dest;
}
1248 
/*
 * Produce a hex dump of `size` bytes for a debug log line.
 * The output starts with "<dir><client> <prefix>". A new line prefixed with
 * "<dir><client> " begins every 4 bytes, and each byte is rendered as
 * " 0xNN". The output ends with a newline.
 *
 * @return number of characters written (excluding the NUL), or -1 if
 *         snprintf fails or the output would not fit in destsz bytes.
 *         On truncation nothing is ever written past dest + destsz.
 */
static ssize_t bytes_to_output_string(char *dest, size_t destsz,
                                      int client, bool from_client,
                                      const char *prefix,
                                      const char *data,
                                      size_t size)
{
    const char dir = from_client ? '>' : '<';
    int nw = snprintf(dest, destsz, "%c%d %s", dir, client, prefix);
    if (nw < 0 || (size_t)nw >= destsz) {
        return -1;
    }
    size_t offset = (size_t)nw;

    for (size_t ii = 0; ii < size; ++ii) {
        if (ii % 4 == 0) {
            nw = snprintf(dest + offset, destsz - offset, "\n%c%d ",
                          dir, client);
            /* Stop on truncation so offset never passes destsz. */
            if (nw < 0 || (size_t)nw >= destsz - offset) {
                return -1;
            }
            offset += (size_t)nw;
        }
        nw = snprintf(dest + offset, destsz - offset,
                      " 0x%02x", (unsigned char)data[ii]);
        if (nw < 0 || (size_t)nw >= destsz - offset) {
            return -1;
        }
        offset += (size_t)nw;
    }

    nw = snprintf(dest + offset, destsz - offset, "\n");
    if (nw < 0 || (size_t)nw >= destsz - offset) {
        return -1;
    }

    return (ssize_t)(offset + (size_t)nw);
}
1295 
1296 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
1298 
1299  assert(c);
1300 
1301  c->msgcurr = 0;
1302  c->msgused = 0;
1303  c->iovused = 0;
1304  if (add_msghdr(c) != 0) {
1305  /* XXX: out_string is inappropriate here */
1306  out_string(c, "SERVER_ERROR out of memory");
1307  return;
1308  }
1309 
1310  header = (protocol_binary_response_header *)c->wbuf;
1311 
1312  header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1313  header->response.opcode = c->binary_header.request.opcode;
1314  header->response.keylen = (uint16_t)htons(key_len);
1315 
1316  header->response.extlen = (uint8_t)hdr_len;
1317  header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1318  header->response.status = (uint16_t)htons(err);
1319 
1320  header->response.bodylen = htonl(body_len);
1321  header->response.opaque = c->opaque;
1322  header->response.cas = htonll(c->cas);
1323 
1324  if (settings.verbose > 1) {
1325  char buffer[1024];
1326  if (bytes_to_output_string(buffer, sizeof(buffer), c->sfd, false,
1327  "Writing bin response:",
1328  (const char*)header->bytes,
1329  sizeof(header->bytes)) != -1) {
1330  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1331  "%s", buffer);
1332  }
1333  }
1334 
1335  add_iov(c, c->wbuf, sizeof(header->response));
1336 }
1337 
1344 static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1346 
1347  switch (e) {
1348  case ENGINE_SUCCESS:
1349  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1350  case ENGINE_KEY_ENOENT:
1351  return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1352  case ENGINE_KEY_EEXISTS:
1353  return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1354  case ENGINE_ENOMEM:
1355  return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1356  case ENGINE_TMPFAIL:
1357  return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1358  case ENGINE_NOT_STORED:
1359  return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1360  case ENGINE_EINVAL:
1361  return PROTOCOL_BINARY_RESPONSE_EINVAL;
1362  case ENGINE_ENOTSUP:
1363  return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1364  case ENGINE_E2BIG:
1365  return PROTOCOL_BINARY_RESPONSE_E2BIG;
1366  case ENGINE_NOT_MY_VBUCKET:
1367  return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1368  default:
1369  ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1370  }
1371 
1372  return ret;
1373 }
1374 
1375 static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
1376  ssize_t len;
1377  char buffer[1024] = { [sizeof(buffer) - 1] = '\0' };
1378 
1379  switch (err) {
1380  case PROTOCOL_BINARY_RESPONSE_SUCCESS:
1381  len = 0;
1382  break;
1383  case PROTOCOL_BINARY_RESPONSE_ENOMEM:
1384  len = snprintf(buffer, sizeof(buffer), "Out of memory");
1385  break;
1386  case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
1387  len = snprintf(buffer, sizeof(buffer), "Temporary failure");
1388  break;
1389  case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
1390  len = snprintf(buffer, sizeof(buffer), "Unknown command");
1391  break;
1392  case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1393  len = snprintf(buffer, sizeof(buffer), "Not found");
1394  break;
1395  case PROTOCOL_BINARY_RESPONSE_EINVAL:
1396  len = snprintf(buffer, sizeof(buffer), "Invalid arguments");
1397  break;
1398  case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
1399  len = snprintf(buffer, sizeof(buffer), "Data exists for key");
1400  break;
1401  case PROTOCOL_BINARY_RESPONSE_E2BIG:
1402  len = snprintf(buffer, sizeof(buffer), "Too large");
1403  break;
1404  case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
1405  len = snprintf(buffer, sizeof(buffer),
1406  "Non-numeric server-side value for incr or decr");
1407  break;
1408  case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1409  len = snprintf(buffer, sizeof(buffer), "Not stored");
1410  break;
1411  case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1412  len = snprintf(buffer, sizeof(buffer), "Auth failure");
1413  break;
1414  case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
1415  len = snprintf(buffer, sizeof(buffer), "Not supported");
1416  break;
1417  case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
1418  len = snprintf(buffer, sizeof(buffer),
1419  "I'm not responsible for this vbucket");
1420  break;
1421 
1422  default:
1423  len = snprintf(buffer, sizeof(buffer), "UNHANDLED ERROR (%d)", err);
1424  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1425  ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1426  }
1427 
1428  /* Allow the engine to pass extra error information */
1429  if (settings.engine.v1->errinfo != NULL) {
1430  size_t elen = settings.engine.v1->errinfo(settings.engine.v0, c, buffer + len + 2,
1431  sizeof(buffer) - len - 3);
1432 
1433  if (elen > 0) {
1434  memcpy(buffer + len, ": ", 2);
1435  len += elen + 2;
1436  }
1437  }
1438 
1439  if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS && settings.verbose > 1) {
1440  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1441  ">%d Writing an error: %s\n", c->sfd,
1442  buffer);
1443  }
1444 
1445  add_bin_header(c, err, 0, 0, len);
1446  if (len > 0) {
1447  add_iov(c, buffer, len);
1448  }
1449  conn_set_state(c, conn_mwrite);
1450  if (swallow > 0) {
1451  c->sbytes = swallow;
1452  c->write_and_go = conn_swallow;
1453  } else {
1454  c->write_and_go = conn_new_cmd;
1455  }
1456 }
1457 
1458 /* Form and send a response to a command over the binary protocol */
1459 static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
1460  if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1461  c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1462  add_bin_header(c, 0, hlen, keylen, dlen);
1463  if(dlen > 0) {
1464  add_iov(c, d, dlen);
1465  }
1466  conn_set_state(c, conn_mwrite);
1467  c->write_and_go = conn_new_cmd;
1468  } else {
1469  conn_set_state(c, conn_new_cmd);
1470  }
1471 }
1472 
1473 
1474 static void complete_incr_bin(conn *c) {
1476  protocol_binary_request_incr* req = binary_get_request(c);
1477 
1478  assert(c != NULL);
1479  assert(c->wsize >= sizeof(*rsp));
1480 
1481  /* fix byteorder in the request */
1482  uint64_t delta = ntohll(req->message.body.delta);
1483  uint64_t initial = ntohll(req->message.body.initial);
1484  rel_time_t expiration = ntohl(req->message.body.expiration);
1485  char *key = binary_get_key(c);
1486  size_t nkey = c->binary_header.request.keylen;
1487  bool incr = (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT ||
1488  c->cmd == PROTOCOL_BINARY_CMD_INCREMENTQ);
1489 
1490  if (settings.verbose > 1) {
1491  char buffer[1024];
1492  ssize_t nw;
1493  nw = key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1494  incr ? "INCR" : "DECR", key, nkey);
1495  if (nw != -1) {
1496  if (snprintf(buffer + nw, sizeof(buffer) - nw,
1497  " %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n",
1498  delta, initial, (uint64_t)expiration) != -1) {
1499  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s",
1500  buffer);
1501  }
1502  }
1503  }
1504 
1505  ENGINE_ERROR_CODE ret = c->aiostat;
1506  c->aiostat = ENGINE_SUCCESS;
1507  if (ret == ENGINE_SUCCESS) {
1508  ret = settings.engine.v1->arithmetic(settings.engine.v0,
1509  c, key, nkey, incr,
1510  req->message.body.expiration != 0xffffffff,
1511  delta, initial, expiration,
1512  &c->cas,
1513  &rsp->message.body.value,
1514  c->binary_header.request.vbucket);
1515  }
1516 
1517  switch (ret) {
1518  case ENGINE_SUCCESS:
1519  rsp->message.body.value = htonll(rsp->message.body.value);
1520  write_bin_response(c, &rsp->message.body, 0, 0,
1521  sizeof (rsp->message.body.value));
1522  if (incr) {
1523  STATS_INCR(c, incr_hits, key, nkey);
1524  } else {
1525  STATS_INCR(c, decr_hits, key, nkey);
1526  }
1527  break;
1528  case ENGINE_KEY_EEXISTS:
1529  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1530  break;
1531  case ENGINE_KEY_ENOENT:
1532  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1533  if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1534  STATS_INCR(c, incr_misses, key, nkey);
1535  } else {
1536  STATS_INCR(c, decr_misses, key, nkey);
1537  }
1538  break;
1539  case ENGINE_ENOMEM:
1540  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1541  break;
1542  case ENGINE_TMPFAIL:
1543  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1544  break;
1545  case ENGINE_EINVAL:
1546  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
1547  break;
1548  case ENGINE_NOT_STORED:
1549  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1550  break;
1551  case ENGINE_DISCONNECT:
1552  c->state = conn_closing;
1553  break;
1554  case ENGINE_ENOTSUP:
1555  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1556  break;
1557  case ENGINE_NOT_MY_VBUCKET:
1558  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1559  break;
1560  case ENGINE_EWOULDBLOCK:
1561  c->ewouldblock = true;
1562  break;
1563  default:
1564  abort();
1565  }
1566 }
1567 
1568 static void complete_update_bin(conn *c) {
1569  protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1570  assert(c != NULL);
1571 
1572  item *it = c->item;
1573  item_info info = { .nvalue = 1 };
1574  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
1575  settings.engine.v1->release(settings.engine.v0, c, it);
1576  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1577  "%d: Failed to get item info\n",
1578  c->sfd);
1579  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1580  return;
1581  }
1582 
1583  ENGINE_ERROR_CODE ret = c->aiostat;
1584  c->aiostat = ENGINE_SUCCESS;
1585  if (ret == ENGINE_SUCCESS) {
1586  ret = settings.engine.v1->store(settings.engine.v0, c,
1587  it, &c->cas, c->store_op,
1588  c->binary_header.request.vbucket);
1589  }
1590 
1591 #ifdef ENABLE_DTRACE
1592  switch (c->cmd) {
1593  case OPERATION_ADD:
1594  MEMCACHED_COMMAND_ADD(c->sfd, info.key, info.nkey,
1595  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1596  break;
1597  case OPERATION_REPLACE:
1598  MEMCACHED_COMMAND_REPLACE(c->sfd, info.key, info.nkey,
1599  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1600  break;
1601  case OPERATION_APPEND:
1602  MEMCACHED_COMMAND_APPEND(c->sfd, info.key, info.nkey,
1603  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1604  break;
1605  case OPERATION_PREPEND:
1606  MEMCACHED_COMMAND_PREPEND(c->sfd, info.key, info.nkey,
1607  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1608  break;
1609  case OPERATION_SET:
1610  MEMCACHED_COMMAND_SET(c->sfd, info.key, info.nkey,
1611  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1612  break;
1613  }
1614 #endif
1615 
1616  switch (ret) {
1617  case ENGINE_SUCCESS:
1618  /* Stored */
1619  write_bin_response(c, NULL, 0, 0, 0);
1620  break;
1621  case ENGINE_KEY_EEXISTS:
1622  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1623  break;
1624  case ENGINE_KEY_ENOENT:
1625  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1626  break;
1627  case ENGINE_ENOMEM:
1628  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1629  break;
1630  case ENGINE_TMPFAIL:
1631  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1632  break;
1633  case ENGINE_EWOULDBLOCK:
1634  c->ewouldblock = true;
1635  break;
1636  case ENGINE_DISCONNECT:
1637  c->state = conn_closing;
1638  break;
1639  case ENGINE_ENOTSUP:
1640  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1641  break;
1642  case ENGINE_NOT_MY_VBUCKET:
1643  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1644  break;
1645  default:
1646  if (c->store_op == OPERATION_ADD) {
1647  eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1648  } else if(c->store_op == OPERATION_REPLACE) {
1649  eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1650  } else {
1651  eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1652  }
1653  write_bin_packet(c, eno, 0);
1654  }
1655 
1656  if (c->store_op == OPERATION_CAS) {
1657  switch (ret) {
1658  case ENGINE_SUCCESS:
1659  SLAB_INCR(c, cas_hits, info.key, info.nkey);
1660  break;
1661  case ENGINE_KEY_EEXISTS:
1662  SLAB_INCR(c, cas_badval, info.key, info.nkey);
1663  break;
1664  case ENGINE_KEY_ENOENT:
1665  STATS_NOKEY(c, cas_misses);
1666  break;
1667  default:
1668  ;
1669  }
1670  } else {
1671  SLAB_INCR(c, cmd_set, info.key, info.nkey);
1672  }
1673 
1674  if (!c->ewouldblock) {
1675  /* release the c->item reference */
1676  settings.engine.v1->release(settings.engine.v0, c, c->item);
1677  c->item = 0;
1678  }
1679 }
1680 
1681 static void process_bin_get(conn *c) {
1682  item *it;
1683 
1685  char* key = binary_get_key(c);
1686  size_t nkey = c->binary_header.request.keylen;
1687 
1688  if (settings.verbose > 1) {
1689  char buffer[1024];
1690  if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1691  "GET", key, nkey) != -1) {
1692  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1693  buffer);
1694  }
1695  }
1696 
1697  ENGINE_ERROR_CODE ret = c->aiostat;
1698  c->aiostat = ENGINE_SUCCESS;
1699  if (ret == ENGINE_SUCCESS) {
1700  ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, nkey,
1701  c->binary_header.request.vbucket);
1702  }
1703 
1704  uint16_t keylen;
1705  uint32_t bodylen;
1706  item_info info = { .nvalue = 1 };
1707 
1708  switch (ret) {
1709  case ENGINE_SUCCESS:
1710  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
1711  settings.engine.v1->release(settings.engine.v0, c, it);
1712  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1713  "%d: Failed to get item info\n",
1714  c->sfd);
1715  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1716  break;
1717  }
1718 
1719  keylen = 0;
1720  bodylen = sizeof(rsp->message.body) + info.nbytes;
1721 
1722  STATS_HIT(c, get, key, nkey);
1723 
1724  if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1725  bodylen += nkey;
1726  keylen = nkey;
1727  }
1728  add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1729  rsp->message.header.response.cas = htonll(info.cas);
1730 
1731  // add the flags
1732  rsp->message.body.flags = info.flags;
1733  add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1734 
1735  if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1736  add_iov(c, info.key, nkey);
1737  }
1738 
1739  add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
1740  conn_set_state(c, conn_mwrite);
1741  /* Remember this item so we can garbage collect it later */
1742  c->item = it;
1743  break;
1744  case ENGINE_KEY_ENOENT:
1745  STATS_MISS(c, get, key, nkey);
1746 
1747  MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1748 
1749  if (c->noreply) {
1750  conn_set_state(c, conn_new_cmd);
1751  } else {
1752  if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1753  char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1754  add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1755  0, nkey, nkey);
1756  memcpy(ofs, key, nkey);
1757  add_iov(c, ofs, nkey);
1758  conn_set_state(c, conn_mwrite);
1759  } else {
1760  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1761  }
1762  }
1763  break;
1764  case ENGINE_EWOULDBLOCK:
1765  c->ewouldblock = true;
1766  break;
1767  case ENGINE_DISCONNECT:
1768  c->state = conn_closing;
1769  break;
1770  case ENGINE_TMPFAIL:
1771  break;
1772  case ENGINE_ENOTSUP:
1773  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1774  break;
1775  case ENGINE_NOT_MY_VBUCKET:
1776  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1777  break;
1778  default:
1779  /* @todo add proper error handling! */
1780  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1781  "Unknown error code: %d\n", ret);
1782  abort();
1783  }
1784 
1785  if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1786  stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1787  }
1788 }
1789 
1790 static void append_bin_stats(const char *key, const uint16_t klen,
1791  const char *val, const uint32_t vlen,
1792  conn *c) {
1793  char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1794  uint32_t bodylen = klen + vlen;
1796  .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
1797  .response.opcode = PROTOCOL_BINARY_CMD_STAT,
1798  .response.keylen = (uint16_t)htons(klen),
1799  .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1800  .response.bodylen = htonl(bodylen),
1801  .response.opaque = c->opaque
1802  };
1803 
1804  memcpy(buf, header.bytes, sizeof(header.response));
1805  buf += sizeof(header.response);
1806 
1807  if (klen > 0) {
1808  memcpy(buf, key, klen);
1809  buf += klen;
1810 
1811  if (vlen > 0) {
1812  memcpy(buf, val, vlen);
1813  }
1814  }
1815 
1816  c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
1817 }
1818 
1824 static void append_ascii_stats(const char *key, const uint16_t klen,
1825  const char *val, const uint32_t vlen,
1826  conn *c) {
1827  char *pos = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1828  uint32_t nbytes = 5; /* "END\r\n" or "STAT " */
1829 
1830  if (klen == 0 && vlen == 0) {
1831  memcpy(pos, "END\r\n", 5);
1832  } else {
1833  memcpy(pos, "STAT ", 5);
1834  memcpy(pos + nbytes, key, klen);
1835  nbytes += klen;
1836  if (vlen != 0) {
1837  pos[nbytes] = ' ';
1838  ++nbytes;
1839  memcpy(pos + nbytes, val, vlen);
1840  nbytes += vlen;
1841  }
1842  memcpy(pos + nbytes, "\r\n", 2);
1843  nbytes += 2;
1844  }
1845 
1846  c->dynamic_buffer.offset += nbytes;
1847 }
1848 
1849 static bool grow_dynamic_buffer(conn *c, size_t needed) {
1850  size_t nsize = c->dynamic_buffer.size;
1851  size_t available = nsize - c->dynamic_buffer.offset;
1852  bool rv = true;
1853 
1854  /* Special case: No buffer -- need to allocate fresh */
1855  if (c->dynamic_buffer.buffer == NULL) {
1856  nsize = 1024;
1857  available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1858  }
1859 
1860  while (needed > available) {
1861  assert(nsize > 0);
1862  nsize = nsize << 1;
1863  available = nsize - c->dynamic_buffer.offset;
1864  }
1865 
1866  if (nsize != c->dynamic_buffer.size) {
1867  char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1868  if (ptr) {
1869  c->dynamic_buffer.buffer = ptr;
1870  c->dynamic_buffer.size = nsize;
1871  } else {
1872  rv = false;
1873  }
1874  }
1875 
1876  return rv;
1877 }
1878 
1879 static void append_stats(const char *key, const uint16_t klen,
1880  const char *val, const uint32_t vlen,
1881  const void *cookie)
1882 {
1883  /* value without a key is invalid */
1884  if (klen == 0 && vlen > 0) {
1885  return ;
1886  }
1887 
1888  conn *c = (conn*)cookie;
1889 
1890  if (c->protocol == binary_prot) {
1891  size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1892  if (!grow_dynamic_buffer(c, needed)) {
1893  return ;
1894  }
1895  append_bin_stats(key, klen, val, vlen, c);
1896  } else {
1897  size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
1898  if (!grow_dynamic_buffer(c, needed)) {
1899  return ;
1900  }
1901  append_ascii_stats(key, klen, val, vlen, c);
1902  }
1903 
1904  assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1905 }
1906 
1907 static void process_bin_stat(conn *c) {
1908  char *subcommand = binary_get_key(c);
1909  size_t nkey = c->binary_header.request.keylen;
1910 
1911  if (settings.verbose > 1) {
1912  char buffer[1024];
1913  if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1914  "STATS", subcommand, nkey) != -1) {
1915  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1916  buffer);
1917  }
1918  }
1919 
1920  ENGINE_ERROR_CODE ret = c->aiostat;
1921  c->aiostat = ENGINE_SUCCESS;
1922  c->ewouldblock = false;
1923 
1924  if (ret == ENGINE_SUCCESS) {
1925  if (nkey == 0) {
1926  /* request all statistics */
1927  ret = settings.engine.v1->get_stats(settings.engine.v0, c, NULL, 0, append_stats);
1928  if (ret == ENGINE_SUCCESS) {
1929  server_stats(&append_stats, c, false);
1930  }
1931  } else if (strncmp(subcommand, "reset", 5) == 0) {
1932  stats_reset(c);
1933  settings.engine.v1->reset_stats(settings.engine.v0, c);
1934  } else if (strncmp(subcommand, "settings", 8) == 0) {
1935  process_stat_settings(&append_stats, c);
1936  } else if (strncmp(subcommand, "detail", 6) == 0) {
1937  char *subcmd_pos = subcommand + 6;
1938  if (settings.allow_detailed) {
1939  if (strncmp(subcmd_pos, " dump", 5) == 0) {
1940  int len;
1941  char *dump_buf = stats_prefix_dump(&len);
1942  if (dump_buf == NULL || len <= 0) {
1943  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1944  return ;
1945  } else {
1946  append_stats("detailed", strlen("detailed"), dump_buf, len, c);
1947  free(dump_buf);
1948  }
1949  } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1950  settings.detail_enabled = 1;
1951  } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1952  settings.detail_enabled = 0;
1953  } else {
1954  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1955  return;
1956  }
1957  } else {
1958  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1959  return;
1960  }
1961  } else if (strncmp(subcommand, "aggregate", 9) == 0) {
1962  server_stats(&append_stats, c, true);
1963  } else if (strncmp(subcommand, "topkeys", 7) == 0) {
1964  topkeys_t *tk = get_independent_stats(c)->topkeys;
1965  if (tk != NULL) {
1966  topkeys_stats(tk, c, current_time, append_stats);
1967  } else {
1968  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1969  return;
1970  }
1971  } else {
1972  ret = settings.engine.v1->get_stats(settings.engine.v0, c,
1973  subcommand, nkey,
1974  append_stats);
1975  }
1976  }
1977 
1978  switch (ret) {
1979  case ENGINE_SUCCESS:
1980  append_stats(NULL, 0, NULL, 0, c);
1981  write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1982  c->dynamic_buffer.buffer = NULL;
1983  break;
1984  case ENGINE_ENOMEM:
1985  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1986  break;
1987  case ENGINE_TMPFAIL:
1988  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1989  break;
1990  case ENGINE_KEY_ENOENT:
1991  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1992  break;
1993  case ENGINE_DISCONNECT:
1994  c->state = conn_closing;
1995  break;
1996  case ENGINE_ENOTSUP:
1997  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1998  break;
1999  case ENGINE_EWOULDBLOCK:
2000  c->ewouldblock = true;
2001  break;
2002  default:
2003  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2004  }
2005 }
2006 
2007 static void bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t chunk) {
2008  assert(c);
2009  c->substate = next_substate;
2010  c->rlbytes = chunk;
2011 
2012  /* Ok... do we have room for everything in our buffer? */
2013  ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
2014  if (c->rlbytes > c->rsize - offset) {
2015  size_t nsize = c->rsize;
2016  size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
2017 
2018  while (size > nsize) {
2019  nsize *= 2;
2020  }
2021 
2022  if (nsize != c->rsize) {
2023  if (settings.verbose > 1) {
2024  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2025  "%d: Need to grow buffer from %lu to %lu\n",
2026  c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
2027  }
2028  char *newm = realloc(c->rbuf, nsize);
2029  if (newm == NULL) {
2030  if (settings.verbose) {
2031  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2032  "%d: Failed to grow buffer.. closing connection\n",
2033  c->sfd);
2034  }
2035  conn_set_state(c, conn_closing);
2036  return;
2037  }
2038 
2039  c->rbuf= newm;
2040  /* rcurr should point to the same offset in the packet */
2041  c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
2042  c->rsize = nsize;
2043  }
2044  if (c->rbuf != c->rcurr) {
2045  memmove(c->rbuf, c->rcurr, c->rbytes);
2046  c->rcurr = c->rbuf;
2047  if (settings.verbose > 1) {
2048  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2049  "%d: Repack input buffer\n",
2050  c->sfd);
2051  }
2052  }
2053  }
2054 
2055  /* preserve the header in the buffer.. */
2056  c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
2057  conn_set_state(c, conn_nread);
2058 }
2059 
2060 static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
2061  bin_read_chunk(c, next_substate, c->keylen + extra);
2062 }
2063 
2064 
2065 /* Just write an error message and disconnect the client */
2066 static void handle_binary_protocol_error(conn *c) {
2067  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2068  if (settings.verbose) {
2069  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2070  "%d: Protocol error (opcode %02x), close connection\n",
2071  c->sfd, c->binary_header.request.opcode);
2072  }
2073  c->write_and_go = conn_closing;
2074 }
2075 
2076 static void init_sasl_conn(conn *c) {
2077  assert(c);
2078  if (!c->sasl_conn) {
2079  int result=sasl_server_new("memcached",
2080  NULL, NULL, NULL, NULL,
2081  NULL, 0, &c->sasl_conn);
2082  if (result != SASL_OK) {
2083  if (settings.verbose) {
2084  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2085  "%d: Failed to initialize SASL conn.\n",
2086  c->sfd);
2087  }
2088  c->sasl_conn = NULL;
2089  }
2090  }
2091 }
2092 
2093 static void get_auth_data(const void *cookie, auth_data_t *data) {
2094  conn *c = (conn*)cookie;
2095  if (c->sasl_conn) {
2096  sasl_getprop(c->sasl_conn, SASL_USERNAME, (void*)&data->username);
2097 #ifdef ENABLE_ISASL
2098  sasl_getprop(c->sasl_conn, ISASL_CONFIG, (void*)&data->config);
2099 #endif
2100  }
2101 }
2102 
#ifdef SASL_ENABLED
/*
 * Reply to SASL_LIST_MECHS with the space-separated list of mechanisms
 * offered by the SASL library, or AUTH_ERROR if it cannot be obtained.
 */
static void bin_list_sasl_mechs(conn *c) {
    const char *mechs = NULL;
    unsigned int mechs_len = 0;

    init_sasl_conn(c);
    int rc = sasl_listmech(c->sasl_conn, NULL,
                           "",   /* What to prepend the string with */
                           " ",  /* What to separate mechanisms with */
                           "",   /* What to append to the string */
                           &mechs, &mechs_len,
                           NULL);
    if (rc == SASL_OK) {
        write_bin_response(c, (char*)mechs, 0, 0, mechs_len);
        return;
    }

    /* Perhaps there's a better error for this... */
    if (settings.verbose) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                "%d: Failed to list SASL mechanisms.\n",
                c->sfd);
    }
    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
}
#endif
2127 
/*
 * Scratch buffer for an in-progress SASL AUTH/STEP request: the mechanism
 * name (ksize bytes) followed immediately by the client data (vsize bytes).
 */
struct sasl_tmp {
    int ksize, vsize;   /* mechanism length, client data length */
    char data[];        /* data + ksize == value */
};
2133 
2134 static void process_bin_sasl_auth(conn *c) {
2135  assert(c->binary_header.request.extlen == 0);
2136 
2137  int nkey = c->binary_header.request.keylen;
2138  int vlen = c->binary_header.request.bodylen - nkey;
2139 
2140  if (nkey > MAX_SASL_MECH_LEN) {
2141  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
2142  c->write_and_go = conn_swallow;
2143  return;
2144  }
2145 
2146  char *key = binary_get_key(c);
2147  assert(key);
2148 
2149  size_t buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
2150  struct sasl_tmp *data = calloc(sizeof(struct sasl_tmp) + buffer_size, 1);
2151  if (!data) {
2152  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2153  c->write_and_go = conn_swallow;
2154  return;
2155  }
2156 
2157  data->ksize = nkey;
2158  data->vsize = vlen;
2159  memcpy(data->data, key, nkey);
2160 
2161  c->item = data;
2162  c->ritem = data->data + nkey;
2163  c->rlbytes = vlen;
2164  conn_set_state(c, conn_nread);
2165  c->substate = bin_reading_sasl_auth_data;
2166 }
2167 
2168 static void process_bin_complete_sasl_auth(conn *c) {
2169  const char *out = NULL;
2170  unsigned int outlen = 0;
2171 
2172  assert(c->item);
2173  init_sasl_conn(c);
2174 
2175  int nkey = c->binary_header.request.keylen;
2176  int vlen = c->binary_header.request.bodylen - nkey;
2177 
2178  struct sasl_tmp *stmp = c->item;
2179  char mech[nkey+1];
2180  memcpy(mech, stmp->data, nkey);
2181  mech[nkey] = 0x00;
2182 
2183  if (settings.verbose) {
2184  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2185  "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2186  }
2187 
2188  const char *challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2189 
2190  int result=-1;
2191 
2192  switch (c->cmd) {
2193  case PROTOCOL_BINARY_CMD_SASL_AUTH:
2194  result = sasl_server_start(c->sasl_conn, mech,
2195  challenge, vlen,
2196  &out, &outlen);
2197  break;
2198  case PROTOCOL_BINARY_CMD_SASL_STEP:
2199  result = sasl_server_step(c->sasl_conn,
2200  challenge, vlen,
2201  &out, &outlen);
2202  break;
2203  default:
2204  assert(false); /* CMD should be one of the above */
2205  /* This code is pretty much impossible, but makes the compiler
2206  happier */
2207  if (settings.verbose) {
2208  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2209  "%d: Unhandled command %d with challenge %s\n",
2210  c->sfd, c->cmd, challenge);
2211  }
2212  break;
2213  }
2214 
2215  free(c->item);
2216  c->item = NULL;
2217  c->ritem = NULL;
2218 
2219  if (settings.verbose) {
2220  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2221  "%d: sasl result code: %d\n",
2222  c->sfd, result);
2223  }
2224 
2225  switch(result) {
2226  case SASL_OK:
2227  write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
2228  auth_data_t data;
2229  get_auth_data(c, &data);
2230  perform_callbacks(ON_AUTH, (const void*)&data, c);
2231  STATS_NOKEY(c, auth_cmds);
2232  break;
2233  case SASL_CONTINUE:
2234  add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
2235  if(outlen > 0) {
2236  add_iov(c, out, outlen);
2237  }
2238  conn_set_state(c, conn_mwrite);
2239  c->write_and_go = conn_new_cmd;
2240  break;
2241  default:
2242  if (settings.verbose) {
2243  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2244  "%d: Unknown sasl response: %d\n",
2245  c->sfd, result);
2246  }
2247  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2248  STATS_NOKEY2(c, auth_cmds, auth_errors);
2249  }
2250 }
2251 
2252 static bool authenticated(conn *c) {
2253  bool rv = false;
2254 
2255  switch (c->cmd) {
2256  case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
2257  case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */
2258  case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */
2259  case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */
2260  rv = true;
2261  break;
2262  default:
2263  if (c->sasl_conn) {
2264  const void *uname = NULL;
2265  sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
2266  rv = uname != NULL;
2267  }
2268  }
2269 
2270  if (settings.verbose > 1) {
2271  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2272  "%d: authenticated() in cmd 0x%02x is %s\n",
2273  c->sfd, c->cmd, rv ? "true" : "false");
2274  }
2275 
2276  return rv;
2277 }
2278 
2279 static bool binary_response_handler(const void *key, uint16_t keylen,
2280  const void *ext, uint8_t extlen,
2281  const void *body, uint32_t bodylen,
2282  uint8_t datatype, uint16_t status,
2283  uint64_t cas, const void *cookie)
2284 {
2285  conn *c = (conn*)cookie;
2286  /* Look at append_bin_stats */
2287  size_t needed = keylen + extlen + bodylen + sizeof(protocol_binary_response_header);
2288  if (!grow_dynamic_buffer(c, needed)) {
2289  if (settings.verbose > 0) {
2290  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2291  "<%d ERROR: Failed to allocate memory for response\n",
2292  c->sfd);
2293  }
2294  return false;
2295  }
2296 
2297  char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2299  .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
2300  .response.opcode = c->binary_header.request.opcode,
2301  .response.keylen = (uint16_t)htons(keylen),
2302  .response.extlen = extlen,
2303  .response.datatype = datatype,
2304  .response.status = (uint16_t)htons(status),
2305  .response.bodylen = htonl(bodylen + keylen + extlen),
2306  .response.opaque = c->opaque,
2307  .response.cas = htonll(cas),
2308  };
2309 
2310  memcpy(buf, header.bytes, sizeof(header.response));
2311  buf += sizeof(header.response);
2312 
2313  if (extlen > 0) {
2314  memcpy(buf, ext, extlen);
2315  buf += extlen;
2316  }
2317 
2318  if (keylen > 0) {
2319  memcpy(buf, key, keylen);
2320  buf += keylen;
2321  }
2322 
2323  if (bodylen > 0) {
2324  memcpy(buf, body, bodylen);
2325  }
2326 
2327  c->dynamic_buffer.offset += needed;
2328 
2329  return true;
2330 }
2331 
/*
 * Per-opcode TAP packet counters. struct tap_stats keeps two of these:
 * `sent` is updated by ship_tap_log() and `received` by process_bin_packet().
 */
struct tap_cmd_stats {
    uint64_t connect;
    uint64_t mutation;
    uint64_t checkpoint_start;
    uint64_t checkpoint_end;
    uint64_t delete;
    uint64_t flush;
    uint64_t opaque;
    uint64_t vbucket_set;
};

/*
 * Global TAP statistics. Every increment visible in this file is done with
 * `mutex` held; readers presumably should take it as well.
 */
struct tap_stats {
    pthread_mutex_t mutex;
    struct tap_cmd_stats sent;
    struct tap_cmd_stats received;
} tap_stats = { .mutex = PTHREAD_MUTEX_INITIALIZER };
2352 
2353 static void ship_tap_log(conn *c) {
2354  assert(c->thread->type == TAP);
2355  c->msgcurr = 0;
2356  c->msgused = 0;
2357  c->iovused = 0;
2358  if (add_msghdr(c) != 0) {
2359  if (settings.verbose) {
2360  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2361  "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2362  }
2363  conn_set_state(c, conn_closing);
2364  return ;
2365  }
2366  /* @todo add check for buffer overflow of c->wbuf) */
2367  c->wcurr = c->wbuf;
2368 
2369  bool more_data = true;
2370  bool send_data = false;
2371  bool disconnect = false;
2372 
2373  item *it;
2374  uint32_t bodylen;
2375  int ii = 0;
2376  c->icurr = c->ilist;
2377  do {
2378  /* @todo fixme! */
2379  if (ii++ == 10) {
2380  break;
2381  }
2382 
2383  void *engine;
2384  uint16_t nengine;
2385  uint8_t ttl;
2386  uint16_t tap_flags;
2387  uint32_t seqno;
2388  uint16_t vbucket;
2389 
2390  tap_event_t event = c->tap_iterator(settings.engine.v0, c, &it,
2391  &engine, &nengine, &ttl,
2392  &tap_flags, &seqno, &vbucket);
2393  union {
2399  } msg = {
2400  .mutation.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ,
2401  };
2402 
2403  msg.opaque.message.header.request.opaque = htonl(seqno);
2404  msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2405  msg.opaque.message.body.tap.ttl = ttl;
2406  msg.opaque.message.body.tap.flags = htons(tap_flags);
2407  msg.opaque.message.header.request.extlen = 8;
2408  msg.opaque.message.header.request.vbucket = htons(vbucket);
2409  item_info info = { .nvalue = 1 };
2410 
2411  switch (event) {
2412  case TAP_NOOP :
2413  send_data = true;
2414  msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2415  msg.noop.message.header.request.extlen = 0;
2416  msg.noop.message.header.request.bodylen = htonl(0);
2417  memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
2418  add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
2419  c->wcurr += sizeof(msg.noop.bytes);
2420  c->wbytes += sizeof(msg.noop.bytes);
2421  break;
2422  case TAP_PAUSE :
2423  more_data = false;
2424  break;
2425  case TAP_CHECKPOINT_START:
2426  case TAP_CHECKPOINT_END:
2427  case TAP_MUTATION:
2428  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
2429  settings.engine.v1->release(settings.engine.v0, c, it);
2430  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2431  "%d: Failed to get item info\n", c->sfd);
2432  break;
2433  }
2434  send_data = true;
2435  c->ilist[c->ileft++] = it;
2436 
2437  if (event == TAP_CHECKPOINT_START) {
2438  msg.mutation.message.header.request.opcode =
2439  PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2440  pthread_mutex_lock(&tap_stats.mutex);
2441  tap_stats.sent.checkpoint_start++;
2442  pthread_mutex_unlock(&tap_stats.mutex);
2443  } else if (event == TAP_CHECKPOINT_END) {
2444  msg.mutation.message.header.request.opcode =
2445  PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2446  pthread_mutex_lock(&tap_stats.mutex);
2447  tap_stats.sent.checkpoint_end++;
2448  pthread_mutex_unlock(&tap_stats.mutex);
2449  } else if (event == TAP_MUTATION) {
2450  msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2451  pthread_mutex_lock(&tap_stats.mutex);
2452  tap_stats.sent.mutation++;
2453  pthread_mutex_unlock(&tap_stats.mutex);
2454  }
2455 
2456  msg.mutation.message.header.request.cas = htonll(info.cas);
2457  msg.mutation.message.header.request.keylen = htons(info.nkey);
2458  msg.mutation.message.header.request.extlen = 16;
2459 
2460  bodylen = 16 + info.nkey + nengine;
2461  if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2462  bodylen += info.nbytes;
2463  }
2464  msg.mutation.message.header.request.bodylen = htonl(bodylen);
2465  msg.mutation.message.body.item.flags = htonl(info.flags);
2466  msg.mutation.message.body.item.expiration = htonl(info.exptime);
2467  msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2468  msg.mutation.message.body.tap.ttl = ttl;
2469  msg.mutation.message.body.tap.flags = htons(tap_flags);
2470  memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));
2471 
2472  add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
2473  c->wcurr += sizeof(msg.mutation.bytes);
2474  c->wbytes += sizeof(msg.mutation.bytes);
2475 
2476  if (nengine > 0) {
2477  memcpy(c->wcurr, engine, nengine);
2478  add_iov(c, c->wcurr, nengine);
2479  c->wcurr += nengine;
2480  c->wbytes += nengine;
2481  }
2482 
2483  add_iov(c, info.key, info.nkey);
2484  if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2485  add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
2486  }
2487 
2488  break;
2489  case TAP_DELETION:
2490  /* This is a delete */
2491  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
2492  settings.engine.v1->release(settings.engine.v0, c, it);
2493  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2494  "%d: Failed to get item info\n", c->sfd);
2495  break;
2496  }
2497  send_data = true;
2498  c->ilist[c->ileft++] = it;
2499  msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2500  msg.delete.message.header.request.cas = htonll(info.cas);
2501  msg.delete.message.header.request.keylen = htons(info.nkey);
2502 
2503  bodylen = 8 + info.nkey + nengine;
2504  if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2505  bodylen += info.nbytes;
2506  }
2507  msg.delete.message.header.request.bodylen = htonl(bodylen);
2508 
2509  memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
2510  add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
2511  c->wcurr += sizeof(msg.delete.bytes);
2512  c->wbytes += sizeof(msg.delete.bytes);
2513 
2514  if (nengine > 0) {
2515  memcpy(c->wcurr, engine, nengine);
2516  add_iov(c, c->wcurr, nengine);
2517  c->wcurr += nengine;
2518  c->wbytes += nengine;
2519  }
2520 
2521  add_iov(c, info.key, info.nkey);
2522  if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2523  add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
2524  }
2525 
2526  pthread_mutex_lock(&tap_stats.mutex);
2527  tap_stats.sent.delete++;
2528  pthread_mutex_unlock(&tap_stats.mutex);
2529  break;
2530 
2531  case TAP_DISCONNECT:
2532  disconnect = true;
2533  more_data = false;
2534  break;
2535  case TAP_VBUCKET_SET:
2536  case TAP_FLUSH:
2537  case TAP_OPAQUE:
2538  send_data = true;
2539 
2540  if (event == TAP_OPAQUE) {
2541  msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2542  pthread_mutex_lock(&tap_stats.mutex);
2543  tap_stats.sent.opaque++;
2544  pthread_mutex_unlock(&tap_stats.mutex);
2545 
2546  } else if (event == TAP_FLUSH) {
2547  msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2548  pthread_mutex_lock(&tap_stats.mutex);
2549  tap_stats.sent.flush++;
2550  pthread_mutex_unlock(&tap_stats.mutex);
2551  } else if (event == TAP_VBUCKET_SET) {
2552  msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2553  msg.flush.message.body.tap.flags = htons(tap_flags);
2554  pthread_mutex_lock(&tap_stats.mutex);
2555  tap_stats.sent.vbucket_set++;
2556  pthread_mutex_unlock(&tap_stats.mutex);
2557  }
2558 
2559  msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2560  memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
2561  add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
2562  c->wcurr += sizeof(msg.flush.bytes);
2563  c->wbytes += sizeof(msg.flush.bytes);
2564  if (nengine > 0) {
2565  memcpy(c->wcurr, engine, nengine);
2566  add_iov(c, c->wcurr, nengine);
2567  c->wcurr += nengine;
2568  c->wbytes += nengine;
2569  }
2570  break;
2571  default:
2572  abort();
2573  }
2574  } while (more_data);
2575 
2576  c->ewouldblock = false;
2577  if (send_data) {
2578  conn_set_state(c, conn_mwrite);
2579  if (disconnect) {
2580  c->write_and_go = conn_closing;
2581  } else {
2582  c->write_and_go = conn_ship_log;
2583  }
2584  } else {
2585  if (disconnect) {
2586  conn_set_state(c, conn_closing);
2587  } else {
2588  /* No more items to ship to the slave at this time.. suspend.. */
2589  if (settings.verbose > 1) {
2590  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2591  "%d: No more items in tap log.. waiting\n",
2592  c->sfd);
2593  }
2594  c->ewouldblock = true;
2595  }
2596  }
2597 }
2598 
2599 static void process_bin_unknown_packet(conn *c) {
2600  void *packet = c->rcurr - (c->binary_header.request.bodylen +
2601  sizeof(c->binary_header));
2602 
2603  ENGINE_ERROR_CODE ret = c->aiostat;
2604  c->aiostat = ENGINE_SUCCESS;
2605  c->ewouldblock = false;
2606 
2607  if (ret == ENGINE_SUCCESS) {
2608  ret = settings.engine.v1->unknown_command(settings.engine.v0, c, packet,
2609  binary_response_handler);
2610  }
2611 
2612  if (ret == ENGINE_SUCCESS) {
2613  if (c->dynamic_buffer.buffer != NULL) {
2614  write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
2615  c->dynamic_buffer.buffer = NULL;
2616  } else {
2617  conn_set_state(c, conn_new_cmd);
2618  }
2619  } else if (ret == ENGINE_ENOTSUP) {
2620  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0);
2621  } else if (ret == ENGINE_EWOULDBLOCK) {
2622  c->ewouldblock = true;
2623  } else {
2624  /* FATAL ERROR, shut down connection */
2625  conn_set_state(c, conn_closing);
2626  }
2627 }
2628 
2629 static void process_bin_tap_connect(conn *c) {
2630  char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2631  sizeof(c->binary_header)));
2632  protocol_binary_request_tap_connect *req = (void*)packet;
2633  const char *key = packet + sizeof(req->bytes);
2634  const char *data = key + c->binary_header.request.keylen;
2635  uint32_t flags = 0;
2636  size_t ndata = c->binary_header.request.bodylen -
2637  c->binary_header.request.extlen -
2638  c->binary_header.request.keylen;
2639 
2640  if (c->binary_header.request.extlen == 4) {
2641  flags = ntohl(req->message.body.flags);
2642 
2643  if (flags & TAP_CONNECT_FLAG_BACKFILL) {
2644  /* the userdata has to be at least 8 bytes! */
2645  if (ndata < 8) {
2646  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2647  "%d: ERROR: Invalid tap connect message\n",
2648  c->sfd);
2649  conn_set_state(c, conn_closing);
2650  return ;
2651  }
2652  }
2653  } else {
2654  data -= 4;
2655  key -= 4;
2656  }
2657 
2658  if (settings.verbose && c->binary_header.request.keylen > 0) {
2659  char buffer[1024];
2660  int len = c->binary_header.request.keylen;
2661  if (len >= sizeof(buffer)) {
2662  len = sizeof(buffer) - 1;
2663  }
2664  memcpy(buffer, key, len);
2665  buffer[len] = '\0';
2666  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2667  "%d: Trying to connect with named tap connection: <%s>\n",
2668  c->sfd, buffer);
2669  }
2670 
2671  TAP_ITERATOR iterator = settings.engine.v1->get_tap_iterator(
2672  settings.engine.v0, c, key, c->binary_header.request.keylen,
2673  flags, data, ndata);
2674 
2675  if (iterator == NULL) {
2676  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2677  "%d: FATAL: The engine does not support tap\n",
2678  c->sfd);
2679  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
2680  c->write_and_go = conn_closing;
2681  } else {
2682  c->tap_iterator = iterator;
2683  c->which = EV_WRITE;
2684  conn_set_state(c, conn_ship_log);
2685  }
2686 }
2687 
2688 static void process_bin_tap_packet(tap_event_t event, conn *c) {
2689  assert(c != NULL);
2690  char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2691  sizeof(c->binary_header)));
2692  protocol_binary_request_tap_no_extras *tap = (void*)packet;
2693  uint16_t nengine = ntohs(tap->message.body.tap.enginespecific_length);
2694  uint16_t tap_flags = ntohs(tap->message.body.tap.flags);
2695  uint32_t seqno = ntohl(tap->message.header.request.opaque);
2696  uint8_t ttl = tap->message.body.tap.ttl;
2697  assert(ttl > 0);
2698  char *engine_specific = packet + sizeof(tap->bytes);
2699  char *key = engine_specific + nengine;
2700  uint16_t nkey = c->binary_header.request.keylen;
2701  char *data = key + nkey;
2702  uint32_t flags = 0;
2703  uint32_t exptime = 0;
2704  uint32_t ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
2705 
2706  if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
2707  event == TAP_CHECKPOINT_END) {
2708  protocol_binary_request_tap_mutation *mutation = (void*)tap;
2709  flags = ntohl(mutation->message.body.item.flags);
2710  exptime = ntohl(mutation->message.body.item.expiration);
2711  key += 8;
2712  data += 8;
2713  ndata -= 8;
2714  }
2715 
2716  ENGINE_ERROR_CODE ret = c->aiostat;
2717  if (ret == ENGINE_SUCCESS) {
2718  ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
2719  engine_specific, nengine,
2720  ttl - 1, tap_flags,
2721  event, seqno,
2722  key, nkey,
2723  flags, exptime,
2724  ntohll(tap->message.header.request.cas),
2725  data, ndata,
2726  c->binary_header.request.vbucket);
2727  }
2728 
2729  switch (ret) {
2730  case ENGINE_DISCONNECT:
2731  conn_set_state(c, conn_closing);
2732  break;
2733  case ENGINE_EWOULDBLOCK:
2734  c->ewouldblock = true;
2735  break;
2736  default:
2737  if ((tap_flags & TAP_FLAG_ACK) ||
2738  (ret != ENGINE_SUCCESS && c->tap_nack_mode))
2739  {
2740  write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2741  } else {
2742  conn_set_state(c, conn_new_cmd);
2743  }
2744  }
2745 }
2746 
2747 static void process_bin_tap_ack(conn *c) {
2748  assert(c != NULL);
2749  char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2750  sizeof(c->binary_header)));
2751  protocol_binary_response_no_extras *rsp = (void*)packet;
2752  uint32_t seqno = ntohl(rsp->message.header.response.opaque);
2753  uint16_t status = ntohs(rsp->message.header.response.status);
2754  char *key = packet + sizeof(rsp->bytes);
2755 
2756  ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
2757  if (settings.engine.v1->tap_notify != NULL) {
2758  ret = settings.engine.v1->tap_notify(settings.engine.v0, c, NULL, 0, 0, status,
2759  TAP_ACK, seqno, key,
2760  c->binary_header.request.keylen, 0, 0,
2761  0, NULL, 0, 0);
2762  }
2763 
2764  if (ret == ENGINE_DISCONNECT) {
2765  conn_set_state(c, conn_closing);
2766  } else {
2767  conn_set_state(c, conn_ship_log);
2768  }
2769 }
2770 
2774 static void process_bin_noop_response(conn *c) {
2775  assert(c != NULL);
2776  conn_set_state(c, conn_new_cmd);
2777 }
2778 
2779 static void process_bin_verbosity(conn *c) {
2780  char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2781  sizeof(c->binary_header)));
2782  protocol_binary_request_verbosity *req = (void*)packet;
2783  uint32_t level = (uint32_t)ntohl(req->message.body.level);
2784  if (level > MAX_VERBOSITY_LEVEL) {
2785  level = MAX_VERBOSITY_LEVEL;
2786  }
2787  settings.verbose = (int)level;
2788  perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
2789  write_bin_response(c, NULL, 0, 0, 0);
2790 }
2791 
2792 static void process_bin_packet(conn *c) {
2793  /* @todo this should be an array of funciton pointers and call through */
2794  switch (c->binary_header.request.opcode) {
2795  case PROTOCOL_BINARY_CMD_TAP_CONNECT:
2796  pthread_mutex_lock(&tap_stats.mutex);
2797  tap_stats.received.connect++;
2798  pthread_mutex_unlock(&tap_stats.mutex);
2799  conn_set_state(c, conn_add_tap_client);
2800  break;
2801  case PROTOCOL_BINARY_CMD_TAP_MUTATION:
2802  pthread_mutex_lock(&tap_stats.mutex);
2803  tap_stats.received.mutation++;
2804  pthread_mutex_unlock(&tap_stats.mutex);
2805  process_bin_tap_packet(TAP_MUTATION, c);
2806  break;
2807  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
2808  pthread_mutex_lock(&tap_stats.mutex);
2809  tap_stats.received.checkpoint_start++;
2810  pthread_mutex_unlock(&tap_stats.mutex);
2811  process_bin_tap_packet(TAP_CHECKPOINT_START, c);
2812  break;
2813  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
2814  pthread_mutex_lock(&tap_stats.mutex);
2815  tap_stats.received.checkpoint_end++;
2816  pthread_mutex_unlock(&tap_stats.mutex);
2817  process_bin_tap_packet(TAP_CHECKPOINT_END, c);
2818  break;
2819  case PROTOCOL_BINARY_CMD_TAP_DELETE:
2820  pthread_mutex_lock(&tap_stats.mutex);
2821  tap_stats.received.delete++;
2822  pthread_mutex_unlock(&tap_stats.mutex);
2823  process_bin_tap_packet(TAP_DELETION, c);
2824  break;
2825  case PROTOCOL_BINARY_CMD_TAP_FLUSH:
2826  pthread_mutex_lock(&tap_stats.mutex);
2827  tap_stats.received.flush++;
2828  pthread_mutex_unlock(&tap_stats.mutex);
2829  process_bin_tap_packet(TAP_FLUSH, c);
2830  break;
2831  case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
2832  pthread_mutex_lock(&tap_stats.mutex);
2833  tap_stats.received.opaque++;
2834  pthread_mutex_unlock(&tap_stats.mutex);
2835  process_bin_tap_packet(TAP_OPAQUE, c);
2836  break;
2837  case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
2838  pthread_mutex_lock(&tap_stats.mutex);
2839  tap_stats.received.vbucket_set++;
2840  pthread_mutex_unlock(&tap_stats.mutex);
2841  process_bin_tap_packet(TAP_VBUCKET_SET, c);
2842  break;
2843  case PROTOCOL_BINARY_CMD_VERBOSITY:
2844  process_bin_verbosity(c);
2845  break;
2846  default:
2847  process_bin_unknown_packet(c);
2848  }
2849 }
2850 
2851 
2852 
2853 typedef void (*RESPONSE_HANDLER)(conn*);
2858 static RESPONSE_HANDLER response_handlers[256] = {
2859  [PROTOCOL_BINARY_CMD_NOOP] = process_bin_noop_response,
2860  [PROTOCOL_BINARY_CMD_TAP_MUTATION] = process_bin_tap_ack,
2861  [PROTOCOL_BINARY_CMD_TAP_DELETE] = process_bin_tap_ack,
2862  [PROTOCOL_BINARY_CMD_TAP_FLUSH] = process_bin_tap_ack,
2863  [PROTOCOL_BINARY_CMD_TAP_OPAQUE] = process_bin_tap_ack,
2864  [PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET] = process_bin_tap_ack,
2865  [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START] = process_bin_tap_ack,
2866  [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END] = process_bin_tap_ack
2867 };
2868 
2869 static void dispatch_bin_command(conn *c) {
2870  int protocol_error = 0;
2871 
2872  int extlen = c->binary_header.request.extlen;
2873  uint16_t keylen = c->binary_header.request.keylen;
2874  uint32_t bodylen = c->binary_header.request.bodylen;
2875 
2876  if (settings.require_sasl && !authenticated(c)) {
2877  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2878  c->write_and_go = conn_closing;
2879  return;
2880  }
2881 
2882  MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
2883  c->noreply = true;
2884 
2885  /* binprot supports 16bit keys, but internals are still 8bit */
2886  if (keylen > KEY_MAX_LENGTH) {
2887  handle_binary_protocol_error(c);
2888  return;
2889  }
2890 
2891  switch (c->cmd) {
2892  case PROTOCOL_BINARY_CMD_SETQ:
2893  c->cmd = PROTOCOL_BINARY_CMD_SET;
2894  break;
2895  case PROTOCOL_BINARY_CMD_ADDQ:
2896  c->cmd = PROTOCOL_BINARY_CMD_ADD;
2897  break;
2898  case PROTOCOL_BINARY_CMD_REPLACEQ:
2899  c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
2900  break;
2901  case PROTOCOL_BINARY_CMD_DELETEQ:
2902  c->cmd = PROTOCOL_BINARY_CMD_DELETE;
2903  break;
2904  case PROTOCOL_BINARY_CMD_INCREMENTQ:
2905  c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
2906  break;
2907  case PROTOCOL_BINARY_CMD_DECREMENTQ:
2908  c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
2909  break;
2910  case PROTOCOL_BINARY_CMD_QUITQ:
2911  c->cmd = PROTOCOL_BINARY_CMD_QUIT;
2912  break;
2913  case PROTOCOL_BINARY_CMD_FLUSHQ:
2914  c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
2915  break;
2916  case PROTOCOL_BINARY_CMD_APPENDQ:
2917  c->cmd = PROTOCOL_BINARY_CMD_APPEND;
2918  break;
2919  case PROTOCOL_BINARY_CMD_PREPENDQ:
2920  c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
2921  break;
2922  case PROTOCOL_BINARY_CMD_GETQ:
2923  c->cmd = PROTOCOL_BINARY_CMD_GET;
2924  break;
2925  case PROTOCOL_BINARY_CMD_GETKQ:
2926  c->cmd = PROTOCOL_BINARY_CMD_GETK;
2927  break;
2928  default:
2929  c->noreply = false;
2930  }
2931 
2932  switch (c->cmd) {
2933  case PROTOCOL_BINARY_CMD_VERSION:
2934  if (extlen == 0 && keylen == 0 && bodylen == 0) {
2935  write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
2936  } else {
2937  protocol_error = 1;
2938  }
2939  break;
2940  case PROTOCOL_BINARY_CMD_FLUSH:
2941  if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
2942  bin_read_key(c, bin_read_flush_exptime, extlen);
2943  } else {
2944  protocol_error = 1;
2945  }
2946  break;
2947  case PROTOCOL_BINARY_CMD_NOOP:
2948  if (extlen == 0 && keylen == 0 && bodylen == 0) {
2949  write_bin_response(c, NULL, 0, 0, 0);
2950  } else {
2951  protocol_error = 1;
2952  }
2953  break;
2954  case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
2955  case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
2956  case PROTOCOL_BINARY_CMD_REPLACE:
2957  if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
2958  bin_read_key(c, bin_reading_set_header, 8);
2959  } else {
2960  protocol_error = 1;
2961  }
2962  break;
2963  case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
2964  case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
2965  case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
2966  case PROTOCOL_BINARY_CMD_GETK:
2967  if (extlen == 0 && bodylen == keylen && keylen > 0) {
2968  bin_read_key(c, bin_reading_get_key, 0);
2969  } else {
2970  protocol_error = 1;
2971  }
2972  break;
2973  case PROTOCOL_BINARY_CMD_DELETE:
2974  if (keylen > 0 && extlen == 0 && bodylen == keylen) {
2975  bin_read_key(c, bin_reading_del_header, extlen);
2976  } else {
2977  protocol_error = 1;
2978  }
2979  break;
2980  case PROTOCOL_BINARY_CMD_INCREMENT:
2981  case PROTOCOL_BINARY_CMD_DECREMENT:
2982  if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
2983  bin_read_key(c, bin_reading_incr_header, 20);
2984  } else {
2985  protocol_error = 1;
2986  }
2987  break;
2988  case PROTOCOL_BINARY_CMD_APPEND:
2989  case PROTOCOL_BINARY_CMD_PREPEND:
2990  if (keylen > 0 && extlen == 0) {
2991  bin_read_key(c, bin_reading_set_header, 0);
2992  } else {
2993  protocol_error = 1;
2994  }
2995  break;
2996  case PROTOCOL_BINARY_CMD_STAT:
2997  if (extlen == 0) {
2998  bin_read_key(c, bin_reading_stat, 0);
2999  } else {
3000  protocol_error = 1;
3001  }
3002  break;
3003  case PROTOCOL_BINARY_CMD_QUIT:
3004  if (keylen == 0 && extlen == 0 && bodylen == 0) {
3005  write_bin_response(c, NULL, 0, 0, 0);
3006  c->write_and_go = conn_closing;
3007  if (c->noreply) {
3008  conn_set_state(c, conn_closing);
3009  }
3010  } else {
3011  protocol_error = 1;
3012  }
3013  break;
3014  case PROTOCOL_BINARY_CMD_TAP_CONNECT:
3015  if (settings.engine.v1->get_tap_iterator == NULL) {
3016  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
3017  } else {
3018  bin_read_chunk(c, bin_reading_packet,
3019  c->binary_header.request.bodylen);
3020  }
3021  break;
3022  case PROTOCOL_BINARY_CMD_TAP_MUTATION:
3023  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
3024  case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
3025  case PROTOCOL_BINARY_CMD_TAP_DELETE:
3026  case PROTOCOL_BINARY_CMD_TAP_FLUSH:
3027  case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
3028  case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
3029  if (settings.engine.v1->tap_notify == NULL) {
3030  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
3031  } else {
3032  bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
3033  }
3034  break;
3035 #ifdef SASL_ENABLED
3036  case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
3037  if (extlen == 0 && keylen == 0 && bodylen == 0) {
3038  bin_list_sasl_mechs(c);
3039  } else {
3040  protocol_error = 1;
3041  }
3042  break;
3043  case PROTOCOL_BINARY_CMD_SASL_AUTH:
3044  case PROTOCOL_BINARY_CMD_SASL_STEP:
3045  if (extlen == 0 && keylen != 0) {
3046  bin_read_key(c, bin_reading_sasl_auth, 0);
3047  } else {
3048  protocol_error = 1;
3049  }
3050  break;
3051 #endif
3052  case PROTOCOL_BINARY_CMD_VERBOSITY:
3053  if (extlen == 4 && keylen == 0 && bodylen == 4) {
3054  bin_read_chunk(c, bin_reading_packet,
3055  c->binary_header.request.bodylen);
3056  } else {
3057  protocol_error = 1;
3058  }
3059  break;
3060  default:
3061  if (settings.engine.v1->unknown_command == NULL) {
3062  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
3063  bodylen);
3064  } else {
3065  bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
3066  }
3067  }
3068 
3069  if (protocol_error)
3070  handle_binary_protocol_error(c);
3071 }
3072 
3073 static void process_bin_update(conn *c) {
3074  char *key;
3075  uint16_t nkey;
3076  uint32_t vlen;
3077  item *it;
3078  protocol_binary_request_set* req = binary_get_request(c);
3079 
3080  assert(c != NULL);
3081 
3082  key = binary_get_key(c);
3083  nkey = c->binary_header.request.keylen;
3084 
3085  /* fix byteorder in the request */
3086  req->message.body.flags = req->message.body.flags;
3087  rel_time_t expiration = ntohl(req->message.body.expiration);
3088 
3089  vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
3090 
3091  if (settings.verbose > 1) {
3092  char buffer[1024];
3093  const char *prefix;
3094  if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
3095  prefix = "ADD";
3096  } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
3097  prefix = "SET";
3098  } else {
3099  prefix = "REPLACE";
3100  }
3101 
3102  size_t nw;
3103  nw = key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
3104  prefix, key, nkey);
3105 
3106  if (nw != -1) {
3107  if (snprintf(buffer + nw, sizeof(buffer) - nw,
3108  " Value len is %d\n", vlen)) {
3109  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s",
3110  buffer);
3111  }
3112  }
3113  }
3114 
3115  if (settings.detail_enabled) {
3116  stats_prefix_record_set(key, nkey);
3117  }
3118 
3119  ENGINE_ERROR_CODE ret = c->aiostat;
3120  c->aiostat = ENGINE_SUCCESS;
3121  c->ewouldblock = false;
3122  item_info info = { .nvalue = 1 };
3123 
3124  if (ret == ENGINE_SUCCESS) {
3125  ret = settings.engine.v1->allocate(settings.engine.v0, c,
3126  &it, key, nkey,
3127  vlen,
3128  req->message.body.flags,
3129  expiration);
3130  if (ret == ENGINE_SUCCESS && !settings.engine.v1->get_item_info(settings.engine.v0,
3131  c, it, &info)) {
3132  settings.engine.v1->release(settings.engine.v0, c, it);
3133  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
3134  return;
3135  }
3136  }
3137 
3138  switch (ret) {
3139  case ENGINE_SUCCESS:
3140  item_set_cas(c, it, c->binary_header.request.cas);
3141 
3142  switch (c->cmd) {
3143  case PROTOCOL_BINARY_CMD_ADD:
3144  c->store_op = OPERATION_ADD;
3145  break;
3146  case PROTOCOL_BINARY_CMD_SET:
3147  c->store_op = OPERATION_SET;
3148  break;
3149  case PROTOCOL_BINARY_CMD_REPLACE:
3150  c->store_op = OPERATION_REPLACE;
3151  break;
3152  default:
3153  assert(0);
3154  }
3155 
3156  if (c->binary_header.request.cas != 0) {
3157  c->store_op = OPERATION_CAS;
3158  }
3159 
3160  c->item = it;
3161  c->ritem = info.value[0].iov_base;
3162  c->rlbytes = vlen;
3163  conn_set_state(c, conn_nread);
3164  c->substate = bin_read_set_value;
3165  break;
3166  case ENGINE_EWOULDBLOCK:
3167  c->ewouldblock = true;
3168  break;
3169  case ENGINE_DISCONNECT:
3170  c->state = conn_closing;
3171  break;
3172  default:
3173  if (ret == ENGINE_E2BIG) {
3174  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
3175  } else {
3176  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
3177  }
3178 
3179  /*
3180  * Avoid stale data persisting in cache because we failed alloc.
3181  * Unacceptable for SET (but only if cas matches).
3182  * Anywhere else too?
3183  */
3184  if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
3185  /* @todo fix this for the ASYNC interface! */
3186  settings.engine.v1->remove(settings.engine.v0, c, key, nkey,
3187  ntohll(req->message.header.request.cas),
3188  c->binary_header.request.vbucket);
3189  }
3190 
3191  /* swallow the data line */
3192  c->write_and_go = conn_swallow;
3193  }
3194 }
3195 
3196 static void process_bin_append_prepend(conn *c) {
3197  char *key;
3198  int nkey;
3199  int vlen;
3200  item *it;
3201 
3202  assert(c != NULL);
3203 
3204  key = binary_get_key(c);
3205  nkey = c->binary_header.request.keylen;
3206  vlen = c->binary_header.request.bodylen - nkey;
3207 
3208  if (settings.verbose > 1) {
3209  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
3210  "Value len is %d\n", vlen);
3211  }
3212 
3213  if (settings.detail_enabled) {
3214  stats_prefix_record_set(key, nkey);
3215  }
3216 
3217  ENGINE_ERROR_CODE ret = c->aiostat;
3218  c->aiostat = ENGINE_SUCCESS;
3219  c->ewouldblock = false;
3220  item_info info = { .nvalue = 1 };
3221 
3222  if (ret == ENGINE_SUCCESS) {
3223  ret = settings.engine.v1->allocate(settings.engine.v0, c,
3224  &it, key, nkey,
3225  vlen, 0, 0);
3226  if (ret == ENGINE_SUCCESS && !settings.engine.v1->get_item_info(settings.engine.v0,
3227  c, it, &info)) {
3228  settings.engine.v1->release(settings.engine.v0, c, it);
3229  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
3230  return;
3231  }
3232  }
3233 
3234  switch (ret) {
3235  case ENGINE_SUCCESS:
3236  item_set_cas(c, it, c->binary_header.request.cas);
3237 
3238  switch (c->cmd) {
3239  case PROTOCOL_BINARY_CMD_APPEND:
3240  c->store_op = OPERATION_APPEND;
3241  break;
3242  case PROTOCOL_BINARY_CMD_PREPEND:
3243  c->store_op = OPERATION_PREPEND;
3244  break;
3245  default:
3246  assert(0);
3247  }
3248 
3249  c->item = it;
3250  c->ritem = info.value[0].iov_base;
3251  c->rlbytes = vlen;
3252  conn_set_state(c, conn_nread);
3253  c->substate = bin_read_set_value;
3254  break;
3255  case ENGINE_EWOULDBLOCK:
3256  c->ewouldblock = true;
3257  break;
3258  case ENGINE_DISCONNECT:
3259  c->state = conn_closing;
3260  break;
3261  default:
3262  if (ret == ENGINE_E2BIG) {
3263  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
3264  } else {
3265  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
3266  }
3267  /* swallow the data line */
3268  c->write_and_go = conn_swallow;
3269  }
3270 }
3271 
3272 static void process_bin_flush(conn *c) {
3273  time_t exptime = 0;
3274  protocol_binary_request_flush* req = binary_get_request(c);
3275 
3276  if (c->binary_header.request.extlen == sizeof(req->message.body)) {
3277  exptime = ntohl(req->message.body.expiration);
3278  }
3279 
3280  if (settings.verbose > 1) {
3281  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
3282  "%d: flush %ld", c->sfd,
3283  (long)exptime);
3284  }
3285 
3286  ENGINE_ERROR_CODE ret;
3287  ret = settings.engine.v1->flush(settings.engine.v0, c, exptime);
3288 
3289  if (ret == ENGINE_SUCCESS) {
3290  write_bin_response(c, NULL, 0, 0, 0);
3291  } else if (ret == ENGINE_ENOTSUP) {
3292  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
3293  } else {
3294  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
3295  }
3296  STATS_NOKEY(c, cmd_flush);
3297 }
3298 
3299 static void process_bin_delete(conn *c) {
3300  protocol_binary_request_delete* req = binary_get_request(c);
3301 
3302  char* key = binary_get_key(c);
3303  size_t nkey = c->binary_header.request.keylen;
3304 
3305  assert(c != NULL);
3306 
3307  if (settings.verbose > 1) {
3308  char buffer[1024];
3309  if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
3310  "DELETE", key, nkey) != -1) {
3311  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
3312  buffer);
3313  }
3314  }
3315 
3316  ENGINE_ERROR_CODE ret = c->aiostat;
3317  c->aiostat = ENGINE_SUCCESS;
3318  c->ewouldblock = false;
3319 
3320  if (ret == ENGINE_SUCCESS) {
3321  if (settings.detail_enabled) {
3322  stats_prefix_record_delete(key, nkey);
3323  }
3324  ret = settings.engine.v1->remove(settings.engine.v0, c, key, nkey,
3325  ntohll(req->message.header.request.cas),
3326  c->binary_header.request.vbucket);
3327  }
3328 
3329  /* For some reason the SLAB_INCR tries to access this... */
3330  item_info info = { .nvalue = 1 };
3331  switch (ret) {
3332  case ENGINE_SUCCESS:
3333  write_bin_response(c, NULL, 0, 0, 0);
3334  SLAB_INCR(c, delete_hits, key, nkey);
3335  break;
3336  case ENGINE_KEY_EEXISTS:
3337  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
3338  break;
3339  case ENGINE_KEY_ENOENT:
3340  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
3341  STATS_INCR(c, delete_misses, key, nkey);
3342  break;
3343  case ENGINE_NOT_MY_VBUCKET:
3344  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
3345  break;
3346  case ENGINE_EWOULDBLOCK:
3347  c->ewouldblock = true;
3348  break;
3349  default:
3350  write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
3351  }
3352 }
3353 
3354 static void complete_nread_binary(conn *c) {
3355  assert(c != NULL);
3356  assert(c->cmd >= 0);
3357 
3358  switch(c->substate) {
3359  case bin_reading_set_header:
3360  if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
3361  c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
3362  process_bin_append_prepend(c);
3363  } else {
3364  process_bin_update(c);
3365  }
3366  break;
3367  case bin_read_set_value:
3368  complete_update_bin(c);
3369  break;
3370  case bin_reading_get_key:
3371  process_bin_get(c);
3372  break;
3373  case bin_reading_stat:
3374  process_bin_stat(c);
3375  break;
3376  case bin_reading_del_header:
3377  process_bin_delete(c);
3378  break;
3379  case bin_reading_incr_header:
3380  complete_incr_bin(c);
3381  break;
3382  case bin_read_flush_exptime:
3383  process_bin_flush(c);
3384  break;
3385  case bin_reading_sasl_auth:
3386  process_bin_sasl_auth(c);
3387  break;
3388  case bin_reading_sasl_auth_data:
3389  process_bin_complete_sasl_auth(c);
3390  break;
3391  case bin_reading_packet:
3392  if (c->binary_header.request.magic == PROTOCOL_BINARY_RES) {
3393  RESPONSE_HANDLER handler;
3394  handler = response_handlers[c->binary_header.request.opcode];
3395  if (handler) {
3396  handler(c);
3397  } else {
3398  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
3399  "%d: ERROR: Unsupported response packet received: %u\n",
3400  c->sfd, (unsigned int)c->binary_header.request.opcode);
3401  conn_set_state(c, conn_closing);
3402  }
3403  } else {
3404  process_bin_packet(c);
3405  }
3406  break;
3407  default:
3408  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
3409  "Not handling substate %d\n", c->substate);
3410  abort();
3411  }
3412 }
3413 
3414 static void reset_cmd_handler(conn *c) {
3415  c->sbytes = 0;
3416  c->ascii_cmd = NULL;
3417  c->cmd = -1;
3418  c->substate = bin_no_state;
3419  if(c->item != NULL) {
3420  settings.engine.v1->release(settings.engine.v0, c, c->item);
3421  c->item = NULL;
3422  }
3423  conn_shrink(c);
3424  if (c->rbytes > 0) {
3425  conn_set_state(c, conn_parse_cmd);
3426  } else {
3427  conn_set_state(c, conn_waiting);
3428  }
3429 }
3430 
3431 static ENGINE_ERROR_CODE ascii_response_handler(const void *cookie,
3432  int nbytes,
3433  const char *dta)
3434 {
3435  conn *c = (conn*)cookie;
3436  if (!grow_dynamic_buffer(c, nbytes)) {
3437  if (settings.verbose > 0) {
3438  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
3439  "<%d ERROR: Failed to allocate memory for response\n",
3440  c->sfd);
3441  }
3442  return ENGINE_ENOMEM;
3443  }
3444 
3445  char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
3446  memcpy(buf, dta, nbytes);
3447  c->dynamic_buffer.offset += nbytes;
3448 
3449  return ENGINE_SUCCESS;
3450 }
3451 
3452 static void complete_nread_ascii(conn *c) {
3453  if (c->ascii_cmd != NULL) {
3454  c->ewouldblock = false;
3455  switch (c->ascii_cmd->execute(c->ascii_cmd->cookie, c, 0, NULL,
3456  ascii_response_handler)) {
3457  case ENGINE_SUCCESS:
3458  if (c->dynamic_buffer.buffer != NULL) {
3459  write_and_free(c, c->dynamic_buffer.buffer,
3460  c->dynamic_buffer.offset);
3461  c->dynamic_buffer.buffer = NULL;
3462  } else {
3463  conn_set_state(c, conn_new_cmd);
3464  }
3465  break;
3466  case ENGINE_EWOULDBLOCK:
3467  c->ewouldblock = true;
3468  break;
3469  case ENGINE_DISCONNECT:
3470  default:
3471  conn_set_state(c, conn_closing);
3472  }
3473  } else {
3474  complete_update_ascii(c);
3475  }
3476 }
3477 
3478 static void complete_nread(conn *c) {
3479  assert(c != NULL);
3480  assert(c->protocol == ascii_prot
3481  || c->protocol == binary_prot);
3482 
3483  if (c->protocol == ascii_prot) {
3484  complete_nread_ascii(c);
3485  } else if (c->protocol == binary_prot) {
3486  complete_nread_binary(c);
3487  }
3488 }
3489 
/* Fixed positions inside the token array produced by tokenize_command(). */
#define COMMAND_TOKEN    0  /* the command word itself */
#define SUBCOMMAND_TOKEN 1  /* e.g. the word after "stats" */
#define KEY_TOKEN        1  /* first key of a key-oriented command */

/* Number of token slots requested per tokenize_command() call. */
#define MAX_TOKENS 30
3495 
3496 /*
3497  * Tokenize the command string by replacing whitespace with '\0' and update
3498  * the token array tokens with pointer to start of each token and length.
3499  * Returns total number of tokens. The last valid token is the terminal
3500  * token (value points to the first unprocessed character of the string and
3501  * length zero).
3502  *
3503  * Usage example:
3504  *
3505  * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
3506  * for(int ix = 0; tokens[ix].length != 0; ix++) {
3507  * ...
3508  * }
3509  * ncommand = tokens[ix].value - command;
3510  * command = tokens[ix].value;
3511  * }
3512  */
3513 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
3514  char *s, *e;
3515  size_t ntokens = 0;
3516 
3517  assert(command != NULL && tokens != NULL && max_tokens > 1);
3518 
3519  for (s = e = command; ntokens < max_tokens - 1; ++e) {
3520  if (*e == ' ') {
3521  if (s != e) {
3522  tokens[ntokens].value = s;
3523  tokens[ntokens].length = e - s;
3524  ntokens++;
3525  *e = '\0';
3526  }
3527  s = e + 1;
3528  }
3529  else if (*e == '\0') {
3530  if (s != e) {
3531  tokens[ntokens].value = s;
3532  tokens[ntokens].length = e - s;
3533  ntokens++;
3534  }
3535 
3536  break; /* string end */
3537  }
3538  }
3539 
3540  /*
3541  * If we scanned the whole string, the terminal value pointer is null,
3542  * otherwise it is the first unprocessed character.
3543  */
3544  tokens[ntokens].value = *e == '\0' ? NULL : e;
3545  tokens[ntokens].length = 0;
3546  ntokens++;
3547 
3548  return ntokens;
3549 }
3550 
3551 static void detokenize(token_t *tokens, int ntokens, char **out, int *nbytes) {
3552  int i, nb;
3553  char *buf, *p;
3554 
3555  nb = ntokens; // account for spaces, which is ntokens-1, plus the null
3556  for (i = 0; i < ntokens; ++i) {
3557  nb += tokens[i].length;
3558  }
3559 
3560  buf = malloc(nb * sizeof(char));
3561  if (buf != NULL) {
3562  p = buf;
3563  for (i = 0; i < ntokens; ++i) {
3564  memcpy(p, tokens[i].value, tokens[i].length);
3565  p += tokens[i].length;
3566  *p = ' ';
3567  p++;
3568  }
3569  buf[nb - 1] = '\0';
3570  *nbytes = nb - 1;
3571  *out = buf;
3572  }
3573 }
3574 
3575 
3576 /* set up a connection to write a buffer then free it, used for stats */
3577 static void write_and_free(conn *c, char *buf, int bytes) {
3578  if (buf) {
3579  c->write_and_free = buf;
3580  c->wcurr = buf;
3581  c->wbytes = bytes;
3582  conn_set_state(c, conn_write);
3583  c->write_and_go = conn_new_cmd;
3584  } else {
3585  out_string(c, "SERVER_ERROR out of memory writing stats");
3586  }
3587 }
3588 
3589 static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
3590 {
3591  int noreply_index = ntokens - 2;
3592 
3593  /*
3594  NOTE: this function is not the first place where we are going to
3595  send the reply. We could send it instead from process_command()
3596  if the request line has wrong number of tokens. However parsing
3597  malformed line for "noreply" option is not reliable anyway, so
3598  it can't be helped.
3599  */
3600  if (tokens[noreply_index].value
3601  && strcmp(tokens[noreply_index].value, "noreply") == 0) {
3602  c->noreply = true;
3603  }
3604  return c->noreply;
3605 }
3606 
3607 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
3608  const char *fmt, ...) {
3609  char val_str[STAT_VAL_LEN];
3610  int vlen;
3611  va_list ap;
3612 
3613  assert(name);
3614  assert(add_stats);
3615  assert(c);
3616  assert(fmt);
3617 
3618  va_start(ap, fmt);
3619  vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
3620  va_end(ap);
3621 
3622  add_stats(name, strlen(name), val_str, vlen, c);
3623 }
3624 
3625 inline static void process_stats_detail(conn *c, const char *command) {
3626  assert(c != NULL);
3627 
3628  if (settings.allow_detailed) {
3629  if (strcmp(command, "on") == 0) {
3630  settings.detail_enabled = 1;
3631  out_string(c, "OK");
3632  }
3633  else if (strcmp(command, "off") == 0) {
3634  settings.detail_enabled = 0;
3635  out_string(c, "OK");
3636  }
3637  else if (strcmp(command, "dump") == 0) {
3638  int len;
3639  char *stats = stats_prefix_dump(&len);
3640  write_and_free(c, stats, len);
3641  }
3642  else {
3643  out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
3644  }
3645  }
3646  else {
3647  out_string(c, "CLIENT_ERROR detailed stats disabled");
3648  }
3649 }
3650 
3651 static void aggregate_callback(void *in, void *out) {
3652  struct thread_stats *out_thread_stats = out;
3653  struct independent_stats *in_independent_stats = in;
3654  threadlocal_stats_aggregate(in_independent_stats->thread_stats,
3655  out_thread_stats);
3656 }
3657 
3658 /* return server specific stats only */
3659 static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate) {
3660  pid_t pid = getpid();
3661  rel_time_t now = current_time;
3662 
3663  struct thread_stats thread_stats;
3664  threadlocal_stats_clear(&thread_stats);
3665 
3666  if (aggregate && settings.engine.v1->aggregate_stats != NULL) {
3667  settings.engine.v1->aggregate_stats(settings.engine.v0,
3668  (const void *)c,
3669  aggregate_callback,
3670  &thread_stats);
3671  } else {
3672  threadlocal_stats_aggregate(get_independent_stats(c)->thread_stats,
3673  &thread_stats);
3674  }
3675 
3676  struct slab_stats slab_stats;
3677  slab_stats_aggregate(&thread_stats, &slab_stats);
3678 
3679 #ifndef __WIN32__
3680  struct rusage usage;
3681  getrusage(RUSAGE_SELF, &usage);
3682 #endif
3683 
3684  STATS_LOCK();
3685 
3686  APPEND_STAT("pid", "%lu", (long)pid);
3687  APPEND_STAT("uptime", "%u", now);
3688  APPEND_STAT("time", "%ld", now + (long)process_started);
3689  APPEND_STAT("version", "%s", VERSION);
3690  APPEND_STAT("libevent", "%s", event_get_version());
3691  APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
3692 
3693 #ifndef __WIN32__
3694  append_stat("rusage_user", add_stats, c, "%ld.%06ld",
3695  (long)usage.ru_utime.tv_sec,
3696  (long)usage.ru_utime.tv_usec);
3697  append_stat("rusage_system", add_stats, c, "%ld.%06ld",
3698  (long)usage.ru_stime.tv_sec,
3699  (long)usage.ru_stime.tv_usec);
3700 #endif
3701 
3702  APPEND_STAT("daemon_connections", "%u", stats.daemon_conns);
3703  APPEND_STAT("curr_connections", "%u", stats.curr_conns);
3704  APPEND_STAT("total_connections", "%u", stats.total_conns);
3705  APPEND_STAT("connection_structures", "%u", stats.conn_structs);
3706  APPEND_STAT("cmd_get", "%"PRIu64, thread_stats.cmd_get);
3707  APPEND_STAT("cmd_set", "%"PRIu64, slab_stats.cmd_set);
3708  APPEND_STAT("cmd_flush", "%"PRIu64, thread_stats.cmd_flush);
3709  APPEND_STAT("auth_cmds", "%"PRIu64, thread_stats.auth_cmds);
3710  APPEND_STAT("auth_errors", "%"PRIu64, thread_stats.auth_errors);
3711  APPEND_STAT("get_hits", "%"PRIu64, slab_stats.get_hits);
3712  APPEND_STAT("get_misses", "%"PRIu64, thread_stats.get_misses);
3713  APPEND_STAT("delete_misses", "%"PRIu64, thread_stats.delete_misses);
3714  APPEND_STAT("delete_hits", "%"PRIu64, slab_stats.delete_hits);
3715  APPEND_STAT("incr_misses", "%"PRIu64, thread_stats.incr_misses);
3716  APPEND_STAT("incr_hits", "%"PRIu64, thread_stats.incr_hits);
3717  APPEND_STAT("decr_misses", "%"PRIu64, thread_stats.decr_misses);
3718  APPEND_STAT("decr_hits", "%"PRIu64, thread_stats.decr_hits);
3719  APPEND_STAT("cas_misses", "%"PRIu64, thread_stats.cas_misses);
3720  APPEND_STAT("cas_hits", "%"PRIu64, slab_stats.cas_hits);
3721  APPEND_STAT("cas_badval", "%"PRIu64, slab_stats.cas_badval);
3722  APPEND_STAT("bytes_read", "%"PRIu64, thread_stats.bytes_read);
3723  APPEND_STAT("bytes_written", "%"PRIu64, thread_stats.bytes_written);
3724  APPEND_STAT("limit_maxbytes", "%"PRIu64, settings.maxbytes);
3725  APPEND_STAT("accepting_conns", "%u", is_listen_disabled() ? 0 : 1);
3726  APPEND_STAT("listen_disabled_num", "%"PRIu64, get_listen_disabled_num());
3727  APPEND_STAT("rejected_conns", "%" PRIu64, (unsigned long long)stats.rejected_conns);
3728  APPEND_STAT("threads", "%d", settings.num_threads);
3729  APPEND_STAT("conn_yields", "%" PRIu64, (unsigned long long)thread_stats.conn_yields);
3730  STATS_UNLOCK();
3731 
3732  /*
3733  * Add tap stats (only if non-zero)
3734  */
3735  struct tap_stats ts;
3736  pthread_mutex_lock(&tap_stats.mutex);
3737  ts = tap_stats;
3738  pthread_mutex_unlock(&tap_stats.mutex);
3739 
3740  if (ts.sent.connect) {
3741  APPEND_STAT("tap_connect_sent", "%"PRIu64, ts.sent.connect);
3742  }
3743  if (ts.sent.mutation) {
3744  APPEND_STAT("tap_mutation_sent", "%"PRIu64, ts.sent.mutation);
3745  }
3746  if (ts.sent.checkpoint_start) {
3747  APPEND_STAT("tap_checkpoint_start_sent", "%"PRIu64, ts.sent.checkpoint_start);
3748  }
3749  if (ts.sent.checkpoint_end) {
3750  APPEND_STAT("tap_checkpoint_end_sent", "%"PRIu64, ts.sent.checkpoint_end);
3751  }
3752  if (ts.sent.delete) {
3753  APPEND_STAT("tap_delete_sent", "%"PRIu64, ts.sent.delete);
3754  }
3755  if (ts.sent.flush) {
3756  APPEND_STAT("tap_flush_sent", "%"PRIu64, ts.sent.flush);
3757  }
3758  if (ts.sent.opaque) {
3759  APPEND_STAT("tap_opaque_sent", "%"PRIu64, ts.sent.opaque);
3760  }
3761  if (ts.sent.vbucket_set) {
3762  APPEND_STAT("tap_vbucket_set_sent", "%"PRIu64,
3763  ts.sent.vbucket_set);
3764  }
3765  if (ts.received.connect) {
3766  APPEND_STAT("tap_connect_received", "%"PRIu64, ts.received.connect);
3767  }
3768  if (ts.received.mutation) {
3769  APPEND_STAT("tap_mutation_received", "%"PRIu64, ts.received.mutation);
3770  }
3771  if (ts.received.checkpoint_start) {
3772  APPEND_STAT("tap_checkpoint_start_received", "%"PRIu64, ts.received.checkpoint_start);
3773  }
3774  if (ts.received.checkpoint_end) {
3775  APPEND_STAT("tap_checkpoint_end_received", "%"PRIu64, ts.received.checkpoint_end);
3776  }
3777  if (ts.received.delete) {
3778  APPEND_STAT("tap_delete_received", "%"PRIu64, ts.received.delete);
3779  }
3780  if (ts.received.flush) {
3781  APPEND_STAT("tap_flush_received", "%"PRIu64, ts.received.flush);
3782  }
3783  if (ts.received.opaque) {
3784  APPEND_STAT("tap_opaque_received", "%"PRIu64, ts.received.opaque);
3785  }
3786  if (ts.received.vbucket_set) {
3787  APPEND_STAT("tap_vbucket_set_received", "%"PRIu64,
3788  ts.received.vbucket_set);
3789  }
3790 }
3791 
3792 static void process_stat_settings(ADD_STAT add_stats, void *c) {
3793  assert(add_stats);
3794  APPEND_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
3795  APPEND_STAT("maxconns", "%d", settings.maxconns);
3796  APPEND_STAT("tcpport", "%d", settings.port);
3797  APPEND_STAT("udpport", "%d", settings.udpport);
3798  APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
3799  APPEND_STAT("verbosity", "%d", settings.verbose);
3800  APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
3801  APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
3802  APPEND_STAT("domain_socket", "%s",
3803  settings.socketpath ? settings.socketpath : "NULL");
3804  APPEND_STAT("umask", "%o", settings.access);
3805  APPEND_STAT("growth_factor", "%.2f", settings.factor);
3806  APPEND_STAT("chunk_size", "%d", settings.chunk_size);
3807  APPEND_STAT("num_threads", "%d", settings.num_threads);
3808  APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
3809  APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
3810  APPEND_STAT("detail_enabled", "%s",
3811  settings.detail_enabled ? "yes" : "no");
3812  APPEND_STAT("allow_detailed", "%s",
3813  settings.allow_detailed ? "yes" : "no");
3814  APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
3815  APPEND_STAT("reqs_per_tap_event", "%d", settings.reqs_per_tap_event);
3816  APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
3817  APPEND_STAT("tcp_backlog", "%d", settings.backlog);
3818  APPEND_STAT("binding_protocol", "%s",
3819  prot_text(settings.binding_protocol));
3820 #ifdef SASL_ENABLED
3821  APPEND_STAT("auth_enabled_sasl", "%s", "yes");
3822 #else
3823  APPEND_STAT("auth_enabled_sasl", "%s", "no");
3824 #endif
3825 
3826 #ifdef ENABLE_ISASL
3827  APPEND_STAT("auth_sasl_engine", "%s", "isasl");
3828 #elif defined(ENABLE_SASL)
3829  APPEND_STAT("auth_sasl_engine", "%s", "cyrus");
3830 #else
3831  APPEND_STAT("auth_sasl_engine", "%s", "none");
3832 #endif
3833  APPEND_STAT("auth_required_sasl", "%s", settings.require_sasl ? "yes" : "no");
3834  APPEND_STAT("item_size_max", "%d", settings.item_size_max);
3835  APPEND_STAT("topkeys", "%d", settings.topkeys);
3836 
3837  for (EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
3838  ptr != NULL;
3839  ptr = ptr->next) {
3840  APPEND_STAT("extension", "%s", ptr->get_name());
3841  }
3842 
3843  APPEND_STAT("logger", "%s", settings.extensions.logger->get_name());
3844 
3845  for (EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ptr = settings.extensions.ascii;
3846  ptr != NULL;
3847  ptr = ptr->next) {
3848  APPEND_STAT("ascii_extension", "%s", ptr->get_name(ptr->cookie));
3849  }
3850 }
3851 
3852 static char *process_stat(conn *c, token_t *tokens, const size_t ntokens) {
3853  const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
3854  c->dynamic_buffer.offset = 0;
3855 
3856  if (ntokens == 2) {
3857  ENGINE_ERROR_CODE ret = c->aiostat;
3858  c->aiostat = ENGINE_SUCCESS;
3859  c->ewouldblock = false;
3860  if (ret == ENGINE_SUCCESS) {
3861  server_stats(&append_stats, c, false);
3862  ret = settings.engine.v1->get_stats(settings.engine.v0, c,
3863  NULL, 0, &append_stats);
3864  if (ret == ENGINE_EWOULDBLOCK) {
3865  c->ewouldblock = true;
3866  return c->rcurr + 5;
3867  }
3868  }
3869  } else if (strcmp(subcommand, "reset") == 0) {
3870  stats_reset(c);
3871  out_string(c, "RESET");
3872  return NULL;
3873  } else if (strcmp(subcommand, "detail") == 0) {
3874  /* NOTE: how to tackle detail with binary? */
3875  if (ntokens < 4) {
3876  process_stats_detail(c, ""); /* outputs the error message */
3877  } else {
3878  process_stats_detail(c, tokens[2].value);
3879  }
3880  /* Output already generated */
3881  return NULL;
3882  } else if (strcmp(subcommand, "settings") == 0) {
3883  process_stat_settings(&append_stats, c);
3884  } else if (strcmp(subcommand, "cachedump") == 0) {
3885  char *buf = NULL;
3886  unsigned int bytes = 0, id, limit = 0;
3887 
3888  if (ntokens < 5) {
3889  out_string(c, "CLIENT_ERROR bad command line");
3890  return NULL;
3891  }
3892 
3893  if (!safe_strtoul(tokens[2].value, &id) ||
3894  !safe_strtoul(tokens[3].value, &limit)) {
3895  out_string(c, "CLIENT_ERROR bad command line format");
3896  return NULL;
3897  }
3898 
3899  if (id >= POWER_LARGEST) {
3900  out_string(c, "CLIENT_ERROR Illegal slab id");
3901  return NULL;
3902  }
3903 
3904 #ifdef FUTURE
3905  buf = item_cachedump(id, limit, &bytes);
3906 #endif
3907  write_and_free(c, buf, bytes);
3908  return NULL;
3909  } else if (strcmp(subcommand, "aggregate") == 0) {
3910  server_stats(&append_stats, c, true);
3911  } else if (strcmp(subcommand, "topkeys") == 0) {
3912  topkeys_t *tk = get_independent_stats(c)->topkeys;
3913  if (tk != NULL) {
3914  topkeys_stats(tk, c, current_time, append_stats);
3915  } else {
3916  out_string(c, "ERROR");
3917  return NULL;
3918  }
3919  } else {
3920  /* getting here means that the subcommand is either engine specific or
3921  is invalid. query the engine and see. */
3922  ENGINE_ERROR_CODE ret = c->aiostat;
3923  c->aiostat = ENGINE_SUCCESS;
3924  c->ewouldblock = false;
3925  if (ret == ENGINE_SUCCESS) {
3926  char *buf = NULL;
3927  int nb = -1;
3928  detokenize(&tokens[1], ntokens - 2, &buf, &nb);
3929  ret = settings.engine.v1->get_stats(settings.engine.v0, c, buf,
3930  nb, append_stats);
3931  free(buf);
3932  }
3933 
3934  switch (ret) {
3935  case ENGINE_SUCCESS:
3936  append_stats(NULL, 0, NULL, 0, c);
3937  write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
3938  c->dynamic_buffer.buffer = NULL;
3939  break;
3940  case ENGINE_ENOMEM:
3941  out_string(c, "SERVER_ERROR out of memory writing stats");
3942  break;
3943  case ENGINE_DISCONNECT:
3944  c->state = conn_closing;
3945  break;
3946  case ENGINE_ENOTSUP:
3947  out_string(c, "SERVER_ERROR not supported");
3948  break;
3949  case ENGINE_EWOULDBLOCK:
3950  c->ewouldblock = true;
3951  return tokens[SUBCOMMAND_TOKEN].value;
3952  default:
3953  out_string(c, "ERROR");
3954  break;
3955  }
3956 
3957  return NULL;
3958  }
3959 
3960  /* append terminator and start the transfer */
3961  append_stats(NULL, 0, NULL, 0, c);
3962 
3963  if (c->dynamic_buffer.buffer == NULL) {
3964  out_string(c, "SERVER_ERROR out of memory writing stats");
3965  } else {
3966  write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
3967  c->dynamic_buffer.buffer = NULL;
3968  }
3969 
3970  return NULL;
3971 }
3972 
3978 static char *get_suffix_buffer(conn *c) {
3979  if (c->suffixleft == c->suffixsize) {
3980  char **new_suffix_list;
3981  size_t sz = sizeof(char*) * c->suffixsize * 2;
3982 
3983  new_suffix_list = realloc(c->suffixlist, sz);
3984  if (new_suffix_list) {
3985  c->suffixsize *= 2;
3986  c->suffixlist = new_suffix_list;
3987  } else {
3988  if (settings.verbose > 1) {
3989  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
3990  "=%d Failed to resize suffix buffer\n", c->sfd);
3991  }
3992 
3993  return NULL;
3994  }
3995  }
3996 
3997  char *suffix = cache_alloc(c->thread->suffix_cache);
3998  if (suffix != NULL) {
3999  *(c->suffixlist + c->suffixleft) = suffix;
4000  ++c->suffixleft;
4001  }
4002 
4003  return suffix;
4004 }
4005 
4006 /* ntokens is overwritten here... shrug.. */
4007 static inline char* process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
4008  char *key;
4009  size_t nkey;
4010  int i = c->ileft;
4011  item *it;
4012  token_t *key_token = &tokens[KEY_TOKEN];
4013  assert(c != NULL);
4014 
4015  do {
4016  while(key_token->length != 0) {
4017 
4018  key = key_token->value;
4019  nkey = key_token->length;
4020 
4021  if(nkey > KEY_MAX_LENGTH) {
4022  out_string(c, "CLIENT_ERROR bad command line format");
4023  return NULL;
4024  }
4025 
4026  ENGINE_ERROR_CODE ret = c->aiostat;
4027  c->aiostat = ENGINE_SUCCESS;
4028 
4029  if (ret == ENGINE_SUCCESS) {
4030  ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, nkey, 0);
4031  }
4032 
4033  switch (ret) {
4034  case ENGINE_EWOULDBLOCK:
4035  c->ewouldblock = true;
4036  c->ileft = i;
4037  return key;
4038 
4039  case ENGINE_SUCCESS:
4040  break;
4041  case ENGINE_KEY_ENOENT:
4042  default:
4043  it = NULL;
4044  break;
4045  }
4046 
4047  if (settings.detail_enabled) {
4048  stats_prefix_record_get(key, nkey, NULL != it);
4049  }
4050 
4051  if (it) {
4052  item_info info = { .nvalue = 1 };
4053  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
4054  &info)) {
4055  settings.engine.v1->release(settings.engine.v0, c, it);
4056  out_string(c, "SERVER_ERROR error getting item data");
4057  break;
4058  }
4059 
4060  if (i >= c->isize) {
4061  item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
4062  if (new_list) {
4063  c->isize *= 2;
4064  c->ilist = new_list;
4065  } else {
4066  settings.engine.v1->release(settings.engine.v0, c, it);
4067  break;
4068  }
4069  }
4070 
4071  /* Rebuild the suffix */
4072  char *suffix = get_suffix_buffer(c);
4073  if (suffix == NULL) {
4074  out_string(c, "SERVER_ERROR out of memory rebuilding suffix");
4075  settings.engine.v1->release(settings.engine.v0, c, it);
4076  return NULL;
4077  }
4078  int suffix_len = snprintf(suffix, SUFFIX_SIZE,
4079  " %u %u\r\n", htonl(info.flags),
4080  info.nbytes);
4081 
4082  /*
4083  * Construct the response. Each hit adds three elements to the
4084  * outgoing data list:
4085  * "VALUE "
4086  * key
4087  * " " + flags + " " + data length + "\r\n" + data (with \r\n)
4088  */
4089 
4090  MEMCACHED_COMMAND_GET(c->sfd, info.key, info.nkey,
4091  info.nbytes, info.cas);
4092  if (return_cas)
4093  {
4094 
4095  char *cas = get_suffix_buffer(c);
4096  if (cas == NULL) {
4097  out_string(c, "SERVER_ERROR out of memory making CAS suffix");
4098  settings.engine.v1->release(settings.engine.v0, c, it);
4099  return NULL;
4100  }
4101  int cas_len = snprintf(cas, SUFFIX_SIZE, " %"PRIu64"\r\n",
4102  info.cas);
4103  if (add_iov(c, "VALUE ", 6) != 0 ||
4104  add_iov(c, info.key, info.nkey) != 0 ||
4105  add_iov(c, suffix, suffix_len - 2) != 0 ||
4106  add_iov(c, cas, cas_len) != 0 ||
4107  add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
4108  add_iov(c, "\r\n", 2) != 0)
4109  {
4110  settings.engine.v1->release(settings.engine.v0, c, it);
4111  break;
4112  }
4113  }
4114  else
4115  {
4116  if (add_iov(c, "VALUE ", 6) != 0 ||
4117  add_iov(c, info.key, info.nkey) != 0 ||
4118  add_iov(c, suffix, suffix_len) != 0 ||
4119  add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
4120  add_iov(c, "\r\n", 2) != 0)
4121  {
4122  settings.engine.v1->release(settings.engine.v0, c, it);
4123  break;
4124  }
4125  }
4126 
4127 
4128  if (settings.verbose > 1) {
4129  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4130  ">%d sending key %s\n",
4131  c->sfd, info.key);
4132  }
4133 
4134  /* item_get() has incremented it->refcount for us */
4135  STATS_HIT(c, get, key, nkey);
4136  *(c->ilist + i) = it;
4137  i++;
4138 
4139  } else {
4140  STATS_MISS(c, get, key, nkey);
4141  MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
4142  }
4143 
4144  key_token++;
4145  }
4146 
4147  /*
4148  * If the command string hasn't been fully processed, get the next set
4149  * of tokens.
4150  */
4151  if(key_token->value != NULL) {
4152  ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
4153  key_token = tokens;
4154  }
4155 
4156  } while(key_token->value != NULL);
4157 
4158  c->icurr = c->ilist;
4159  c->ileft = i;
4160  c->suffixcurr = c->suffixlist;
4161 
4162  if (settings.verbose > 1) {
4163  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4164  ">%d END\n", c->sfd);
4165  }
4166 
4167  /*
4168  If the loop was terminated because of out-of-memory, it is not
4169  reliable to add END\r\n to the buffer, because it might not end
4170  in \r\n. So we send SERVER_ERROR instead.
4171  */
4172  if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
4173  || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
4174  out_string(c, "SERVER_ERROR out of memory writing get response");
4175  }
4176  else {
4177  conn_set_state(c, conn_mwrite);
4178  c->msgcurr = 0;
4179  }
4180 
4181  return NULL;
4182 }
4183 
4184 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, ENGINE_STORE_OPERATION store_op, bool handle_cas) {
4185  char *key;
4186  size_t nkey;
4187  unsigned int flags;
4188  int32_t exptime_int = 0;
4189  time_t exptime;
4190  int vlen;
4191  uint64_t req_cas_id=0;
4192  item *it;
4193 
4194  assert(c != NULL);
4195 
4196  set_noreply_maybe(c, tokens, ntokens);
4197 
4198  if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
4199  out_string(c, "CLIENT_ERROR bad command line format");
4200  return;
4201  }
4202 
4203  key = tokens[KEY_TOKEN].value;
4204  nkey = tokens[KEY_TOKEN].length;
4205 
4206  if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
4207  && safe_strtol(tokens[3].value, &exptime_int)
4208  && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
4209  out_string(c, "CLIENT_ERROR bad command line format");
4210  return;
4211  }
4212 
4213  /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
4214  exptime = exptime_int;
4215 
4216  // does cas value exist?
4217  if (handle_cas) {
4218  if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
4219  out_string(c, "CLIENT_ERROR bad command line format");
4220  return;
4221  }
4222  }
4223 
4224  if (vlen < 0) {
4225  out_string(c, "CLIENT_ERROR bad command line format");
4226  return;
4227  }
4228 
4229  if (settings.detail_enabled) {
4230  stats_prefix_record_set(key, nkey);
4231  }
4232 
4233  ENGINE_ERROR_CODE ret = c->aiostat;
4234  c->aiostat = ENGINE_SUCCESS;
4235  c->ewouldblock = false;
4236 
4237  if (ret == ENGINE_SUCCESS) {
4238  ret = settings.engine.v1->allocate(settings.engine.v0, c,
4239  &it, key, nkey,
4240  vlen, htonl(flags), exptime);
4241  }
4242 
4243  item_info info = { .nvalue = 1 };
4244  switch (ret) {
4245  case ENGINE_SUCCESS:
4246  item_set_cas(c, it, req_cas_id);
4247  if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
4248  settings.engine.v1->release(settings.engine.v0, c, it);
4249  out_string(c, "SERVER_ERROR error getting item data");
4250  break;
4251  }
4252  c->item = it;
4253  c->ritem = info.value[0].iov_base;
4254  c->rlbytes = vlen;
4255  c->store_op = store_op;
4256  conn_set_state(c, conn_nread);
4257  break;
4258  case ENGINE_EWOULDBLOCK:
4259  c->ewouldblock = true;
4260  break;
4261  case ENGINE_DISCONNECT:
4262  c->state = conn_closing;
4263  break;
4264  default:
4265  if (ret == ENGINE_E2BIG) {
4266  out_string(c, "SERVER_ERROR object too large for cache");
4267  } else {
4268  out_string(c, "SERVER_ERROR out of memory storing object");
4269  }
4270  /* swallow the data line */
4271  c->write_and_go = conn_swallow;
4272  c->sbytes = vlen + 2;
4273 
4274  /* Avoid stale data persisting in cache because we failed alloc.
4275  * Unacceptable for SET. Anywhere else too? */
4276  if (store_op == OPERATION_SET) {
4277  settings.engine.v1->remove(settings.engine.v0, c, key, nkey, 0, 0);
4278  }
4279  }
4280 }
4281 
4282 static char* process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
4283 
4284  uint64_t delta;
4285  char *key;
4286  size_t nkey;
4287 
4288  assert(c != NULL);
4289 
4290  set_noreply_maybe(c, tokens, ntokens);
4291 
4292  if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
4293  out_string(c, "CLIENT_ERROR bad command line format");
4294  return NULL;
4295  }
4296 
4297  key = tokens[KEY_TOKEN].value;
4298  nkey = tokens[KEY_TOKEN].length;
4299 
4300  if (!safe_strtoull(tokens[2].value, &delta)) {
4301  out_string(c, "CLIENT_ERROR invalid numeric delta argument");
4302  return NULL;
4303  }
4304 
4305  ENGINE_ERROR_CODE ret = c->aiostat;
4306  c->aiostat = ENGINE_SUCCESS;
4307  uint64_t cas;
4308  uint64_t result;
4309  if (ret == ENGINE_SUCCESS) {
4310  ret = settings.engine.v1->arithmetic(settings.engine.v0, c, key, nkey,
4311  incr, false, delta, 0, 0, &cas,
4312  &result, 0);
4313  }
4314 
4315  char temp[INCR_MAX_STORAGE_LEN];
4316  switch (ret) {
4317  case ENGINE_SUCCESS:
4318  if (incr) {
4319  STATS_INCR(c, incr_hits, key, nkey);
4320  } else {
4321  STATS_INCR(c, decr_hits, key, nkey);
4322  }
4323  snprintf(temp, sizeof(temp), "%"PRIu64, result);
4324  out_string(c, temp);
4325  break;
4326  case ENGINE_KEY_ENOENT:
4327  if (incr) {
4328  STATS_INCR(c, incr_misses, key, nkey);
4329  } else {
4330  STATS_INCR(c, decr_misses, key, nkey);
4331  }
4332  out_string(c, "NOT_FOUND");
4333  break;
4334  case ENGINE_ENOMEM:
4335  out_string(c, "SERVER_ERROR out of memory");
4336  break;
4337  case ENGINE_TMPFAIL:
4338  out_string(c, "SERVER_ERROR temporary failure");
4339  break;
4340  case ENGINE_EINVAL:
4341  out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
4342  break;
4343  case ENGINE_NOT_STORED:
4344  out_string(c, "SERVER_ERROR failed to store item");
4345  break;
4346  case ENGINE_DISCONNECT:
4347  c->state = conn_closing;
4348  break;
4349  case ENGINE_ENOTSUP:
4350  out_string(c, "SERVER_ERROR not supported");
4351  break;
4352  case ENGINE_EWOULDBLOCK:
4353  c->ewouldblock = true;
4354  return key;
4355  default:
4356  abort();
4357  }
4358 
4359  return NULL;
4360 }
4361 
4362 static char *process_delete_command(conn *c, token_t *tokens,
4363  const size_t ntokens) {
4364  char *key;
4365  size_t nkey;
4366 
4367  assert(c != NULL);
4368 
4369  if (ntokens > 3) {
4370  bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, "0") == 0;
4371  bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
4372  bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
4373  || (ntokens == 5 && hold_is_zero && sets_noreply);
4374  if (!valid) {
4375  out_string(c, "CLIENT_ERROR bad command line format. "
4376  "Usage: delete <key> [noreply]");
4377  return NULL;
4378  }
4379  }
4380 
4381  key = tokens[KEY_TOKEN].value;
4382  nkey = tokens[KEY_TOKEN].length;
4383 
4384  if (nkey > KEY_MAX_LENGTH) {
4385  out_string(c, "CLIENT_ERROR bad command line format");
4386  return NULL;
4387  }
4388 
4389  ENGINE_ERROR_CODE ret = c->aiostat;
4390  c->aiostat = ENGINE_SUCCESS;
4391  c->ewouldblock = false;
4392  if (ret == ENGINE_SUCCESS) {
4393  ret = settings.engine.v1->remove(settings.engine.v0, c,
4394  key, nkey, 0, 0);
4395  }
4396 
4397  /* For some reason the SLAB_INCR tries to access this... */
4398  item_info info = { .nvalue = 1 };
4399  switch (ret) {
4400  case ENGINE_SUCCESS:
4401  out_string(c, "DELETED");
4402  SLAB_INCR(c, delete_hits, key, nkey);
4403  break;
4404  case ENGINE_EWOULDBLOCK:
4405  c->ewouldblock = true;
4406  return key;
4407  case ENGINE_TMPFAIL:
4408  out_string(c, "SERVER_ERROR temporary failure");
4409  break;
4410  default:
4411  out_string(c, "NOT_FOUND");
4412  STATS_INCR(c, delete_misses, key, nkey);
4413  }
4414 
4415  if (ret != ENGINE_EWOULDBLOCK && settings.detail_enabled) {
4416  stats_prefix_record_delete(key, nkey);
4417  }
4418  return NULL;
4419 }
4420 
4421 static char *process_bind_command(conn *c, token_t *tokens,
4422  const size_t ntokens) {
4423  char *name;
4424  size_t name_len;
4425 
4426  assert(c != NULL);
4427 
4428  if (ntokens > 3) {
4429  out_string(c, "CLIENT_ERROR bad command line format. "
4430  "Usage: bind <table_id_name>");
4431  return NULL;
4432  }
4433 
4434  name = tokens[KEY_TOKEN].value;
4435  name_len = tokens[KEY_TOKEN].length;
4436 
4437  if (name_len > KEY_MAX_LENGTH || name_len == 0) {
4438  out_string(c, "CLIENT_ERROR bad command line format");
4439  return NULL;
4440  }
4441 
4442  ENGINE_ERROR_CODE ret = c->aiostat;
4443  c->aiostat = ENGINE_SUCCESS;
4444  c->ewouldblock = false;
4445  if (ret == ENGINE_SUCCESS) {
4446  ret = settings.engine.v1->bind(settings.engine.v0, c,
4447  name, name_len);
4448  }
4449 
4450  /* For some reason the SLAB_INCR tries to access this... */
4451  item_info info = { .nvalue = 1 };
4452  switch (ret) {
4453  case ENGINE_SUCCESS:
4454  out_string(c, "SUCCEED");
4455  break;
4456  case ENGINE_EWOULDBLOCK:
4457  c->ewouldblock = true;
4458  return name;
4459  case ENGINE_TMPFAIL:
4460  default:
4461  out_string(c, "NOT_FOUND");
4462  }
4463 
4464  return NULL;
4465 }
4466 
4467 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
4468  unsigned int level;
4469 
4470  assert(c != NULL);
4471 
4472  set_noreply_maybe(c, tokens, ntokens);
4473  if (c->noreply && ntokens == 3) {
4474  /* "verbosity noreply" is not according to the correct syntax */
4475  c->noreply = false;
4476  out_string(c, "ERROR");
4477  return;
4478  }
4479 
4480  if (safe_strtoul(tokens[1].value, &level)) {
4481  settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
4482  perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
4483  out_string(c, "OK");
4484  } else {
4485  out_string(c, "ERROR");
4486  }
4487 }
4488 
4489 static char* process_command(conn *c, char *command) {
4490 
4491  token_t tokens[MAX_TOKENS];
4492  size_t ntokens;
4493  int comm;
4494  char *ret = NULL;
4495 
4496  assert(c != NULL);
4497 
4498  MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
4499 
4500  if (settings.verbose > 1) {
4501  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4502  "<%d %s\n", c->sfd, command);
4503  }
4504 
4505  /*
4506  * for commands set/add/replace, we build an item and read the data
4507  * directly into it, then continue in nread_complete().
4508  */
4509 
4510  if (c->ewouldblock) {
4511  /*
4512  * If we are retrying after the engine has completed a pending io for
4513  * this command, skip add_msghdr() etc and clear the ewouldblock flag.
4514  */
4515  c->ewouldblock = false;
4516  } else {
4517  c->msgcurr = 0;
4518  c->msgused = 0;
4519  c->iovused = 0;
4520  if (add_msghdr(c) != 0) {
4521  out_string(c, "SERVER_ERROR out of memory preparing response");
4522  return NULL;
4523  }
4524  }
4525 
4526  ntokens = tokenize_command(command, tokens, MAX_TOKENS);
4527  if (ntokens >= 3 &&
4528  ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
4529  (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
4530 
4531  ret = process_get_command(c, tokens, ntokens, false);
4532 
4533  } else if ((ntokens == 6 || ntokens == 7) &&
4534  ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = (int)OPERATION_ADD)) ||
4535  (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = (int)OPERATION_SET)) ||
4536  (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = (int)OPERATION_REPLACE)) ||
4537  (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = (int)OPERATION_PREPEND)) ||
4538  (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = (int)OPERATION_APPEND)) )) {
4539 
4540  process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, false);
4541 
4542  } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = (int)OPERATION_CAS))) {
4543 
4544  process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, true);
4545 
4546  } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
4547 
4548  ret = process_arithmetic_command(c, tokens, ntokens, 1);
4549 
4550  } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
4551 
4552  ret = process_get_command(c, tokens, ntokens, true);
4553 
4554  } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
4555 
4556  ret = process_arithmetic_command(c, tokens, ntokens, 0);
4557 
4558  } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
4559 
4560  ret = process_delete_command(c, tokens, ntokens);
4561 
4562  } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bind") == 0)) {
4563 
4564  ret = process_bind_command(c, tokens, ntokens);
4565 
4566  } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
4567 
4568  ret = process_stat(c, tokens, ntokens);
4569 
4570  } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
4571  time_t exptime;
4572 
4573  set_noreply_maybe(c, tokens, ntokens);
4574 
4575  if (ntokens == (c->noreply ? 3 : 2)) {
4576  exptime = 0;
4577  } else {
4578  exptime = strtol(tokens[1].value, NULL, 10);
4579  if(errno == ERANGE) {
4580  out_string(c, "CLIENT_ERROR bad command line format");
4581  return NULL;
4582  }
4583  }
4584 
4585  ENGINE_ERROR_CODE ret = c->aiostat;
4586  c->aiostat = ENGINE_SUCCESS;
4587  c->ewouldblock = false;
4588  if (ret == ENGINE_SUCCESS) {
4589  ret = settings.engine.v1->flush(settings.engine.v0, c, exptime);
4590  }
4591 
4592  switch (ret) {
4593  case ENGINE_SUCCESS:
4594  out_string(c, "OK");
4595  break;
4596  case ENGINE_ENOTSUP:
4597  out_string(c, "SERVER_ERROR not supported");
4598  break;
4599  case ENGINE_EWOULDBLOCK:
4600  c->ewouldblock = true;
4601  return c->rcurr + 9;
4602  default:
4603  out_string(c, "SERVER_ERROR failed to flush cache");
4604  }
4605 
4606  if (ret != ENGINE_EWOULDBLOCK) {
4607  STATS_NOKEY(c, cmd_flush);
4608  }
4609  return NULL;
4610 
4611  } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
4612 
4613  out_string(c, "VERSION " VERSION);
4614 
4615  } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
4616 
4617  conn_set_state(c, conn_closing);
4618 
4619  } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
4620  process_verbosity_command(c, tokens, ntokens);
4621  } else if (settings.extensions.ascii != NULL) {
4623  size_t nbytes = 0;
4624  char *ptr;
4625 
4626  if (ntokens > 0) {
4627  if (ntokens == MAX_TOKENS) {
4628  out_string(c, "ERROR too many arguments");
4629  return NULL;
4630  }
4631 
4632  if (tokens[ntokens - 1].length == 0) {
4633  --ntokens;
4634  }
4635  }
4636 
4637  for (cmd = settings.extensions.ascii; cmd != NULL; cmd = cmd->next) {
4638  if (cmd->accept(cmd->cookie, c, ntokens, tokens, &nbytes, &ptr)) {
4639  break;
4640  }
4641  }
4642 
4643  if (cmd == NULL) {
4644  out_string(c, "ERROR unknown command");
4645  } else if (nbytes == 0) {
4646  switch (cmd->execute(cmd->cookie, c, ntokens, tokens,
4647  ascii_response_handler)) {
4648  case ENGINE_SUCCESS:
4649  if (c->dynamic_buffer.buffer != NULL) {
4650  write_and_free(c, c->dynamic_buffer.buffer,
4651  c->dynamic_buffer.offset);
4652  c->dynamic_buffer.buffer = NULL;
4653  } else {
4654  conn_set_state(c, conn_new_cmd);
4655  }
4656  break;
4657  case ENGINE_EWOULDBLOCK:
4658  c->ewouldblock = true;
4659  ret = tokens[KEY_TOKEN].value;;
4660  break;
4661  case ENGINE_DISCONNECT:
4662  default:
4663  conn_set_state(c, conn_closing);
4664 
4665  }
4666  } else {
4667  c->rlbytes = nbytes;
4668  c->ritem = ptr;
4669  c->ascii_cmd = cmd;
4670  /* NOT SUPPORTED YET! */
4671  conn_set_state(c, conn_nread);
4672  }
4673  } else {
4674  out_string(c, "ERROR");
4675  }
4676  return ret;
4677 }
4678 
4679 /*
4680  * if we have a complete line in the buffer, process it.
4681  */
4682 static int try_read_command(conn *c) {
4683  assert(c != NULL);
4684  assert(c->rcurr <= (c->rbuf + c->rsize));
4685  assert(c->rbytes > 0);
4686 
4687  if (c->protocol == negotiating_prot || c->transport == udp_transport) {
4688  if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
4689  c->protocol = binary_prot;
4690  } else {
4691  c->protocol = ascii_prot;
4692  }
4693 
4694  if (settings.verbose > 1) {
4695  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4696  "%d: Client using the %s protocol\n", c->sfd,
4697  prot_text(c->protocol));
4698  }
4699  }
4700 
4701  if (c->protocol == binary_prot) {
4702  /* Do we have the complete packet header? */
4703  if (c->rbytes < sizeof(c->binary_header)) {
4704  /* need more data! */
4705  return 0;
4706  } else {
4707 #ifdef NEED_ALIGN
4708  if (((long)(c->rcurr)) % 8 != 0) {
4709  /* must realign input buffer */
4710  memmove(c->rbuf, c->rcurr, c->rbytes);
4711  c->rcurr = c->rbuf;
4712  if (settings.verbose > 1) {
4713  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4714  "%d: Realign input buffer\n", c->sfd);
4715  }
4716  }
4717 #endif
4720 
4721  if (settings.verbose > 1) {
4722  /* Dump the packet before we convert it to host order */
4723  char buffer[1024];
4724  ssize_t nw;
4725  nw = bytes_to_output_string(buffer, sizeof(buffer), c->sfd,
4726  true, "Read binary protocol data:",
4727  (const char*)req->bytes,
4728  sizeof(req->bytes));
4729  if (nw != -1) {
4730  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4731  "%s", buffer);
4732  }
4733  }
4734 
4735  c->binary_header = *req;
4736  c->binary_header.request.keylen = ntohs(req->request.keylen);
4737  c->binary_header.request.bodylen = ntohl(req->request.bodylen);
4738  c->binary_header.request.vbucket = ntohs(req->request.vbucket);
4739  c->binary_header.request.cas = ntohll(req->request.cas);
4740 
4741 
4742  if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ &&
4743  !(c->binary_header.request.magic == PROTOCOL_BINARY_RES &&
4744  response_handlers[c->binary_header.request.opcode])) {
4745  if (settings.verbose) {
4746  if (c->binary_header.request.magic != PROTOCOL_BINARY_RES) {
4747  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4748  "%d: Invalid magic: %x\n", c->sfd,
4749  c->binary_header.request.magic);
4750  } else {
4751  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4752  "%d: ERROR: Unsupported response packet received: %u\n",
4753  c->sfd, (unsigned int)c->binary_header.request.opcode);
4754 
4755  }
4756  }
4757  conn_set_state(c, conn_closing);
4758  return -1;
4759  }
4760 
4761  c->msgcurr = 0;
4762  c->msgused = 0;
4763  c->iovused = 0;
4764  if (add_msghdr(c) != 0) {
4765  out_string(c, "SERVER_ERROR out of memory");
4766  return 0;
4767  }
4768 
4769  c->cmd = c->binary_header.request.opcode;
4770  c->keylen = c->binary_header.request.keylen;
4771  c->opaque = c->binary_header.request.opaque;
4772  /* clear the returned cas value */
4773  c->cas = 0;
4774 
4775  dispatch_bin_command(c);
4776 
4777  c->rbytes -= sizeof(c->binary_header);
4778  c->rcurr += sizeof(c->binary_header);
4779  }
4780  } else {
4781  char *el, *cont, *left, lb;
4782 
4783  if (c->rbytes == 0) {
4784  return 0;
4785  }
4786 
4787  el = memchr(c->rcurr, '\n', c->rbytes);
4788  if (!el) {
4789  if (c->rbytes > 1024) {
4790  /*
4791  * We didn't have a '\n' in the first k. This _has_ to be a
4792  * large multiget, if not we should just nuke the connection.
4793  */
4794  char *ptr = c->rcurr;
4795  while (*ptr == ' ') { /* ignore leading whitespaces */
4796  ++ptr;
4797  }
4798 
4799  if (ptr - c->rcurr > 100 ||
4800  (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
4801 
4802  conn_set_state(c, conn_closing);
4803  return 1;
4804  }
4805  }
4806 
4807  return 0;
4808  }
4809  cont = el + 1;
4810  if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
4811  el--;
4812  }
4813  lb = *el;
4814  *el = '\0';
4815 
4816  assert(cont <= (c->rcurr + c->rbytes));
4817 
4818  LIBEVENT_THREAD *thread = c->thread;
4819  LOCK_THREAD(thread);
4820  left = process_command(c, c->rcurr);
4821  if (c->ewouldblock) {
4822  unregister_event(c);
4823  }
4824  UNLOCK_THREAD(thread);
4825 
4826  if (left != NULL) {
4827  /*
4828  * We have not processed the entire command. This happens
4829  * when the engine returns ENGINE_EWOULDBLOCK for one of the
4830  * keys in a get/gets request.
4831  */
4832  assert (left <= el);
4833 
4834  int count = strlen(c->rcurr);
4835  if ((c->rcurr + count) == left) {
4836  // Retry the entire command
4837  cont = c->rcurr;
4838  } else {
4839  left -= (count + 1);
4840  cont = left;
4841  assert(cont >= c->rcurr);
4842  if (cont > c->rcurr) {
4843  memmove(cont, c->rcurr, count);
4844  }
4845  }
4846 
4847  /* de-tokenize the command */
4848  while ((left = memchr(left, '\0', el - left)) != NULL) {
4849  *left = ' ';
4850  }
4851  *el = lb;
4852  }
4853 
4854  c->rbytes -= (cont - c->rcurr);
4855  c->rcurr = cont;
4856 
4857  assert(c->rcurr <= (c->rbuf + c->rsize));
4858  }
4859 
4860  return 1;
4861 }
4862 
4863 /*
4864  * read a UDP request.
4865  */
4866 static enum try_read_result try_read_udp(conn *c) {
4867  int res;
4868 
4869  assert(c != NULL);
4870 
4871  c->request_addr_size = sizeof(c->request_addr);
4872  res = recvfrom(c->sfd, c->rbuf, c->rsize,
4873  0, (struct sockaddr *)&c->request_addr, &c->request_addr_size);
4874  if (res > 8) {
4875  unsigned char *buf = (unsigned char *)c->rbuf;
4876  STATS_ADD(c, bytes_read, res);
4877 
4878  /* Beginning of UDP packet is the request ID; save it. */
4879  c->request_id = buf[0] * 256 + buf[1];
4880 
4881  /* If this is a multi-packet request, drop it. */
4882  if (buf[4] != 0 || buf[5] != 1) {
4883  out_string(c, "SERVER_ERROR multi-packet request not supported");
4884  return READ_NO_DATA_RECEIVED;
4885  }
4886 
4887  /* Don't care about any of the rest of the header. */
4888  res -= 8;
4889  memmove(c->rbuf, c->rbuf + 8, res);
4890 
4891  c->rbytes += res;
4892  c->rcurr = c->rbuf;
4893  return READ_DATA_RECEIVED;
4894  }
4895  return READ_NO_DATA_RECEIVED;
4896 }
4897 
4898 /*
4899  * read from network as much as we can, handle buffer overflow and connection
4900  * close.
4901  * before reading, move the remaining incomplete fragment of a command
4902  * (if any) to the beginning of the buffer.
4903  *
4904  * To protect us from someone flooding a connection with bogus data causing
4905  * the connection to eat up all available memory, break out and start looking
4906  * at the data I've got after a number of reallocs...
4907  *
4908  * @return enum try_read_result
4909  */
4910 static enum try_read_result try_read_network(conn *c) {
4911  enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
4912  int res;
4913  int num_allocs = 0;
4914  assert(c != NULL);
4915 
4916  if (c->rcurr != c->rbuf) {
4917  if (c->rbytes != 0) /* otherwise there's nothing to copy */
4918  memmove(c->rbuf, c->rcurr, c->rbytes);
4919  c->rcurr = c->rbuf;
4920  }
4921 
4922  while (1) {
4923  if (c->rbytes >= c->rsize) {
4924  if (num_allocs == 4) {
4925  return gotdata;
4926  }
4927  ++num_allocs;
4928  char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
4929  if (!new_rbuf) {
4930  if (settings.verbose > 0) {
4931  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4932  "Couldn't realloc input buffer\n");
4933  }
4934  c->rbytes = 0; /* ignore what we read */
4935  out_string(c, "SERVER_ERROR out of memory reading request");
4936  c->write_and_go = conn_closing;
4937  return READ_MEMORY_ERROR;
4938  }
4939  c->rcurr = c->rbuf = new_rbuf;
4940  c->rsize *= 2;
4941  }
4942 
4943  int avail = c->rsize - c->rbytes;
4944  res = recv(c->sfd, c->rbuf + c->rbytes, avail, 0);
4945  if (res > 0) {
4946  STATS_ADD(c, bytes_read, res);
4947  gotdata = READ_DATA_RECEIVED;
4948  c->rbytes += res;
4949  if (res == avail) {
4950  continue;
4951  } else {
4952  break;
4953  }
4954  }
4955  if (res == 0) {
4956  return READ_ERROR;
4957  }
4958  if (res == -1) {
4959  if (errno == EAGAIN || errno == EWOULDBLOCK) {
4960  break;
4961  }
4962  return READ_ERROR;
4963  }
4964  }
4965  return gotdata;
4966 }
4967 
4968 bool register_event(conn *c, struct timeval *timeout) {
4969 #ifdef DEBUG
4970  assert(!c->registered_in_libevent);
4971 #endif
4972 
4973  if (event_add(&c->event, timeout) == -1) {
4974  settings.extensions.logger->log(EXTENSION_LOG_WARNING,
4975  NULL,
4976  "Failed to add connection to libevent: %s",
4977  strerror(errno));
4978  return false;
4979  }
4980 
4981 #ifdef DEBUG
4982  c->registered_in_libevent = true;
4983 #endif
4984 
4985  return true;
4986 }
4987 
4988 bool unregister_event(conn *c) {
4989 #ifdef DEBUG
4990  assert(c->registered_in_libevent);
4991 #endif
4992 
4993  if (event_del(&c->event) == -1) {
4994  return false;
4995  }
4996 
4997 #ifdef DEBUG
4998  c->registered_in_libevent = false;
4999 #endif
5000 
5001  return true;
5002 }
5003 
5004 
5005 bool update_event(conn *c, const int new_flags) {
5006  assert(c != NULL);
5007 
5008  struct event_base *base = c->event.ev_base;
5009  if (c->ev_flags == new_flags)
5010  return true;
5011 
5012  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
5013  "Updated event for %d to read=%s, write=%s\n",
5014  c->sfd, (new_flags & EV_READ ? "yes" : "no"),
5015  (new_flags & EV_WRITE ? "yes" : "no"));
5016 
5017  if (!unregister_event(c)) {
5018  return false;
5019  }
5020 
5021  event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
5022  event_base_set(base, &c->event);
5023  c->ev_flags = new_flags;
5024 
5025  return register_event(c, NULL);
5026 }
5027 
5028 /*
5029  * Transmit the next chunk of data from our list of msgbuf structures.
5030  *
5031  * Returns:
5032  * TRANSMIT_COMPLETE All done writing.
5033  * TRANSMIT_INCOMPLETE More data remaining to write.
5034  * TRANSMIT_SOFT_ERROR Can't write any more right now.
5035  * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
5036  */
5037 static enum transmit_result transmit(conn *c) {
5038  assert(c != NULL);
5039 
5040  if (c->msgcurr < c->msgused &&
5041  c->msglist[c->msgcurr].msg_iovlen == 0) {
5042  /* Finished writing the current msg; advance to the next. */
5043  c->msgcurr++;
5044  }
5045  if (c->msgcurr < c->msgused) {
5046  ssize_t res;
5047  struct msghdr *m = &c->msglist[c->msgcurr];
5048 
5049  res = sendmsg(c->sfd, m, 0);
5050  if (res > 0) {
5051  STATS_ADD(c, bytes_written, res);
5052 
5053  /* We've written some of the data. Remove the completed
5054  iovec entries from the list of pending writes. */
5055  while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
5056  res -= m->msg_iov->iov_len;
5057  m->msg_iovlen--;
5058  m->msg_iov++;
5059  }
5060 
5061  /* Might have written just part of the last iovec entry;
5062  adjust it so the next write will do the rest. */
5063  if (res > 0) {
5064  m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
5065  m->msg_iov->iov_len -= res;
5066  }
5067  return TRANSMIT_INCOMPLETE;
5068  }
5069  if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5070  if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5071  if (settings.verbose > 0) {
5072  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
5073  "Couldn't update event\n");
5074  }
5075  conn_set_state(c, conn_closing);
5076  return TRANSMIT_HARD_ERROR;
5077  }
5078  return TRANSMIT_SOFT_ERROR;
5079  }
5080  /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
5081  we have a real error, on which we close the connection */
5082  if (settings.verbose > 0) {
5083  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
5084  "Failed to write, and not due to blocking: %s",
5085  strerror(errno));
5086  }
5087 
5088  if (IS_UDP(c->transport))
5089  conn_set_state(c, conn_read);
5090  else
5091  conn_set_state(c, conn_closing);
5092  return TRANSMIT_HARD_ERROR;
5093  } else {
5094  return TRANSMIT_COMPLETE;
5095  }
5096 }
5097 
5098 bool conn_listening(conn *c)
5099 {
5100  int sfd;
5101  struct sockaddr_storage addr;
5102  socklen_t addrlen = sizeof(addr);
5103 
5104  if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
5105  if (errno == EMFILE) {
5106  if (settings.verbose > 0) {
5107  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5108  "Too many open connections\n");
5109  }
5110  disable_listen();
5111  } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
5112  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
5113  "Failed to accept new client: %s\n",
5114  strerror(errno));
5115  }
5116 
5117  return false;
5118  }
5119 
5120  STATS_LOCK();
5121  int curr_conns = ++stats.curr_conns;
5122  STATS_UNLOCK();
5123 
5124  if (curr_conns >= settings.maxconns) {
5125  STATS_LOCK();
5126  ++stats.rejected_conns;
5127  STATS_UNLOCK();
5128 
5129  if (settings.verbose > 0) {
5130  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5131  "Too many open connections\n");
5132  }
5133 
5134  safe_close(sfd);
5135  return false;
5136  }
5137 
5138  if (evutil_make_socket_nonblocking(sfd) == -1) {
5139  safe_close(sfd);
5140  return false;
5141  }
5142 
5143  dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
5144  DATA_BUFFER_SIZE, tcp_transport);
5145 
5146  return false;
5147 }
5148 
5160 bool conn_ship_log(conn *c) {
5161  bool cont = false;
5162 
5163  if (c->sfd == INVALID_SOCKET) {
5164  return false;
5165  }
5166 
5167  short mask = EV_READ | EV_PERSIST | EV_WRITE;
5168 
5169  if (c->which & EV_READ || c->rbytes > 0) {
5170  if (c->rbytes > 0) {
5171  if (try_read_command(c) == 0) {
5172  conn_set_state(c, conn_read);
5173  }
5174  } else {
5175  conn_set_state(c, conn_read);
5176  }
5177 
5178  // we're going to process something.. let's proceed
5179  cont = true;
5180 
5181  // We have a finite number of messages in the input queue
5182  // so let's process all of them instead of backing off after
5183  // reading a subset of them.
5184  // Why? Because we've got every time we're calling ship_tap_log
5185  // we try to send a chunk of items.. This means that if we end
5186  // up in a situation where we're receiving a burst of nack messages
5187  // we'll only process a subset of messages in our input queue,
5188  // and it will slowly grow..
5189  c->nevents = settings.reqs_per_tap_event;
5190  } else if (c->which & EV_WRITE) {
5191  --c->nevents;
5192  if (c->nevents >= 0) {
5193  LOCK_THREAD(c->thread);
5194  c->ewouldblock = false;
5195  ship_tap_log(c);
5196  if (c->ewouldblock) {
5197  mask = EV_READ | EV_PERSIST;
5198  } else {
5199  cont = true;
5200  }
5201  UNLOCK_THREAD(c->thread);
5202  }
5203  }
5204 
5205  if (!update_event(c, mask)) {
5206  if (settings.verbose > 0) {
5207  settings.extensions.logger->log(EXTENSION_LOG_INFO,
5208  c, "Couldn't update event\n");
5209  }
5210  conn_set_state(c, conn_closing);
5211  }
5212 
5213  return cont;
5214 }
5215 
5216 bool conn_waiting(conn *c) {
5217  if (!update_event(c, EV_READ | EV_PERSIST)) {
5218  if (settings.verbose > 0) {
5219  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5220  "Couldn't update event\n");
5221  }
5222  conn_set_state(c, conn_closing);
5223  return true;
5224  }
5225  conn_set_state(c, conn_read);
5226  return false;
5227 }
5228 
5229 bool conn_read(conn *c) {
5230  int res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
5231  switch (res) {
5232  case READ_NO_DATA_RECEIVED:
5233  conn_set_state(c, conn_waiting);
5234  break;
5235  case READ_DATA_RECEIVED:
5236  conn_set_state(c, conn_parse_cmd);
5237  break;
5238  case READ_ERROR:
5239  conn_set_state(c, conn_closing);
5240  break;
5241  case READ_MEMORY_ERROR: /* Failed to allocate more memory */
5242  /* State already set by try_read_network */
5243  break;
5244  }
5245 
5246  return true;
5247 }
5248 
5249 bool conn_parse_cmd(conn *c) {
5250  if (try_read_command(c) == 0) {
5251  /* wee need more data! */
5252  conn_set_state(c, conn_waiting);
5253  }
5254 
5255  return !c->ewouldblock;
5256 }
5257 
5258 bool conn_new_cmd(conn *c) {
5259  /* Only process nreqs at a time to avoid starving other connections */
5260  --c->nevents;
5261  if (c->nevents >= 0) {
5262  reset_cmd_handler(c);
5263  } else {
5264  STATS_NOKEY(c, conn_yields);
5265  if (c->rbytes > 0) {
5266  /* We have already read in data into the input buffer,
5267  so libevent will most likely not signal read events
5268  on the socket (unless more data is available. As a
5269  hack we should just put in a request to write data,
5270  because that should be possible ;-)
5271  */
5272  if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5273  if (settings.verbose > 0) {
5274  settings.extensions.logger->log(EXTENSION_LOG_INFO,
5275  c, "Couldn't update event\n");
5276  }
5277  conn_set_state(c, conn_closing);
5278  return true;
5279  }
5280  }
5281  return false;
5282  }
5283 
5284  return true;
5285 }
5286 
5287 
5288 bool conn_swallow(conn *c) {
5289  ssize_t res;
5290  /* we are reading sbytes and throwing them away */
5291  if (c->sbytes == 0) {
5292  conn_set_state(c, conn_new_cmd);
5293  return true;
5294  }
5295 
5296  /* first check if we have leftovers in the conn_read buffer */
5297  if (c->rbytes > 0) {
5298  uint32_t tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
5299  c->sbytes -= tocopy;
5300  c->rcurr += tocopy;
5301  c->rbytes -= tocopy;
5302  return true;
5303  }
5304 
5305  /* now try reading from the socket */
5306  res = recv(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize, 0);
5307  if (res > 0) {
5308  STATS_ADD(c, bytes_read, res);
5309  c->sbytes -= res;
5310  return true;
5311  }
5312  if (res == 0) { /* end of stream */
5313  conn_set_state(c, conn_closing);
5314  return true;
5315  }
5316  if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5317  if (!update_event(c, EV_READ | EV_PERSIST)) {
5318  if (settings.verbose > 0) {
5319  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5320  "Couldn't update event\n");
5321  }
5322  conn_set_state(c, conn_closing);
5323  return true;
5324  }
5325  return false;
5326  }
5327 
5328  if (errno != ENOTCONN && errno != ECONNRESET) {
5329  /* otherwise we have a real error, on which we close the connection */
5330  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5331  "Failed to read, and not due to blocking (%s)\n",
5332  strerror(errno));
5333  }
5334 
5335  conn_set_state(c, conn_closing);
5336 
5337  return true;
5338 
5339 }
5340 
5341 bool conn_nread(conn *c) {
5342  ssize_t res;
5343 
5344  if (c->rlbytes == 0) {
5345  LIBEVENT_THREAD *t = c->thread;
5346  LOCK_THREAD(t);
5347  bool block = c->ewouldblock = false;
5348  complete_nread(c);
5349  UNLOCK_THREAD(t);
5350  /* Breaking this into two, as complete_nread may have
5351  moved us to a different thread */
5352  t = c->thread;
5353  LOCK_THREAD(t);
5354  if (c->ewouldblock) {
5355  unregister_event(c);
5356  block = true;
5357  }
5358  UNLOCK_THREAD(t);
5359  return !block;
5360  }
5361  /* first check if we have leftovers in the conn_read buffer */
5362  if (c->rbytes > 0) {
5363  uint32_t tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
5364  if (c->ritem != c->rcurr) {
5365  memmove(c->ritem, c->rcurr, tocopy);
5366  }
5367  c->ritem += tocopy;
5368  c->rlbytes -= tocopy;
5369  c->rcurr += tocopy;
5370  c->rbytes -= tocopy;
5371  if (c->rlbytes == 0) {
5372  return true;
5373  }
5374  }
5375 
5376  /* now try reading from the socket */
5377  res = recv(c->sfd, c->ritem, c->rlbytes, 0);
5378  if (res > 0) {
5379  STATS_ADD(c, bytes_read, res);
5380  if (c->rcurr == c->ritem) {
5381  c->rcurr += res;
5382  }
5383  c->ritem += res;
5384  c->rlbytes -= res;
5385  return true;
5386  }
5387  if (res == 0) { /* end of stream */
5388  conn_set_state(c, conn_closing);
5389  return true;
5390  }
5391 
5392 #ifdef INNODB_MEMCACHED
5393  /* MEMCACHED_RESOLVE: on solaris platform, when connect through
5394  telnet and waiting for input from an "add" or "set" command,
5395  it could have res == -1 and errno == 0. Thus causing early termination
5396  Add "!errno" condition here to deal with this scenario for now */
5397  if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || !errno)) {
5398 #else
5399  if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5400 #endif /* INNODB_MEMCACHED */
5401  if (!update_event(c, EV_READ | EV_PERSIST)) {
5402  if (settings.verbose > 0) {
5403  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5404  "Couldn't update event\n");
5405  }
5406  conn_set_state(c, conn_closing);
5407  return true;
5408  }
5409  return false;
5410  }
5411 
5412  if (errno != ENOTCONN && errno != ECONNRESET) {
5413  /* otherwise we have a real error, on which we close the connection */
5414  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
5415  "Failed to read, and not due to blocking:\n"
5416  "errno: %d %s \n"
5417  "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
5418  errno, strerror(errno),
5419  (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
5420  (int)c->rlbytes, (int)c->rsize);
5421  }
5422  conn_set_state(c, conn_closing);
5423  return true;
5424 }
5425 
5426 bool conn_write(conn *c) {
5427  /*
5428  * We want to write out a simple response. If we haven't already,
5429  * assemble it into a msgbuf list (this will be a single-entry
5430  * list for TCP or a two-entry list for UDP).
5431  */
5432  if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
5433  if (add_iov(c, c->wcurr, c->wbytes) != 0) {
5434  if (settings.verbose > 0) {
5435  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5436  "Couldn't build response\n");
5437  }
5438  conn_set_state(c, conn_closing);
5439  return true;
5440  }
5441  }
5442 
5443  return conn_mwrite(c);
5444 }
5445 
5446 bool conn_mwrite(conn *c) {
5447  if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5448  if (settings.verbose > 0) {
5449  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5450  "Failed to build UDP headers\n");
5451  }
5452  conn_set_state(c, conn_closing);
5453  return true;
5454  }
5455 
5456  switch (transmit(c)) {
5457  case TRANSMIT_COMPLETE:
5458  if (c->state == conn_mwrite) {
5459  while (c->ileft > 0) {
5460  item *it = *(c->icurr);
5461  settings.engine.v1->release(settings.engine.v0, c, it);
5462  c->icurr++;
5463  c->ileft--;
5464  }
5465  while (c->suffixleft > 0) {
5466  char *suffix = *(c->suffixcurr);
5467  cache_free(c->thread->suffix_cache, suffix);
5468  c->suffixcurr++;
5469  c->suffixleft--;
5470  }
5471  /* XXX: I don't know why this wasn't the general case */
5472  if(c->protocol == binary_prot) {
5473  conn_set_state(c, c->write_and_go);
5474  } else {
5475  conn_set_state(c, conn_new_cmd);
5476  }
5477  } else if (c->state == conn_write) {
5478  if (c->write_and_free) {
5479  free(c->write_and_free);
5480  c->write_and_free = 0;
5481  }
5482  conn_set_state(c, c->write_and_go);
5483  } else {
5484  if (settings.verbose > 0) {
5485  settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
5486  "Unexpected state %d\n", c->state);
5487  }
5488  conn_set_state(c, conn_closing);
5489  }
5490  break;
5491 
5492  case TRANSMIT_INCOMPLETE:
5493  case TRANSMIT_HARD_ERROR:
5494  break; /* Continue in state machine. */
5495 
5496  case TRANSMIT_SOFT_ERROR:
5497  return false;
5498  }
5499 
5500  return true;
5501 }
5502 
5503 bool conn_pending_close(conn *c) {
5504  assert(c->sfd == INVALID_SOCKET);
5505  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
5506  "Awaiting clients to release the cookie (pending close for %p)",
5507  (void*)c);
5508  LOCK_THREAD(c->thread);
5509  c->thread->pending_io = list_remove(c->thread->pending_io, c);
5510  if (!list_contains(c->thread->pending_close, c)) {
5511  enlist_conn(c, &c->thread->pending_close);
5512  }
5513  UNLOCK_THREAD(c->thread);
5514 
5515  /*
5516  * tell the tap connection that we're disconnecting it now,
5517  * but give it a grace period
5518  */
5519  perform_callbacks(ON_DISCONNECT, NULL, c);
5520 
5521  /*
5522  * disconnect callback may have changed the state for the object
5523  * so we might complete the disconnect now
5524  */
5525  return c->state != conn_pending_close;
5526 }
5527 
5528 bool conn_immediate_close(conn *c) {
5529  settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
5530  "Immediate close of %p",
5531  (void*)c);
5532  perform_callbacks(ON_DISCONNECT, NULL, c);
5533  conn_close(c);
5534 
5535  return false;
5536 }
5537 
5538 bool conn_closing(conn *c) {
5539  if (IS_UDP(c->transport)) {
5540  conn_cleanup(c);
5541  return false;
5542  }
5543 
5544  // We don't want any network notifications anymore..
5545  unregister_event(c);
5546  safe_close(c->sfd);
5547  c->sfd = INVALID_SOCKET;
5548 
5549  if (c->refcount > 1) {
5550  conn_set_state(c, conn_pending_close);
5551  } else {
5552  conn_set_state(c, conn_immediate_close);
5553  }
5554  return true;
5555 }
5556 
/*
 * State handler: move connection c from its current worker thread to the
 * dedicated tap thread (tap_thread).
 *
 * Steps:
 *   - mark c as would-block and unregister its event;
 *   - with the original thread locked, remove c from that thread's
 *     pending_io and pending_close lists;
 *   - with the tap thread also locked, clear c->ev_flags, set the state to
 *     conn_setup_tap_stream, point c->thread and c->event.ev_base at the
 *     tap thread, and queue c on the tap thread's pending_io list;
 *   - release both locks (tap thread first) and wake the tap thread.
 *
 * Locks are always taken in the order original thread -> tap thread.
 *
 * Returns false: the original thread stops running c's state machine.
 */
bool conn_add_tap_client(conn *c) {
    LIBEVENT_THREAD *tp = tap_thread;
    LIBEVENT_THREAD *orig_thread = c->thread;

    assert(orig_thread);
    /* The connection must not already belong to the tap thread. */
    assert(orig_thread != tp);

    c->ewouldblock = true;

    unregister_event(c);

    LOCK_THREAD(orig_thread);
    /* Clean out the lists */
    orig_thread->pending_io = list_remove(orig_thread->pending_io, c);
    orig_thread->pending_close = list_remove(orig_thread->pending_close, c);

    LOCK_THREAD(tp);
    c->ev_flags = 0;
    conn_set_state(c, conn_setup_tap_stream);
    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Moving %d conn from %p to %p\n",
                                    c->sfd, c->thread, tp);
    c->thread = tp;
    /* Future events for c are dispatched by the tap thread's event base. */
    c->event.ev_base = tp->base;
    assert(c->next == NULL);
    assert(c->list_state == 0);
    enlist_conn(c, &tp->pending_io);

    UNLOCK_THREAD(tp);

    UNLOCK_THREAD(orig_thread);

    notify_thread(tp);

    return false;
}
5593 
5594 bool conn_setup_tap_stream(conn *c) {
5595  process_bin_tap_connect(c);
5596  return true;
5597 }
5598 
5599 void event_handler(const int fd, const short which, void *arg) {
5600  conn *c;
5601 
5602  c = (conn *)arg;
5603  assert(c != NULL);
5604 
5605  if (memcached_shutdown) {
5606  event_base_loopbreak(c->event.ev_base);
5607  return ;
5608  }
5609 
5610  c->which = which;
5611 
5612  /* sanity */
5613  if (fd != c->sfd) {
5614  if (settings.verbose > 0) {
5615  settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
5616  "Catastrophic: event fd doesn't match conn fd!\n");
5617  }
5618  conn_close(c);
5619  return;
5620  }
5621 
5622  perform_callbacks(ON_SWITCH_CONN, c, c);
5623 
5624  c->nevents = settings.reqs_per_event;
5625  if (c->state == conn_ship_log) {
5626  c->nevents = settings.reqs_per_tap_event;
5627  }
5628 
5629  LIBEVENT_THREAD *thr = c->thread;
5630 
5631  // Do we have pending closes?
5632  const size_t max_items = 256;
5633  conn *pending_close[max_items];
5634  size_t n_pending_close = 0;
5635  if (thr != NULL) {
5636  LOCK_THREAD(thr);
5637  if (thr->pending_close && thr->last_checked != current_time) {
5638  assert(!has_cycle(thr->pending_close));
5639  thr->last_checked = current_time;
5640 
5641  n_pending_close = list_to_array(pending_close, max_items,
5642  &thr->pending_close);
5643  }
5644  UNLOCK_THREAD(thr);
5645  }
5646 
5647  if (settings.verbose) {
5648  do {
5649  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
5650  "%d - Running task: (%s)\n",
5651  c->sfd, state_text(c->state));
5652  } while (c->state(c));
5653  } else {
5654  while (c->state(c)) {
5655  /* empty */
5656  }
5657  }
5658 
5659  /* Close any connections pending close */
5660  if (n_pending_close > 0) {
5661  for (size_t i = 0; i < n_pending_close; ++i) {
5662  conn *ce = pending_close[i];
5663  if (ce->refcount == 1) {
5664  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
5665  "OK, time to nuke: %p\n",
5666  (void*)ce);
5667  conn_close(ce);
5668  } else {
5669  LOCK_THREAD(ce->thread);
5670  enlist_conn(ce, &ce->thread->pending_close);
5671  UNLOCK_THREAD(ce->thread);
5672  }
5673  }
5674  }
5675 
5676  if (thr != NULL) {
5677  LOCK_THREAD(thr);
5678  finalize_list(pending_close, n_pending_close);
5679  UNLOCK_THREAD(thr);
5680  }
5681 }
5682 
5683 static void dispatch_event_handler(int fd, short which, void *arg) {
5684  char buffer[80];
5685  ssize_t nr = recv(fd, buffer, sizeof(buffer), 0);
5686 
5687  if (nr != -1 && is_listen_disabled()) {
5688  bool enable = false;
5689  pthread_mutex_lock(&listen_state.mutex);
5690  listen_state.count -= nr;
5691  if (listen_state.count <= 0) {
5692  enable = true;
5693  listen_state.disabled = false;
5694  }
5695  pthread_mutex_unlock(&listen_state.mutex);
5696  if (enable) {
5697  conn *next;
5698  for (next = listen_conn; next; next = next->next) {
5699  update_event(next, EV_READ | EV_PERSIST);
5700  if (listen(next->sfd, settings.backlog) != 0) {
5701  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5702  "listen() failed",
5703  strerror(errno));
5704  }
5705  }
5706  }
5707  }
5708 }
5709 
5710 
5711 
5712 static SOCKET new_socket(struct addrinfo *ai) {
5713  SOCKET sfd;
5714 
5715  sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
5716  if (sfd == INVALID_SOCKET) {
5717  return INVALID_SOCKET;
5718  }
5719 
5720  if (evutil_make_socket_nonblocking(sfd) == -1) {
5721  safe_close(sfd);
5722  return INVALID_SOCKET;
5723  }
5724 
5725  return sfd;
5726 }
5727 
5728 
5729 /*
5730  * Sets a socket's send buffer size to the maximum allowed by the system.
5731  */
5732 static void maximize_sndbuf(const int sfd) {
5733  socklen_t intsize = sizeof(int);
5734  int last_good = 0;
5735  int min, max, avg;
5736  int old_size;
5737 
5738  /* Start with the default size. */
5739  if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&old_size, &intsize) != 0) {
5740  if (settings.verbose > 0) {
5741  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5742  "getsockopt(SO_SNDBUF): %s",
5743  strerror(errno));
5744  }
5745 
5746  return;
5747  }
5748 
5749  /* Binary-search for the real maximum. */
5750  min = old_size;
5751  max = MAX_SENDBUF_SIZE;
5752 
5753  while (min <= max) {
5754  avg = ((unsigned int)(min + max)) / 2;
5755  if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
5756  last_good = avg;
5757  min = avg + 1;
5758  } else {
5759  max = avg - 1;
5760  }
5761  }
5762 
5763  if (settings.verbose > 1) {
5764  settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
5765  "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
5766  }
5767 }
5768 
5769 
5770 
/*
 * Create listening sockets for every address getaddrinfo() returns for
 * interface:port and register them with the event machinery.
 *
 * Each socket is made non-blocking and gets SO_REUSEADDR. IPv6 sockets
 * also get IPV6_V6ONLY where available. UDP sockets get a maximized send
 * buffer and are recorded in udp_socket[], with one conn dispatched per
 * settings.num_threads_per_udp. TCP sockets get SO_KEEPALIVE, SO_LINGER
 * (disabled) and TCP_NODELAY, are put into listen(), and become a
 * conn_listening connection on listen_conn.
 *
 * @param interface       address/host to bind, or NULL for all interfaces
 * @param port            port number; -1 is treated as 0
 * @param transport       network transport (UDP or stream)
 * @param portnumber_file if non-NULL, the bound port of every INET/INET6
 *                        socket is written to it
 * @return 0 if at least one socket was bound. Returns 1 if none were, on
 *         getaddrinfo() failure, on a bind() failure other than EADDRINUSE,
 *         or on listen() failure. Exits the process if a listening conn
 *         cannot be created.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags =1;
    num_udp_socket = 0;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error= getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "getaddrinfo(): %s\n", gai_strerror(error));
        } else {
            /* For EAI_SYSTEM the actual cause is reported in errno; the
               EAI_* return code itself is not an errno value. */
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "getaddrinfo(): %s\n", strerror(errno));
        }
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == INVALID_SOCKET) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(IPV6_V6ONLY): %s",
                                                strerror(errno));
                safe_close(sfd);
                continue;
            }
        }
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
            udp_socket[num_udp_socket] = sfd;
            num_udp_socket++;
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(SO_KEEPALIVE): %s",
                                                strerror(errno));
            }

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(SO_LINGER): %s",
                                                strerror(errno));
            }

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(TCP_NODELAY): %s",
                                                strerror(errno));
            }
        }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) {
            if (errno != EADDRINUSE) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "bind(): %s",
                                                strerror(errno));
                safe_close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            /* Address already in use: try the next candidate. */
            safe_close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == SOCKET_ERROR) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "listen(): %s",
                                                strerror(errno));
                safe_close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                /* Report the port actually bound (relevant when port is 0). */
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
                STATS_LOCK();
                ++stats.curr_conns;
                ++stats.daemon_conns;
                STATS_UNLOCK();
            }
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base, NULL))) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
            STATS_LOCK();
            ++stats.curr_conns;
            ++stats.daemon_conns;
            STATS_UNLOCK();
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff at least one socket was successfully bound */
    return success == 0;
}
5941 
5942 static int server_sockets(int port, enum network_transport transport,
5943  FILE *portnumber_file) {
5944  if (settings.inter == NULL) {
5945  return server_socket(settings.inter, port, transport, portnumber_file);
5946  } else {
5947  // tokenize them and bind to each one of them..
5948  char *b;
5949  int ret = 0;
5950  char *list = strdup(settings.inter);
5951 
5952  if (list == NULL) {
5953  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5954  "Failed to allocate memory for parsing server interface string\n");
5955  return 1;
5956  }
5957  for (char *p = strtok_r(list, ";,", &b);
5958  p != NULL;
5959  p = strtok_r(NULL, ";,", &b)) {
5960  int the_port = port;
5961 
5962  char *s = strchr(p, ':');
5963  if (s != NULL) {
5964  *s = '\0';
5965  ++s;
5966  if (!safe_strtol(s, &the_port)) {
5967  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5968  "Invalid port number: \"%s\"", s);
5969  return 1;
5970  }
5971  }
5972  if (strcmp(p, "*") == 0) {
5973  p = NULL;
5974  }
5975  ret |= server_socket(p, the_port, transport, portnumber_file);
5976  }
5977  free(list);
5978  return ret;
5979  }
5980 }
5981 
5982 static int new_socket_unix(void) {
5983  int sfd;
5984 
5985  if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == INVALID_SOCKET) {
5986  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5987  "socket(AF_UNIX, SOCK_STREAM, 0): %s",
5988  strerror(errno));
5989  return INVALID_SOCKET;
5990  }
5991 
5992  if (evutil_make_socket_nonblocking(sfd) == -1) {
5993  safe_close(sfd);
5994  return INVALID_SOCKET;
5995  }
5996  return sfd;
5997 }
5998 
5999 /* this will probably not work on windows */
6000 static int server_socket_unix(const char *path, int access_mask) {
6001  int sfd;
6002  struct linger ling = {0, 0};
6003  struct sockaddr_un addr;
6004  struct stat tstat;
6005  int flags =1;
6006  int old_umask;
6007 
6008  if (!path) {
6009  return 1;
6010  }
6011 
6012  if ((sfd = new_socket_unix()) == -1) {
6013  return 1;
6014  }
6015 
6016  /*
6017  * Clean up a previous socket file if we left it around
6018  */
6019  if (lstat(path, &tstat) == 0) {
6020  if (S_ISSOCK(tstat.st_mode))
6021  unlink(path);
6022  }
6023 
6024  setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
6025  setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
6026  setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
6027 
6028  /*
6029  * the memset call clears nonstandard fields in some impementations
6030  * that otherwise mess things up.
6031  */
6032  memset(&addr, 0, sizeof(addr));
6033 
6034  addr.sun_family = AF_UNIX;
6035  strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
6036  assert(strcmp(addr.sun_path, path) == 0);
6037  old_umask = umask( ~(access_mask&0777));
6038  if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
6039  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6040  "bind(): %s",
6041  strerror(errno));
6042  safe_close(sfd);
6043  umask(old_umask);
6044  return 1;
6045  }
6046  umask(old_umask);
6047  if (listen(sfd, settings.backlog) == -1) {
6048  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6049  "listen(): %s",
6050  strerror(errno));
6051  safe_close(sfd);
6052  return 1;
6053  }
6054  if (!(listen_conn = conn_new(sfd, conn_listening,
6055  EV_READ | EV_PERSIST, 1,
6056  local_transport, main_base, NULL))) {
6057  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6058  "failed to create listening connection\n");
6059  exit(EXIT_FAILURE);
6060  }
6061  STATS_LOCK();
6062  ++stats.daemon_conns;
6063  STATS_UNLOCK();
6064 
6065  return 0;
6066 }
6067 
/* One-second timer event; clock_handler() re-arms it on every tick and
   refreshes current_time. */
static struct event clockevent;
6069 
6070 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
6071 static void set_current_time(void) {
6072  struct timeval timer;
6073 
6074  gettimeofday(&timer, NULL);
6075  current_time = (rel_time_t) (timer.tv_sec - process_started);
6076 }
6077 
6078 static void clock_handler(const int fd, const short which, void *arg) {
6079  struct timeval t = {.tv_sec = 1, .tv_usec = 0};
6080  static bool initialized = false;
6081 
6082  if (memcached_shutdown) {
6083  event_base_loopbreak(main_base);
6084  return ;
6085  }
6086 
6087  if (initialized) {
6088  /* only delete the event if it's actually there. */
6089  evtimer_del(&clockevent);
6090  } else {
6091  initialized = true;
6092  }
6093 
6094  evtimer_set(&clockevent, clock_handler, 0);
6095  event_base_set(main_base, &clockevent);
6096  evtimer_add(&clockevent, &t);
6097 
6098  set_current_time();
6099 }
6100 
6101 static void usage(void) {
6102  printf(PACKAGE " " VERSION "\n");
6103  printf("-p <num> TCP port number to listen on (default: 11211)\n"
6104  "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
6105  "-s <file> UNIX socket path to listen on (disables network support)\n"
6106  "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
6107  "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
6108  " <addr> may be specified as host:port. If you don't specify\n"
6109  " a port number, the value you specified with -p or -U is\n"
6110  " used. You may specify multiple addresses separated by comma\n"
6111  " or by using -l multiple times\n"
6112  "-d run as a daemon\n"
6113  "-r maximize core file limit\n"
6114  "-u <username> assume identity of <username> (only when run as root)\n"
6115  "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
6116  "-M return error on memory exhausted (rather than removing items)\n"
6117  "-c <num> max simultaneous connections (default: 1000)\n"
6118  "-k lock down all paged memory. Note that there is a\n"
6119  " limit on how much memory you may lock. Trying to\n"
6120  " allocate more than that would fail, so be sure you\n"
6121  " set the limit correctly for the user you started\n"
6122  " the daemon with (not for -u <username> user;\n"
6123  " under sh this is done with 'ulimit -S -l NUM_KB').\n"
6124  "-v verbose (print errors/warnings while in event loop)\n"
6125  "-vv very verbose (also print client commands/reponses)\n"
6126  "-vvv extremely verbose (also print internal state transitions)\n"
6127  "-h print this help and exit\n"
6128  "-i print memcached and libevent license\n"
6129  "-P <file> save PID in <file>, only used with -d option\n"
6130  "-f <factor> chunk size growth factor (default: 1.25)\n"
6131  "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
6132  printf("-L Try to use large memory pages (if available). Increasing\n"
6133  " the memory page size could reduce the number of TLB misses\n"
6134  " and improve the performance. In order to get large pages\n"
6135  " from the OS, memcached will allocate the total item-cache\n"
6136  " in one large chunk.\n");
6137  printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
6138  " This is used for per-prefix stats reporting. The default is\n"
6139  " \":\" (colon). If this option is specified, stats collection\n"
6140  " is turned on automatically; if not, then it may be turned on\n"
6141  " by sending the \"stats detail on\" command to the server.\n");
6142  printf("-t <num> number of threads to use (default: 4)\n");
6143  printf("-R Maximum number of requests per event, limits the number of\n"
6144  " requests process for a given connection to prevent \n"
6145  " starvation (default: 20)\n");
6146  printf("-C Disable use of CAS\n");
6147  printf("-b Set the backlog queue limit (default: 1024)\n");
6148  printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
6149  printf("-I Override the size of each slab page. Adjusts max item size\n"
6150  " (default: 1mb, min: 1k, max: 128m)\n");
6151  printf("-q Disable detailed stats commands\n");
6152 #ifdef SASL_ENABLED
6153  printf("-S Require SASL authentication\n");
6154 #endif
6155  printf("-X module,cfg Load the module and initialize it with the config\n");
6156  printf("-E engine Load engine as the storage engine\n");
6157  printf("-e config Pass config as configuration options to the storage engine\n");
6158  printf("\nEnvironment variables:\n"
6159  "MEMCACHED_PORT_FILENAME File to write port information to\n"
6160  "MEMCACHED_TOP_KEYS Number of top keys to keep track of\n"
6161  "MEMCACHED_REQS_TAP_EVENT Similar to -R but for tap_ship_log\n");
6162 }
/*
 * Print the package name/version followed by the memcached (Danga
 * Interactive, BSD) and libevent (Niels Provos) license texts to stdout.
 */
static void usage_license(void) {
    printf(PACKAGE " " VERSION "\n\n");
    printf(
    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
    "All rights reserved.\n"
    "\n"
    "Redistribution and use in source and binary forms, with or without\n"
    "modification, are permitted provided that the following conditions are\n"
    "met:\n"
    "\n"
    " * Redistributions of source code must retain the above copyright\n"
    "notice, this list of conditions and the following disclaimer.\n"
    "\n"
    " * Redistributions in binary form must reproduce the above\n"
    "copyright notice, this list of conditions and the following disclaimer\n"
    "in the documentation and/or other materials provided with the\n"
    "distribution.\n"
    "\n"
    " * Neither the name of the Danga Interactive nor the names of its\n"
    "contributors may be used to endorse or promote products derived from\n"
    "this software without specific prior written permission.\n"
    "\n"
    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
    "\n"
    "\n"
    "This product includes software developed by Niels Provos.\n"
    "\n"
    "[ libevent ]\n"
    "\n"
    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
    "All rights reserved.\n"
    "\n"
    "Redistribution and use in source and binary forms, with or without\n"
    "modification, are permitted provided that the following conditions\n"
    "are met:\n"
    "1. Redistributions of source code must retain the above copyright\n"
    " notice, this list of conditions and the following disclaimer.\n"
    "2. Redistributions in binary form must reproduce the above copyright\n"
    " notice, this list of conditions and the following disclaimer in the\n"
    " documentation and/or other materials provided with the distribution.\n"
    "3. All advertising materials mentioning features or use of this software\n"
    " must display the following acknowledgement:\n"
    " This product includes software developed by Niels Provos.\n"
    "4. The name of the author may not be used to endorse or promote products\n"
    " derived from this software without specific prior written permission.\n"
    "\n"
    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
    );

    return;
}
6233 
6234 static void save_pid(const char *pid_file) {
6235  FILE *fp;
6236 
6237  if (access(pid_file, F_OK) == 0) {
6238  if ((fp = fopen(pid_file, "r")) != NULL) {
6239  char buffer[1024];
6240  if (fgets(buffer, sizeof(buffer), fp) != NULL) {
6241  unsigned int pid;
6242  if (safe_strtoul(buffer, &pid) && kill((pid_t)pid, 0) == 0) {
6243  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6244  "WARNING: The pid file contained the following (running) pid: %u\n", pid);
6245  }
6246  }
6247  fclose(fp);
6248  }
6249  }
6250 
6251  if ((fp = fopen(pid_file, "w")) == NULL) {
6252  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6253  "Could not open the pid file %s for writing: %s\n",
6254  pid_file, strerror(errno));
6255  return;
6256  }
6257 
6258  fprintf(fp,"%ld\n", (long)getpid());
6259  if (fclose(fp) == -1) {
6260  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6261  "Could not close the pid file %s: %s\n",
6262  pid_file, strerror(errno));
6263  }
6264 }
6265 
6266 static void remove_pidfile(const char *pid_file) {
6267  if (pid_file != NULL) {
6268  if (unlink(pid_file) != 0) {
6269  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6270  "Could not remove the pid file %s: %s\n",
6271  pid_file, strerror(errno));
6272  }
6273  }
6274 }
6275 
6276 #ifndef HAVE_SIGIGNORE
6277 static int sigignore(int sig) {
6278  struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
6279 
6280  if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
6281  return -1;
6282  }
6283  return 0;
6284 }
6285 #endif /* !HAVE_SIGIGNORE */
6286 
6287 static void sigterm_handler(int sig) {
6288  assert(sig == SIGTERM || sig == SIGINT);
6289  memcached_shutdown = 1;
6290 }
6291 
6292 static int install_sigterm_handler(void) {
6293  struct sigaction sa = {.sa_handler = sigterm_handler, .sa_flags = 0};
6294 
6295  if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGTERM, &sa, 0) == -1 ||
6296  sigaction(SIGINT, &sa, 0) == -1) {
6297  return -1;
6298  }
6299 
6300  return 0;
6301 }
6302 
/*
 * On systems that support multiple page sizes, ask the OS to back the
 * heap (bss/brk) with the largest supported page size, which can reduce
 * TLB misses.
 *
 * Only active when both getpagesizes() and memcntl() are available
 * (HAVE_GETPAGESIZES && HAVE_MEMCNTL). Returns 0 on success or when the
 * feature is not compiled in; -1 (with a logged warning) if the page
 * sizes cannot be queried or memcntl() fails.
 */
static int enable_large_pages(void) {
#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
    size_t sizes[32];
    int avail = getpagesizes(sizes, 32);

    if (avail == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to get supported pagesizes: %s\nWill use default page size\n",
                                        strerror(errno));
        return -1;
    }

    size_t largest = sizes[0];
    for (int idx = 1; idx < avail; idx++) {
        if (sizes[idx] > largest) {
            largest = sizes[idx];
        }
    }

    struct memcntl_mha arg = {0};
    arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
    arg.mha_flags = 0;
    arg.mha_pagesize = largest;

    if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to set large pages: %s\nWill use default page size\n",
                                        strerror(errno));
        return -1;
    }
    return 0;
#else
    return 0;
#endif
}
6345 
6346 static const char* get_server_version(void) {
6347  return VERSION;
6348 }
6349 
6350 static void store_engine_specific(const void *cookie,
6351  void *engine_data) {
6352  conn *c = (conn*)cookie;
6353  c->engine_storage = engine_data;
6354 }
6355 
6356 static void *get_engine_specific(const void *cookie) {
6357  conn *c = (conn*)cookie;
6358  return c->engine_storage;
6359 }
6360 
6361 static int get_socket_fd(const void *cookie) {
6362  conn *c = (conn *)cookie;
6363  return c->sfd;
6364 }
6365 
6366 static void set_tap_nack_mode(const void *cookie, bool enable) {
6367  conn *c = (conn *)cookie;
6368  c->tap_nack_mode = enable;
6369 }
6370 
6371 static void reserve_cookie(const void *cookie) {
6372  conn *c = (conn *)cookie;
6373  ++c->refcount;
6374 }
6375 
6376 static void release_cookie(const void *cookie) {
6377  conn *c = (conn *)cookie;
6378  --c->refcount;
6379 }
6380 
6381 static int num_independent_stats(void) {
6382  return settings.num_threads + 1;
6383 }
6384 
6385 static void *new_independent_stats(void) {
6386  int ii;
6387  int nrecords = num_independent_stats();
6388  struct independent_stats *independent_stats = calloc(sizeof(independent_stats) + sizeof(struct thread_stats) * nrecords, 1);
6389  if (settings.topkeys > 0)
6390  independent_stats->topkeys = topkeys_init(settings.topkeys);
6391  for (ii = 0; ii < nrecords; ii++)
6392  pthread_mutex_init(&independent_stats->thread_stats[ii].mutex, NULL);
6393  return independent_stats;
6394 }
6395 
6396 static void release_independent_stats(void *stats) {
6397  int ii;
6398  int nrecords = num_independent_stats();
6399  struct independent_stats *independent_stats = stats;
6400  if (independent_stats->topkeys)
6401  topkeys_free(independent_stats->topkeys);
6402  for (ii = 0; ii < nrecords; ii++)
6403  pthread_mutex_destroy(&independent_stats->thread_stats[ii].mutex);
6404  free(independent_stats);
6405 }
6406 
6407 static inline struct independent_stats *get_independent_stats(conn *c) {
6408  struct independent_stats *independent_stats;
6409  if (settings.engine.v1->get_stats_struct != NULL) {
6410  independent_stats = settings.engine.v1->get_stats_struct(settings.engine.v0, (const void *)c);
6411  if (independent_stats == NULL)
6412  independent_stats = default_independent_stats;
6413  } else {
6414  independent_stats = default_independent_stats;
6415  }
6416  return independent_stats;
6417 }
6418 
6419 static inline struct thread_stats *get_thread_stats(conn *c) {
6420  struct independent_stats *independent_stats = get_independent_stats(c);
6421  assert(c->thread->index < num_independent_stats());
6422  return &independent_stats->thread_stats[c->thread->index];
6423 }
6424 
6425 static void register_callback(ENGINE_HANDLE *eh,
6426  ENGINE_EVENT_TYPE type,
6427  EVENT_CALLBACK cb, const void *cb_data) {
6428  struct engine_event_handler *h =
6429  calloc(sizeof(struct engine_event_handler), 1);
6430 
6431  assert(h);
6432  h->cb = cb;
6433  h->cb_data = cb_data;
6434  h->next = engine_event_handlers[type];
6435  engine_event_handlers[type] = h;
6436 }
6437 
6438 static rel_time_t get_current_time(void)
6439 {
6440  return current_time;
6441 }
6442 
6443 static void count_eviction(const void *cookie, const void *key, const int nkey) {
6444  topkeys_t *tk = get_independent_stats((conn*)cookie)->topkeys;
6445  TK(tk, evictions, key, nkey, get_current_time());
6446 }
6447 
6455 static ENGINE_ERROR_CODE internal_arithmetic(ENGINE_HANDLE* handle,
6456  const void* cookie,
6457  const void* key,
6458  const int nkey,
6459  const bool increment,
6460  const bool create,
6461  const uint64_t delta,
6462  const uint64_t initial,
6463  const rel_time_t exptime,
6464  uint64_t *cas,
6465  uint64_t *result,
6466  uint16_t vbucket)
6467 {
6468  ENGINE_HANDLE_V1 *e = (ENGINE_HANDLE_V1*)handle;
6469 
6470  item *it = NULL;
6471 
6472  ENGINE_ERROR_CODE ret;
6473  ret = e->get(handle, cookie, &it, key, nkey, vbucket);
6474 
6475  if (ret == ENGINE_SUCCESS) {
6476  item_info info = { .nvalue = 1 };
6477 
6478  if (!e->get_item_info(handle, cookie, it, &info)) {
6479  e->release(handle, cookie, it);
6480  return ENGINE_FAILED;
6481  }
6482 
6483  char value[80];
6484 
6485  if (info.value[0].iov_len > (sizeof(value) - 1)) {
6486  e->release(handle, cookie, it);
6487  return ENGINE_EINVAL;
6488  }
6489 
6490  memcpy(value, info.value[0].iov_base, info.value[0].iov_len);
6491  value[info.value[0].iov_len] = '\0';
6492 
6493  uint64_t val;
6494  if (!safe_strtoull(value, &val)) {
6495  e->release(handle, cookie, it);
6496  return ENGINE_EINVAL;
6497  }
6498 
6499  if (increment) {
6500  val += delta;
6501  } else {
6502  if (delta > val) {
6503  val = 0;
6504  } else {
6505  val -= delta;
6506  }
6507  }
6508 
6509  size_t nb = snprintf(value, sizeof(value), "%"PRIu64, val);
6510  *result = val;
6511  item *nit = NULL;
6512  if (e->allocate(handle, cookie, &nit, key,
6513  nkey, nb, info.flags, info.exptime) != ENGINE_SUCCESS) {
6514  e->release(handle, cookie, it);
6515  return ENGINE_ENOMEM;
6516  }
6517 
6518  item_info i2 = { .nvalue = 1 };
6519  if (!e->get_item_info(handle, cookie, nit, &i2)) {
6520  e->release(handle, cookie, it);
6521  e->release(handle, cookie, nit);
6522  return ENGINE_FAILED;
6523  }
6524 
6525  memcpy(i2.value[0].iov_base, value, nb);
6526  e->item_set_cas(handle, cookie, nit, info.cas);
6527  ret = e->store(handle, cookie, nit, cas, OPERATION_CAS, vbucket);
6528  e->release(handle, cookie, it);
6529  e->release(handle, cookie, nit);
6530  } else if (ret == ENGINE_KEY_ENOENT && create) {
6531  char value[80];
6532  size_t nb = snprintf(value, sizeof(value), "%"PRIu64"\r\n", initial);
6533  *result = initial;
6534  if (e->allocate(handle, cookie, &it, key, nkey, nb, 0, exptime) != ENGINE_SUCCESS) {
6535  e->release(handle, cookie, it);
6536  return ENGINE_ENOMEM;
6537  }
6538 
6539  item_info info = { .nvalue = 1 };
6540  if (!e->get_item_info(handle, cookie, it, &info)) {
6541  e->release(handle, cookie, it);
6542  return ENGINE_FAILED;
6543  }
6544 
6545  memcpy(info.value[0].iov_base, value, nb);
6546  ret = e->store(handle, cookie, it, cas, OPERATION_CAS, vbucket);
6547  e->release(handle, cookie, it);
6548  }
6549 
6550  /* We had a race condition.. just call ourself recursively to retry */
6551  if (ret == ENGINE_KEY_EEXISTS) {
6552  return internal_arithmetic(handle, cookie, key, nkey, increment, create, delta,
6553  initial, exptime, cas, result, vbucket);
6554  }
6555 
6556  return ret;
6557 }
6558 
6566 static bool register_extension(extension_type_t type, void *extension)
6567 {
6568  if (extension == NULL) {
6569  return false;
6570  }
6571 
6572  switch (type) {
6573  case EXTENSION_DAEMON:
6574  for (EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
6575  ptr != NULL;
6576  ptr = ptr->next) {
6577  if (ptr == extension) {
6578  return false;
6579  }
6580  }
6581  ((EXTENSION_DAEMON_DESCRIPTOR *)(extension))->next = settings.extensions.daemons;
6582  settings.extensions.daemons = extension;
6583  return true;
6584  case EXTENSION_LOGGER:
6585  settings.extensions.logger = extension;
6586  return true;
6588  if (settings.extensions.ascii != NULL) {
6590  for (last = settings.extensions.ascii; last->next != NULL;
6591  last = last->next) {
6592  if (last == extension) {
6593  return false;
6594  }
6595  }
6596  if (last == extension) {
6597  return false;
6598  }
6599  last->next = extension;
6600  last->next->next = NULL;
6601  } else {
6602  settings.extensions.ascii = extension;
6603  settings.extensions.ascii->next = NULL;
6604  }
6605  return true;
6606 
6607  default:
6608  return false;
6609  }
6610 }
6611 
6618 static void unregister_extension(extension_type_t type, void *extension)
6619 {
6620  switch (type) {
6621  case EXTENSION_DAEMON:
6622  {
6623  EXTENSION_DAEMON_DESCRIPTOR *prev = NULL;
6624  EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
6625 
6626  while (ptr != NULL && ptr != extension) {
6627  prev = ptr;
6628  ptr = ptr->next;
6629  }
6630 
6631  if (ptr != NULL && prev != NULL) {
6632  prev->next = ptr->next;
6633  }
6634 
6635  if (settings.extensions.daemons == ptr) {
6636  settings.extensions.daemons = ptr->next;
6637  }
6638  }
6639  break;
6640  case EXTENSION_LOGGER:
6641  if (settings.extensions.logger == extension) {
6642  if (get_stderr_logger() == extension) {
6643  settings.extensions.logger = get_null_logger();
6644  } else {
6645  settings.extensions.logger = get_stderr_logger();
6646  }
6647  }
6648  break;
6650  {
6652  EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ptr = settings.extensions.ascii;
6653 
6654  while (ptr != NULL && ptr != extension) {
6655  prev = ptr;
6656  ptr = ptr->next;
6657  }
6658 
6659  if (ptr != NULL && prev != NULL) {
6660  prev->next = ptr->next;
6661  }
6662 
6663  if (settings.extensions.ascii == ptr) {
6664  settings.extensions.ascii = ptr->next;
6665  }
6666  }
6667  break;
6668 
6669  default:
6670  ;
6671  }
6672 
6673 }
6674 
6678 static void* get_extension(extension_type_t type)
6679 {
6680  switch (type) {
6681  case EXTENSION_DAEMON:
6682  return settings.extensions.daemons;
6683 
6684  case EXTENSION_LOGGER:
6685  return settings.extensions.logger;
6686 
6688  return settings.extensions.ascii;
6689 
6690  default:
6691  return NULL;
6692  }
6693 }
6694 
6695 #ifdef INNODB_MEMCACHED
6696 void shutdown_server(void) {
6697 #else
6698 static void shutdown_server(void) {
6699 #endif /* INNODB_MEMCACHED */
6700 #ifdef INNODB_MEMCACHED
6701  int i;
6702  /* Clean up connections */
6703  while (listen_conn) {
6704  conn_closing(listen_conn);
6705  listen_conn = listen_conn->next;
6706  }
6707 
6708  for (i = 0; i < num_udp_socket; i++) {
6709  safe_close(udp_socket[i]);
6710  }
6711 #endif
6712  memcached_shutdown = 1;
6713 }
6714 
#ifdef INNODB_MEMCACHED
/* True once memcached_shutdown has reached 2, i.e. shutdown has finished. */
bool shutdown_complete(void)
{
    const bool done = (memcached_shutdown == 2);
    return(done);
}
#endif
6721 
6722 static EXTENSION_LOGGER_DESCRIPTOR* get_logger(void)
6723 {
6724  return settings.extensions.logger;
6725 }
6726 
6727 static EXTENSION_LOG_LEVEL get_log_level(void)
6728 {
6729  EXTENSION_LOG_LEVEL ret;
6730  switch (settings.verbose) {
6731  case 0: ret = EXTENSION_LOG_WARNING; break;
6732  case 1: ret = EXTENSION_LOG_INFO; break;
6733  case 2: ret = EXTENSION_LOG_DEBUG; break;
6734  default:
6735  ret = EXTENSION_LOG_DETAIL;
6736  }
6737  return ret;
6738 }
6739 
6740 static void set_log_level(EXTENSION_LOG_LEVEL severity)
6741 {
6742  switch (severity) {
6743  case EXTENSION_LOG_WARNING: settings.verbose = 0; break;
6744  case EXTENSION_LOG_INFO: settings.verbose = 1; break;
6745  case EXTENSION_LOG_DEBUG: settings.verbose = 2; break;
6746  default:
6747  settings.verbose = 3;
6748  }
6749 }
6750 
/*
 * Stat callback used by get_config(): append "key=value;" to the
 * NUL-terminated string in `cookie`, which is assumed to be a 1024-byte
 * buffer.  Empty keys or values are skipped.  An entry that would not fit
 * (including its ';' and terminator) is silently dropped.
 */
static void get_config_append_stats(const char *key, const uint16_t klen,
                                    const char *val, const uint32_t vlen,
                                    const void *cookie)
{
    if (klen == 0 || vlen == 0) {
        return;
    }

    char *buf = (char*)cookie;
    size_t used = strlen(buf);

    /* key + '=' + value + ';' + NUL */
    if (used + klen + vlen + 3 > 1024) {
        return;
    }

    char *out = buf + used;
    memcpy(out, key, klen);
    out += klen;
    *out++ = '=';
    memcpy(out, val, vlen);
    out += vlen;
    out[0] = ';';
    out[1] = '\0';
}
6775 
6776 static bool get_config(struct config_item items[]) {
6777  char config[1024];
6778  config[0] = '\0';
6779  process_stat_settings(get_config_append_stats, config);
6780  int rval = parse_config(config, items, NULL);
6781  return rval >= 0;
6782 }
6783 
6789 static SERVER_HANDLE_V1 *get_server_api(void)
6790 {
6791  static SERVER_CORE_API core_api = {
6792  .server_version = get_server_version,
6793  .hash = hash,
6794  .realtime = realtime,
6795  .abstime = abstime,
6796  .get_current_time = get_current_time,
6797  .parse_config = parse_config,
6798  .shutdown = shutdown_server,
6799  .get_config = get_config
6800  };
6801 
6802  static SERVER_COOKIE_API server_cookie_api = {
6803  .get_auth_data = get_auth_data,
6804  .store_engine_specific = store_engine_specific,
6805  .get_engine_specific = get_engine_specific,
6806  .get_socket_fd = get_socket_fd,
6807  .set_tap_nack_mode = set_tap_nack_mode,
6808  .notify_io_complete = notify_io_complete,
6809  .reserve = reserve_cookie,
6810  .release = release_cookie
6811  };
6812 
6813  static SERVER_STAT_API server_stat_api = {
6814  .new_stats = new_independent_stats,
6815  .release_stats = release_independent_stats,
6816  .evicting = count_eviction
6817  };
6818 
6819  static SERVER_LOG_API server_log_api = {
6820  .get_logger = get_logger,
6821  .get_level = get_log_level,
6822  .set_level = set_log_level
6823  };
6824  static SERVER_EXTENSION_API extension_api = {
6825  .register_extension = register_extension,
6826  .unregister_extension = unregister_extension,
6827  .get_extension = get_extension
6828  };
6829 
6830  static SERVER_CALLBACK_API callback_api = {
6831  .register_callback = register_callback,
6832  .perform_callbacks = perform_callbacks,
6833  };
6834 
6835  static SERVER_HANDLE_V1 rv = {
6836  .interface = 1,
6837  .core = &core_api,
6838  .stat = &server_stat_api,
6839  .extension = &extension_api,
6840  .callback = &callback_api,
6841  .log = &server_log_api,
6842  .cookie = &server_cookie_api
6843  };
6844 
6845  if (rv.engine == NULL) {
6846  rv.engine = settings.engine.v0;
6847  }
6848 
6849  return &rv;
6850 }
6851 
6859 static bool load_extension(const char *soname, const char *config) {
6860  if (soname == NULL) {
6861  return false;
6862  }
6863 
6864  /* Hack to remove the warning from C99 */
6865  union my_hack {
6867  void* voidptr;
6868  } funky = {.initialize = NULL };
6869 
6870  void *handle = dlopen(soname, RTLD_NOW | RTLD_LOCAL);
6871  if (handle == NULL) {
6872  const char *msg = dlerror();
6873  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6874  "Failed to open library \"%s\": %s\n",
6875  soname, msg ? msg : "unknown error");
6876  return false;
6877  }
6878 
6879  void *symbol = dlsym(handle, "memcached_extensions_initialize");
6880  if (symbol == NULL) {
6881  const char *msg = dlerror();
6882  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6883  "Could not find symbol \"memcached_extensions_initialize\" in %s: %s\n",
6884  soname, msg ? msg : "unknown error");
6885  return false;
6886  }
6887  funky.voidptr = symbol;
6888 
6889  EXTENSION_ERROR_CODE error = (*funky.initialize)(config, get_server_api);
6890 
6891  if (error != EXTENSION_SUCCESS) {
6892  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6893  "Failed to initalize extensions from %s. Error code: %d\n",
6894  soname, error);
6895  dlclose(handle);
6896  return false;
6897  }
6898 
6899  if (settings.verbose > 0) {
6900  settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
6901  "Loaded extensions from: %s\n", soname);
6902  }
6903 
6904  return true;
6905 }
6906 
6911 static bool sanitycheck(void) {
6912  /* One of our biggest problems is old and bogus libevents */
6913  const char *ever = event_get_version();
6914  if (ever != NULL) {
6915  if (strncmp(ever, "1.", 2) == 0) {
6916  /* Require at least 1.3 (that's still a couple of years old) */
6917  if ((ever[2] == '1' || ever[2] == '2') && !isdigit(ever[3])) {
6918  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6919  "You are using libevent %s.\nPlease upgrade to"
6920  " a more recent version (1.3 or newer)\n",
6921  event_get_version());
6922  return false;
6923  }
6924  }
6925  }
6926 
6927  return true;
6928 }
6929 
6930 #ifdef INNODB_MEMCACHED
/*
 * Return a malloc'ed, NUL-terminated copy of the first `len` bytes of
 * `str`.  The caller frees it.  Returns NULL if the allocation fails,
 * instead of writing through a NULL pointer.
 */
static
char*
my_strdupl(const char* str, int len)
{
    char* s = malloc(len + 1);
    if (s == NULL) {
        return(NULL);
    }
    memcpy(s, str, len);
    s[len] = 0;
    return(s);
}
6939 
/*
 * Split the space-separated option string `option` into a vector that
 * getopt() can consume.
 *
 * On success *option_argv is a malloc'ed array of *option_argc entries
 * followed by a NULL terminator:
 *   - Element 0 is a fixed program name, because getopt() may print
 *     argv[0] in its diagnostics.
 *   - Elements 1..argc-1 are malloc'ed copies of the tokens.
 *
 * `option` itself is not modified.  The caller frees only the array; the
 * token strings stay allocated because option values (optarg) may be kept
 * pointing into them.  On allocation failure, or for a NULL `option`,
 * *option_argc is 0 and *option_argv is NULL.
 */
static
void
daemon_memcached_make_option(char* option, int* option_argc,
                             char*** option_argv)
{
    static const char* sep = " ";
    static char prog_name[] = "memcached";
    char* last;
    char* opt_str;
    char* my_str;
    char** args;
    int num_arg = 0;
    int i = 1;

    *option_argc = 0;
    *option_argv = NULL;

    if (option == NULL) {
        return;
    }

    /* First pass: count the tokens.  strtok_r() writes NULs into the
       string it scans, so both passes work on private copies. */
    my_str = my_strdupl(option, strlen(option));
    if (my_str == NULL) {
        return;
    }
    for (opt_str = strtok_r(my_str, sep, &last);
         opt_str;
         opt_str = strtok_r(NULL, sep, &last)) {
        num_arg++;
    }
    free(my_str);

    /* Slot 0 for the program name, num_arg tokens, one NULL terminator. */
    args = malloc((num_arg + 2) * sizeof(*args));
    my_str = my_strdupl(option, strlen(option));
    if (args == NULL || my_str == NULL) {
        free(args);
        free(my_str);
        return;
    }

    args[0] = prog_name;

    /* Second pass: give every token its own allocation. */
    for (opt_str = strtok_r(my_str, sep, &last);
         opt_str;
         opt_str = strtok_r(NULL, sep, &last)) {
        args[i] = my_strdupl(opt_str, strlen(opt_str));
        if (args[i] == NULL) {
            while (--i > 0) {
                free(args[i]);
            }
            free(args);
            free(my_str);
            return;
        }
        i++;
    }
    free(my_str);

    assert(i == num_arg + 1);
    args[i] = NULL;

    *option_argc = num_arg + 1;
    *option_argv = args;
}
6982 
/* Configuration block passed to the InnoDB memcached engine.  Its address
is handed over as the engine config string (engine_config).  It carries:
  - the option string built from the command-line flags,
  - the InnoDB API callback table pointer,
  - the read/write batch sizes and the binlog flag.
NOTE(review): the engine reads this through its own definition of the
struct, so field order and types must stay in sync with it. */
typedef struct eng_config_info {
    char*           option_string;
    void*           cb_ptr;
    unsigned int    eng_r_batch_size;
    unsigned int    eng_w_batch_size;
    bool            enable_binlog;
} eng_config_info_t;
6992 #endif /* INNODB_MEMCACHED */
6993 
6994 #ifdef INNODB_MEMCACHED
6995 void* daemon_memcached_main(void *p) {
6996 #else
6997 int main (int argc, char **argv) {
6998 #endif
6999  int c;
7000  bool lock_memory = false;
7001  bool do_daemonize = false;
7002  bool preallocate = false;
7003  int maxcore = 0;
7004  char *username = NULL;
7005  char *pid_file = NULL;
7006  struct passwd *pw;
7007  struct rlimit rlim;
7008  char unit = '\0';
7009  int size_max = 0;
7010 
7011  bool protocol_specified = false;
7012  bool tcp_specified = false;
7013  bool udp_specified = false;
7014  memcached_context_t* m_config = (memcached_context_t*)p;
7015  const char *engine;
7016  const char *engine_config = NULL;
7017  char old_options[1024] = { [0] = '\0' };
7018  char *old_opts = old_options;
7019 #ifdef INNODB_MEMCACHED
7020  int option_argc = 0;
7021  char** option_argv = NULL;
7022  eng_config_info_t my_eng_config;
7023 
7024  if (m_config->m_engine_library) {
7025  engine = m_config->m_engine_library;
7026 
7027  /* FIXME: We should have a better way to pass the callback structure
7028  point to storage engine. It is now appended in the configure
7029  string in eng_config_info_t structure */
7030  my_eng_config.cb_ptr = m_config->m_innodb_api_cb;
7031  my_eng_config.eng_r_batch_size = m_config->m_r_batch_size;
7032  my_eng_config.eng_w_batch_size = m_config->m_w_batch_size;
7033  my_eng_config.enable_binlog = m_config->m_enable_binlog;
7034  my_eng_config.option_string = old_opts;
7035  engine_config = (const char *) (&my_eng_config);
7036 
7037  } else {
7038  engine = "default_engine.so";
7039  }
7040 #else
7041  engine = "default_engine.so";
7042 #endif /* INNODB_MEMCACHED */
7043 
7044  memcached_shutdown = 0;
7045 
7046  if (!sanitycheck()) {
7047  return(NULL);
7048  }
7049 
7050  /* make the time we started always be 2 seconds before we really
7051  did, so time(0) - time.started is never zero. if so, things
7052  like 'settings.oldest_live' which act as booleans as well as
7053  values are now false in boolean context... */
7054  process_started = time(0) - 2;
7055  set_current_time();
7056 
7057  /* Initialize the socket subsystem */
7058  initialize_sockets();
7059 
7060  /* init settings */
7061  settings_init();
7062 
7063  if (memcached_initialize_stderr_logger(get_server_api) != EXTENSION_SUCCESS) {
7064  fprintf(stderr, "Failed to initialize log system\n");
7065  return (NULL);
7066  }
7067 
7068  if (m_config->m_mem_option) {
7069  daemon_memcached_make_option(m_config->m_mem_option,
7070  &option_argc,
7071  &option_argv);
7072  }
7073 
7074 #ifdef INNODB_MEMCACHED
7075 
7076  if (option_argc > 0 && option_argv) {
7077  /* Always reset the index to 1, since this function can
7078  be invoked multiple times with install/uninstall plugins */
7079  optind = 1;
7080  while (-1 != (c = getopt(option_argc, option_argv,
7081  "a:" /* access mask for unix socket */
7082  "p:" /* TCP port number to listen on */
7083  "s:" /* unix socket path to listen on */
7084  "U:" /* UDP port number to listen on */
7085  "m:" /* max memory to use for items in megabytes */
7086  "M" /* return error on memory exhausted */
7087  "c:" /* max simultaneous connections */
7088  "k" /* lock down all paged memory */
7089  "hi" /* help, licence info */
7090  "r" /* maximize core file limit */
7091  "v" /* verbose */
7092  "d" /* daemon mode */
7093  "l:" /* interface to listen on */
7094  "u:" /* user identity to run as */
7095  "P:" /* save PID in file */
7096  "f:" /* factor? */
7097  "n:" /* minimum space allocated for key+value+flags */
7098  "t:" /* threads */
7099  "D:" /* prefix delimiter? */
7100  "L" /* Large memory pages */
7101  "R:" /* max requests per event */
7102  "C" /* Disable use of CAS */
7103  "b:" /* backlog queue limit */
7104  "B:" /* Binding protocol */
7105  "I:" /* Max item size */
7106  "S" /* Sasl ON */
7107  "E:" /* Engine to load */
7108  "e:" /* Engine options */
7109  "q" /* Disallow detailed stats */
7110  "X:" /* Load extension */
7111  ))) {
7112  switch (c) {
7113  case 'a':
7114  /* access for unix domain socket, as octal mask (like chmod)*/
7115  settings.access= strtol(optarg,NULL,8);
7116  break;
7117 
7118  case 'U':
7119  settings.udpport = atoi(optarg);
7120  udp_specified = true;
7121  break;
7122  case 'p':
7123  settings.port = atoi(optarg);
7124  tcp_specified = true;
7125  break;
7126  case 's':
7127  settings.socketpath = optarg;
7128  break;
7129  case 'm':
7130  settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
7131  old_opts += sprintf(old_opts, "cache_size=%lu;",
7132  (unsigned long)settings.maxbytes);
7133  break;
7134  case 'M':
7135  settings.evict_to_free = 0;
7136  old_opts += sprintf(old_opts, "eviction=false;");
7137  break;
7138  case 'c':
7139  settings.maxconns = atoi(optarg);
7140  break;
7141  case 'h':
7142  usage();
7143  exit(EXIT_SUCCESS);
7144  case 'i':
7145  usage_license();
7146  exit(EXIT_SUCCESS);
7147  case 'k':
7148  lock_memory = true;
7149  break;
7150  case 'v':
7151  settings.verbose++;
7152  perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
7153  break;
7154  case 'l':
7155  settings.inter= strdup(optarg);
7156  break;
7157  case 'd':
7158  do_daemonize = true;
7159  break;
7160  case 'r':
7161  maxcore = 1;
7162  break;
7163  case 'R':
7164  settings.reqs_per_event = atoi(optarg);
7165  if (settings.reqs_per_event <= 0) {
7166  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7167  "Number of requests per event must be greater than 0\n");
7168  return (void*)1;
7169  }
7170  break;
7171  case 'u':
7172  username = optarg;
7173  break;
7174  case 'P':
7175  pid_file = optarg;
7176  break;
7177  case 'f':
7178  settings.factor = atof(optarg);
7179  if (settings.factor <= 1.0) {
7180  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7181  "Factor must be greater than 1\n");
7182  return (void*)1;
7183  }
7184  old_opts += sprintf(old_opts, "factor=%f;",
7185  settings.factor);
7186  break;
7187  case 'n':
7188  settings.chunk_size = atoi(optarg);
7189  if (settings.chunk_size == 0) {
7190  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7191  "Chunk size must be greater than 0\n");
7192  return (void*)1;
7193  }
7194  old_opts += sprintf(old_opts, "chunk_size=%u;",
7195  settings.chunk_size);
7196  break;
7197  case 't':
7198  settings.num_threads = atoi(optarg);
7199  if (settings.num_threads <= 0) {
7200  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7201  "Number of threads must be greater than 0\n");
7202  return (void*)1;
7203  }
7204  /* There're other problems when you get above 64 threads.
7205  * In the future we should portably detect # of cores for the
7206  * default.
7207  */
7208  if (settings.num_threads > 64) {
7209  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7210  "WARNING: Setting a high number of worker"
7211  "threads is not recommended.\n"
7212  " Set this value to the number of cores in"
7213  " your machine or less.\n");
7214  }
7215  break;
7216  case 'D':
7217  settings.prefix_delimiter = optarg[0];
7218  settings.detail_enabled = 1;
7219  break;
7220  case 'L' :
7221  if (enable_large_pages() == 0) {
7222  preallocate = true;
7223  old_opts += sprintf(old_opts, "preallocate=true;");
7224  }
7225  break;
7226  case 'C' :
7227  settings.use_cas = false;
7228  break;
7229  case 'b' :
7230  settings.backlog = atoi(optarg);
7231  break;
7232  case 'B':
7233  protocol_specified = true;
7234  if (strcmp(optarg, "auto") == 0) {
7235  settings.binding_protocol = negotiating_prot;
7236  } else if (strcmp(optarg, "binary") == 0) {
7237  settings.binding_protocol = binary_prot;
7238  } else if (strcmp(optarg, "ascii") == 0) {
7239  settings.binding_protocol = ascii_prot;
7240  } else {
7241  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7242  "Invalid value for binding protocol: %s\n"
7243  " -- should be one of auto, binary, or ascii\n", optarg);
7244  exit(EX_USAGE);
7245  }
7246  break;
7247  case 'I':
7248  unit = optarg[strlen(optarg)-1];
7249  if (unit == 'k' || unit == 'm' ||
7250  unit == 'K' || unit == 'M') {
7251  optarg[strlen(optarg)-1] = '\0';
7252  size_max = atoi(optarg);
7253  if (unit == 'k' || unit == 'K')
7254  size_max *= 1024;
7255  if (unit == 'm' || unit == 'M')
7256  size_max *= 1024 * 1024;
7257  settings.item_size_max = size_max;
7258  } else {
7259  settings.item_size_max = atoi(optarg);
7260  }
7261  if (settings.item_size_max < 1024) {
7262  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7263  "Item max size cannot be less than 1024 bytes.\n");
7264  return (void*)1;
7265  }
7266  if (settings.item_size_max > 1024 * 1024 * 128) {
7267  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7268  "Cannot set item size limit higher than 128 mb.\n");
7269  return (void*)1;
7270  }
7271  if (settings.item_size_max > 1024 * 1024) {
7272  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7273  "WARNING: Setting item max size above 1MB is not"
7274  " recommended!\n"
7275  " Raising this limit increases the minimum memory requirements\n"
7276  " and will decrease your memory efficiency.\n"
7277  );
7278  }
7279 #ifndef __WIN32__
7280  old_opts += sprintf(old_opts, "item_size_max=%zu;",
7281  settings.item_size_max);
7282 #else
7283  old_opts += sprintf(old_opts, "item_size_max=%lu;", (long unsigned)
7284  settings.item_size_max);
7285 #endif
7286  break;
7287  case 'E':
7288  engine = optarg;
7289  break;
7290  case 'e':
7291  /* FIXME, we use engine_config to pass callback function
7292  for now. Will need a better solution
7293  engine_config = optarg; */
7294  break;
7295  case 'q':
7296  settings.allow_detailed = false;
7297  break;
7298  case 'S': /* set Sasl authentication to true. Default is false */
7299 # ifdef ENABLE_MEMCACHED_SASL
7300 # ifndef SASL_ENABLED
7301  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7302  "This server is not built with SASL support.\n");
7303  exit(EX_USAGE);
7304 # endif /* !SASL_ENABLED */
7305  settings.require_sasl = true;
7306 # endif /* ENABLE_MEMCACHED_SASL */
7307  break;
7308  case 'X' :
7309  {
7310  char *ptr = strchr(optarg, ',');
7311  if (ptr != NULL) {
7312  *ptr = '\0';
7313  ++ptr;
7314  }
7315  if (!load_extension(optarg, ptr)) {
7316  exit(EXIT_FAILURE);
7317  }
7318  if (ptr != NULL) {
7319  *(ptr - 1) = ',';
7320  }
7321  }
7322  break;
7323  default:
7324  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7325  "Illegal argument \"%c\"\n", c);
7326  return (void*)1;
7327  }
7328  }
7329 
7330  free(option_argv);
7331  }
7332 #else
7333  /* process arguments */
7334  while (-1 != (c = getopt(argc, argv,
7335  "a:" /* access mask for unix socket */
7336  "p:" /* TCP port number to listen on */
7337  "s:" /* unix socket path to listen on */
7338  "U:" /* UDP port number to listen on */
7339  "m:" /* max memory to use for items in megabytes */
7340  "M" /* return error on memory exhausted */
7341  "c:" /* max simultaneous connections */
7342  "k" /* lock down all paged memory */
7343  "hi" /* help, licence info */
7344  "r" /* maximize core file limit */
7345  "v" /* verbose */
7346  "d" /* daemon mode */
7347  "l:" /* interface to listen on */
7348  "u:" /* user identity to run as */
7349  "P:" /* save PID in file */
7350  "f:" /* factor? */
7351  "n:" /* minimum space allocated for key+value+flags */
7352  "t:" /* threads */
7353  "D:" /* prefix delimiter? */
7354  "L" /* Large memory pages */
7355  "R:" /* max requests per event */
7356  "C" /* Disable use of CAS */
7357  "b:" /* backlog queue limit */
7358  "B:" /* Binding protocol */
7359  "I:" /* Max item size */
7360  "S" /* Sasl ON */
7361  "E:" /* Engine to load */
7362  "e:" /* Engine options */
7363  "q" /* Disallow detailed stats */
7364  "X:" /* Load extension */
7365  ))) {
7366  switch (c) {
7367  case 'a':
7368  /* access for unix domain socket, as octal mask (like chmod)*/
7369  settings.access= strtol(optarg,NULL,8);
7370  break;
7371 
7372  case 'U':
7373  settings.udpport = atoi(optarg);
7374  udp_specified = true;
7375  break;
7376  case 'p':
7377  settings.port = atoi(optarg);
7378  tcp_specified = true;
7379  break;
7380  case 's':
7381  settings.socketpath = optarg;
7382  break;
7383  case 'm':
7384  settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
7385  old_opts += sprintf(old_opts, "cache_size=%lu;",
7386  (unsigned long)settings.maxbytes);
7387  break;
7388  case 'M':
7389  settings.evict_to_free = 0;
7390  old_opts += sprintf(old_opts, "eviction=false;");
7391  break;
7392  case 'c':
7393  settings.maxconns = atoi(optarg);
7394  break;
7395  case 'h':
7396  usage();
7397  exit(EXIT_SUCCESS);
7398  case 'i':
7399  usage_license();
7400  exit(EXIT_SUCCESS);
7401  case 'k':
7402  lock_memory = true;
7403  break;
7404  case 'v':
7405  settings.verbose++;
7406  perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
7407  break;
7408  case 'l':
7409  if (settings.inter != NULL) {
7410  size_t len = strlen(settings.inter) + strlen(optarg) + 2;
7411  char *p = malloc(len);
7412  if (p == NULL) {
7413  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7414  "Failed to allocate memory\n");
7415  return 1;
7416  }
7417  snprintf(p, len, "%s,%s", settings.inter, optarg);
7418  free(settings.inter);
7419  settings.inter = p;
7420  } else {
7421  settings.inter= strdup(optarg);
7422  }
7423  break;
7424  case 'd':
7425  do_daemonize = true;
7426  break;
7427  case 'r':
7428  maxcore = 1;
7429  break;
7430  case 'R':
7431  settings.reqs_per_event = atoi(optarg);
7432  if (settings.reqs_per_event <= 0) {
7433  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7434  "Number of requests per event must be greater than 0\n");
7435  return 1;
7436  }
7437  break;
7438  case 'u':
7439  username = optarg;
7440  break;
7441  case 'P':
7442  pid_file = optarg;
7443  break;
7444  case 'f':
7445  settings.factor = atof(optarg);
7446  if (settings.factor <= 1.0) {
7447  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7448  "Factor must be greater than 1\n");
7449  return 1;
7450  }
7451  old_opts += sprintf(old_opts, "factor=%f;",
7452  settings.factor);
7453  break;
7454  case 'n':
7455  settings.chunk_size = atoi(optarg);
7456  if (settings.chunk_size == 0) {
7457  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7458  "Chunk size must be greater than 0\n");
7459  return 1;
7460  }
7461  old_opts += sprintf(old_opts, "chunk_size=%u;",
7462  settings.chunk_size);
7463  break;
7464  case 't':
7465  settings.num_threads = atoi(optarg);
7466  if (settings.num_threads <= 0) {
7467  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7468  "Number of threads must be greater than 0\n");
7469  return 1;
7470  }
7471  /* There're other problems when you get above 64 threads.
7472  * In the future we should portably detect # of cores for the
7473  * default.
7474  */
7475  if (settings.num_threads > 64) {
7476  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7477  "WARNING: Setting a high number of worker"
7478  "threads is not recommended.\n"
7479  " Set this value to the number of cores in"
7480  " your machine or less.\n");
7481  }
7482  break;
7483  case 'D':
7484  settings.prefix_delimiter = optarg[0];
7485  settings.detail_enabled = 1;
7486  break;
7487  case 'L' :
7488  if (enable_large_pages() == 0) {
7489  preallocate = true;
7490  old_opts += sprintf(old_opts, "preallocate=true;");
7491  }
7492  break;
7493  case 'C' :
7494  settings.use_cas = false;
7495  break;
7496  case 'b' :
7497  settings.backlog = atoi(optarg);
7498  break;
7499  case 'B':
7500  protocol_specified = true;
7501  if (strcmp(optarg, "auto") == 0) {
7502  settings.binding_protocol = negotiating_prot;
7503  } else if (strcmp(optarg, "binary") == 0) {
7504  settings.binding_protocol = binary_prot;
7505  } else if (strcmp(optarg, "ascii") == 0) {
7506  settings.binding_protocol = ascii_prot;
7507  } else {
7508  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7509  "Invalid value for binding protocol: %s\n"
7510  " -- should be one of auto, binary, or ascii\n", optarg);
7511  exit(EX_USAGE);
7512  }
7513  break;
7514  case 'I':
7515  unit = optarg[strlen(optarg)-1];
7516  if (unit == 'k' || unit == 'm' ||
7517  unit == 'K' || unit == 'M') {
7518  optarg[strlen(optarg)-1] = '\0';
7519  size_max = atoi(optarg);
7520  if (unit == 'k' || unit == 'K')
7521  size_max *= 1024;
7522  if (unit == 'm' || unit == 'M')
7523  size_max *= 1024 * 1024;
7524  settings.item_size_max = size_max;
7525  } else {
7526  settings.item_size_max = atoi(optarg);
7527  }
7528  if (settings.item_size_max < 1024) {
7529  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7530  "Item max size cannot be less than 1024 bytes.\n");
7531  return 1;
7532  }
7533  if (settings.item_size_max > 1024 * 1024 * 128) {
7534  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7535  "Cannot set item size limit higher than 128 mb.\n");
7536  return 1;
7537  }
7538  if (settings.item_size_max > 1024 * 1024) {
7539  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7540  "WARNING: Setting item max size above 1MB is not"
7541  " recommended!\n"
7542  " Raising this limit increases the minimum memory requirements\n"
7543  " and will decrease your memory efficiency.\n"
7544  );
7545  }
7546 #ifndef __WIN32__
7547  old_opts += sprintf(old_opts, "item_size_max=%zu;",
7548  settings.item_size_max);
7549 #else
7550  old_opts += sprintf(old_opts, "item_size_max=%lu;", (long unsigned)
7551  settings.item_size_max);
7552 #endif
7553  break;
7554  case 'E':
7555  engine = optarg;
7556  break;
7557  case 'e':
7558  engine_config = optarg;
7559  break;
7560  case 'q':
7561  settings.allow_detailed = false;
7562  break;
7563  case 'S': /* set Sasl authentication to true. Default is false */
7564 #ifndef SASL_ENABLED
7565  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7566  "This server is not built with SASL support.\n");
7567  exit(EX_USAGE);
7568 #endif
7569  settings.require_sasl = true;
7570  break;
7571  case 'X' :
7572  {
7573  char *ptr = strchr(optarg, ',');
7574  if (ptr != NULL) {
7575  *ptr = '\0';
7576  ++ptr;
7577  }
7578  if (!load_extension(optarg, ptr)) {
7579  exit(EXIT_FAILURE);
7580  }
7581  if (ptr != NULL) {
7582  *(ptr - 1) = ',';
7583  }
7584  }
7585  break;
7586  default:
7587  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7588  "Illegal argument \"%c\"\n", c);
7589  return 1;
7590  }
7591  }
7592 #endif /* INNODB_MEMCACHED */
7593 
7594  if (getenv("MEMCACHED_REQS_TAP_EVENT") != NULL) {
7595  settings.reqs_per_tap_event = atoi(getenv("MEMCACHED_REQS_TAP_EVENT"));
7596  }
7597 
7598  if (settings.reqs_per_tap_event <= 0) {
7599  settings.reqs_per_tap_event = DEFAULT_REQS_PER_TAP_EVENT;
7600  }
7601 
7602 
7603  if (install_sigterm_handler() != 0) {
7604  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7605  "Failed to install SIGTERM handler\n");
7606  exit(EXIT_FAILURE);
7607  }
7608 
7609  char *topkeys_env = getenv("MEMCACHED_TOP_KEYS");
7610  if (topkeys_env != NULL) {
7611  settings.topkeys = atoi(topkeys_env);
7612  if (settings.topkeys < 0) {
7613  settings.topkeys = 0;
7614  }
7615  }
7616 
7617  if (settings.require_sasl) {
7618  if (!protocol_specified) {
7619  settings.binding_protocol = binary_prot;
7620  } else {
7621  if (settings.binding_protocol == negotiating_prot) {
7622  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7623  "ERROR: You cannot use auto-negotiating protocol while requiring SASL.\n");
7624  exit(EX_USAGE);
7625  }
7626  if (settings.binding_protocol == ascii_prot) {
7627  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7628  "ERROR: You cannot use only ASCII protocol while requiring SASL.\n");
7629  exit(EX_USAGE);
7630  }
7631  }
7632  }
7633 
7634  if (tcp_specified && !udp_specified) {
7635  settings.udpport = settings.port;
7636  } else if (udp_specified && !tcp_specified) {
7637  settings.port = settings.udpport;
7638  }
7639 
7640  /*
7641  if (engine_config != NULL && strlen(old_options) > 0) {
7642  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7643  "ERROR: You can't mix -e with the old options\n");
7644  return (NULL);
7645  } else if (engine_config == NULL && strlen(old_options) > 0) {
7646  engine_config = old_options;
7647  } */
7648 
7649  if (maxcore != 0) {
7650  struct rlimit rlim_new;
7651  /*
7652  * First try raising to infinity; if that fails, try bringing
7653  * the soft limit to the hard.
7654  */
7655  if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
7656  rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
7657  if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
7658  /* failed. try raising just to the old max */
7659  rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
7660  (void)setrlimit(RLIMIT_CORE, &rlim_new);
7661  }
7662  }
7663  /*
7664  * getrlimit again to see what we ended up with. Only fail if
7665  * the soft limit ends up 0, because then no core files will be
7666  * created at all.
7667  */
7668 
7669  if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
7670  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7671  "failed to ensure corefile creation\n");
7672  exit(EX_OSERR);
7673  }
7674  }
7675 
7676  /*
7677  * If needed, increase rlimits to allow as many connections
7678  * as needed.
7679  */
7680 
7681  if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7682  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7683  "failed to getrlimit number of files\n");
7684  exit(EX_OSERR);
7685  } else {
7686  int maxfiles = settings.maxconns;
7687  if (rlim.rlim_cur < maxfiles)
7688  rlim.rlim_cur = maxfiles;
7689  if (rlim.rlim_max < rlim.rlim_cur)
7690  rlim.rlim_max = rlim.rlim_cur;
7691  if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7692  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7693  "failed to set rlimit for open files. Try running as"
7694  " root or requesting smaller maxconns value.\n");
7695  exit(EX_OSERR);
7696  }
7697  }
7698 
7699  /* Sanity check for the connection structures */
7700  int nfiles = 0;
7701  if (settings.port != 0) {
7702  nfiles += 2;
7703  }
7704  if (settings.udpport != 0) {
7705  nfiles += settings.num_threads * 2;
7706  }
7707 
7708  if (settings.maxconns <= nfiles) {
7709  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7710  "Configuratioin error. \n"
7711  "You specified %d connections, but the system will use at "
7712  "least %d\nconnection structures to start.\n",
7713  settings.maxconns, nfiles);
7714  exit(EX_USAGE);
7715  }
7716 
7717  /* lose root privileges if we have them */
7718  if (getuid() == 0 || geteuid() == 0) {
7719  if (username == 0 || *username == '\0') {
7720  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7721  "can't run as root without the -u switch\n");
7722  exit(EX_USAGE);
7723  }
7724  if ((pw = getpwnam(username)) == 0) {
7725  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7726  "can't find the user %s to switch to\n", username);
7727  exit(EX_NOUSER);
7728  }
7729  if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
7730  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7731  "failed to assume identity of user %s: %s\n", username,
7732  strerror(errno));
7733  exit(EX_OSERR);
7734  }
7735  }
7736 
7737 #ifdef SASL_ENABLED
7738  init_sasl();
7739 #endif /* SASL */
7740 
7741  /* daemonize if requested */
7742  /* if we want to ensure our ability to dump core, don't chdir to / */
7743  if (do_daemonize) {
7744  if (sigignore(SIGHUP) == -1) {
7745  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7746  "Failed to ignore SIGHUP: ", strerror(errno));
7747  }
7748  if (daemonize(maxcore, settings.verbose) == -1) {
7749  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7750  "failed to daemon() in order to daemonize\n");
7751  exit(EXIT_FAILURE);
7752  }
7753  }
7754 
7755  /* lock paged memory if needed */
7756  if (lock_memory) {
7757 #ifdef HAVE_MLOCKALL
7758  int res = mlockall(MCL_CURRENT | MCL_FUTURE);
7759  if (res != 0) {
7760  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7761  "warning: -k invalid, mlockall() failed: %s\n",
7762  strerror(errno));
7763  }
7764 #else
7765  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7766  "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
7767 #endif
7768  }
7769 
7770  /* initialize main thread libevent instance */
7771  main_base = event_init();
7772 
7773  /* Load the storage engine */
7774  ENGINE_HANDLE *engine_handle = NULL;
7775  if (!load_engine(engine,get_server_api,settings.extensions.logger,&engine_handle)) {
7776  /* Error already reported */
7777  exit(EXIT_FAILURE);
7778  }
7779 
7780  if(!init_engine(engine_handle,engine_config,settings.extensions.logger)) {
7781 #ifdef INNODB_MEMCACHED
7782  shutdown_server();
7783  goto func_exit;
7784 #else
7785  return(false);
7786 #endif /* INNODB_MEMCACHED */
7787  }
7788 
7789  if(settings.verbose > 0) {
7790  log_engine_details(engine_handle,settings.extensions.logger);
7791  }
7792  settings.engine.v1 = (ENGINE_HANDLE_V1 *) engine_handle;
7793 
7794  if (settings.engine.v1->arithmetic == NULL) {
7795  settings.engine.v1->arithmetic = internal_arithmetic;
7796  }
7797 
7798  /* initialize other stuff */
7799  stats_init();
7800 
7801  if (!(conn_cache = cache_create("conn", sizeof(conn), sizeof(void*),
7802  conn_constructor, conn_destructor))) {
7803  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7804  "Failed to create connection cache\n");
7805  exit(EXIT_FAILURE);
7806  }
7807 
7808  default_independent_stats = new_independent_stats();
7809 
7810 #ifndef __WIN32__
7811  /*
7812  * ignore SIGPIPE signals; we can use errno == EPIPE if we
7813  * need that information
7814  */
7815  if (sigignore(SIGPIPE) == -1) {
7816  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7817  "failed to ignore SIGPIPE; sigaction");
7818  exit(EX_OSERR);
7819  }
7820 #endif
7821 
7822  /* start up worker threads if MT mode */
7823  thread_init(settings.num_threads, main_base, dispatch_event_handler);
7824 
7825  /* initialise clock event */
7826  clock_handler(0, 0, 0);
7827 
7828  /* create unix mode sockets after dropping privileges */
7829  if (settings.socketpath != NULL) {
7830  if (server_socket_unix(settings.socketpath,settings.access)) {
7831  vperror("failed to listen on UNIX socket: %s", settings.socketpath);
7832  exit(EX_OSERR);
7833  }
7834  }
7835 
7836  /* create the listening socket, bind it, and init */
7837  if (settings.socketpath == NULL) {
7838  int udp_port;
7839 
7840  const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
7841  char temp_portnumber_filename[PATH_MAX];
7842  FILE *portnumber_file = NULL;
7843 
7844  if (portnumber_filename != NULL) {
7845  snprintf(temp_portnumber_filename,
7846  sizeof(temp_portnumber_filename),
7847  "%s.lck", portnumber_filename);
7848 
7849  portnumber_file = fopen(temp_portnumber_filename, "a");
7850  if (portnumber_file == NULL) {
7851  settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7852  "Failed to open \"%s\": %s\n",
7853  temp_portnumber_filename, strerror(errno));
7854  }
7855  }
7856 
7857  if (settings.port && server_sockets(settings.port, tcp_transport,
7858  portnumber_file)) {
7859  vperror("failed to listen on TCP port %d", settings.port);
7860 #ifdef INNODB_MEMCACHED
7861  shutdown_server();
7862  goto func_exit;
7863 #else
7864  exit(EX_OSERR);
7865 #endif /* INNODB_MEMCACHED */
7866  }
7867 
7868  /*
7869  * initialization order: first create the listening sockets
7870  * (may need root on low ports), then drop root if needed,
7871  * then daemonise if needed, then init libevent (in some cases
7872  * descriptors created by libevent wouldn't survive forking).
7873  */
7874  udp_port = settings.udpport ? settings.udpport : settings.port;
7875 
7876  /* create the UDP listening socket and bind it */
7877  if (settings.udpport && server_sockets(settings.udpport, udp_transport,
7878  portnumber_file)) {
7879  vperror("failed to listen on UDP port %d", settings.udpport);
7880  exit(EX_OSERR);
7881  }
7882 
7883  if (portnumber_file) {
7884  fclose(portnumber_file);
7885  rename(temp_portnumber_filename, portnumber_filename);
7886  }
7887  }
7888 
7889  if (pid_file != NULL) {
7890  save_pid(pid_file);
7891  }
7892 
7893  /* Drop privileges no longer needed */
7894  drop_privileges();
7895 
7896  /* enter the event loop */
7897  event_base_loop(main_base, 0);
7898 
7899  if (settings.verbose) {
7900  settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
7901  "Initiating shutdown\n");
7902  }
7903 
7904 func_exit:
7905 
7906  if (settings.engine.v1)
7907  settings.engine.v1->destroy(settings.engine.v0, false);
7908 
7909  threads_shutdown();
7910 
7911  /* remove the PID file if we're a daemon */
7912  if (do_daemonize)
7913  remove_pidfile(pid_file);
7914  /* Clean up strdup() call for bind() address */
7915  if (settings.inter)
7916  free(settings.inter);
7917 
7918  memcached_shutdown = 2;
7919 
7920  return EXIT_SUCCESS;
7921 }