17 #include "config_static.h" 
   19 #include "memcached/extension_loggers.h" 
   20 #include "utilities/engine_loader.h" 
   38 #define INNODB_MEMCACHED 
   40 static inline void item_set_cas(
const void *cookie, item *it, uint64_t cas) {
 
   45 #define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \ 
   46     thread_stats->slab_stats[info.clsid].slab_op++; 
   48 #define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \ 
   49     thread_stats->thread_op++; 
   51 #define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \ 
   52     thread_stats->slab_op++; \ 
   53     thread_stats->thread_op++; 
   55 #define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \ 
   56     SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \ 
   57     THREAD_GUTS(conn, thread_stats, slab_op, thread_op) 
   59 #define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \ 
   60     struct independent_stats *independent_stats = get_independent_stats(conn); \ 
   61     struct thread_stats *thread_stats = \ 
   62         &independent_stats->thread_stats[conn->thread->index]; \ 
   63     topkeys_t *topkeys = independent_stats->topkeys; \ 
   64     pthread_mutex_lock(&thread_stats->mutex); \ 
   65     GUTS(conn, thread_stats, slab_op, thread_op); \ 
   66     pthread_mutex_unlock(&thread_stats->mutex); \ 
   67     TK(topkeys, slab_op, key, nkey, current_time); \ 
   70 #define STATS_INCR(conn, op, key, nkey) \ 
   71     STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey) 
   73 #define SLAB_INCR(conn, op, key, nkey) \ 
   74     STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey) 
   76 #define STATS_TWO(conn, slab_op, thread_op, key, nkey) \ 
   77     STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey) 
   79 #define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \ 
   80     STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey) 
   82 #define STATS_HIT(conn, op, key, nkey) \ 
   83     SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey) 
   85 #define STATS_MISS(conn, op, key, nkey) \ 
   86     STATS_TWO(conn, op##_misses, cmd_##op, key, nkey) 
   88 #define STATS_NOKEY(conn, op) { \ 
   89     struct thread_stats *thread_stats = \ 
   90         get_thread_stats(conn); \ 
   91     pthread_mutex_lock(&thread_stats->mutex); \ 
   93     pthread_mutex_unlock(&thread_stats->mutex); \ 
   96 #define STATS_NOKEY2(conn, op1, op2) { \ 
   97     struct thread_stats *thread_stats = \ 
   98         get_thread_stats(conn); \ 
   99     pthread_mutex_lock(&thread_stats->mutex); \ 
  100     thread_stats->op1++; \ 
  101     thread_stats->op2++; \ 
  102     pthread_mutex_unlock(&thread_stats->mutex); \ 
  105 #define STATS_ADD(conn, op, amt) { \ 
  106     struct thread_stats *thread_stats = \ 
  107         get_thread_stats(conn); \ 
  108     pthread_mutex_lock(&thread_stats->mutex); \ 
  109     thread_stats->op += amt; \ 
  110     pthread_mutex_unlock(&thread_stats->mutex); \ 
  113 volatile sig_atomic_t memcached_shutdown;
 
  123 volatile rel_time_t current_time;
 
  128 static SOCKET new_socket(
struct addrinfo *ai);
 
  129 static int try_read_command(
conn *c);
 
  133                               ENGINE_EVENT_TYPE 
type,
 
  134                               EVENT_CALLBACK cb, 
const void *cb_data);
 
  135 enum try_read_result {
 
  137     READ_NO_DATA_RECEIVED,
 
  142 static enum try_read_result try_read_network(
conn *c);
 
  143 static enum try_read_result try_read_udp(
conn *c);
 
  146 static void stats_init(
void);
 
  147 static void server_stats(
ADD_STAT add_stats, 
conn *c, 
bool aggregate);
 
  148 static void process_stat_settings(
ADD_STAT add_stats, 
void *c);
 
  152 static void settings_init(
void);
 
  155 static void event_handler(
const int fd, 
const short which, 
void *arg);
 
  156 static void complete_nread(
conn *c);
 
  157 static char *process_command(
conn *c, 
char *command);
 
  158 static void write_and_free(
conn *c, 
char *
buf, 
int bytes);
 
  159 static int ensure_iov_space(
conn *c);
 
  160 static int add_iov(
conn *c, 
const void *
buf, 
int len);
 
  161 static int add_msghdr(
conn *c);
 
  165 static void set_current_time(
void);  
 
  172 static time_t process_started;     
 
  175 static conn *listen_conn = NULL;
 
  176 static int  udp_socket[100];
 
  177 static int  num_udp_socket;
 
  183 enum transmit_result {
 
  190 static enum transmit_result transmit(
conn *c);
 
  192 #define REALTIME_MAXDELTA 60*60*24*30 
  195 static void perform_callbacks(ENGINE_EVENT_TYPE 
type,
 
  200         h->cb(c, type, data, h->cb_data);
 
  209 static rel_time_t realtime(
const time_t exptime) {
 
  212     if (exptime == 0) 
return 0; 
 
  214     if (exptime > REALTIME_MAXDELTA) {
 
  221         if (exptime <= process_started)
 
  222             return (rel_time_t)1;
 
  223         return (rel_time_t)(exptime - process_started);
 
  225         return (rel_time_t)(exptime + current_time);
 
  232 static time_t abstime(
const rel_time_t exptime)
 
  234     return process_started + exptime;
 
  237 static void stats_init(
void) {
 
  238     stats.daemon_conns = 0;
 
  239     stats.rejected_conns = 0;
 
  245 static void stats_reset(
const void *cookie) {
 
  246     struct conn *
conn = (
struct conn*)cookie;
 
  248     stats.rejected_conns = 0;
 
  249     stats.total_conns = 0;
 
  250     stats_prefix_clear();
 
  252     threadlocal_stats_reset(get_independent_stats(conn)->
thread_stats);
 
  256 static void settings_init(
void) {
 
  263     settings.maxbytes = 64 * 1024 * 1024; 
 
  276     settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
 
  278     settings.binding_protocol = negotiating_prot;
 
  279     settings.item_size_max = 1024 * 1024; 
 
  282     settings.extensions.logger = get_stderr_logger();
 
  290 static int add_msghdr(conn *c)
 
  296     if (c->msgsize == c->msgused) {
 
  297         msg = realloc(c->msglist, c->msgsize * 2 * 
sizeof(
struct msghdr));
 
  304     msg = c->msglist + c->msgused;
 
  308     memset(msg, 0, 
sizeof(
struct msghdr));
 
  310     msg->msg_iov = &c->iov[c->iovused];
 
  312     if (c->request_addr_size > 0) {
 
  313         msg->msg_name = &c->request_addr;
 
  314         msg->msg_namelen = c->request_addr_size;
 
  320     if (IS_UDP(c->transport)) {
 
  322         return add_iov(c, NULL, UDP_HEADER_SIZE);
 
  328 static const char *prot_text(
enum protocol prot) {
 
  329     char *rv = 
"unknown";
 
  337         case negotiating_prot:
 
  338             rv = 
"auto-negotiate";
 
  345     pthread_mutex_t mutex;
 
  348     uint64_t num_disable;
 
  351 static bool is_listen_disabled(
void) {
 
  353     pthread_mutex_lock(&listen_state.mutex);
 
  354     ret = listen_state.disabled;
 
  355     pthread_mutex_unlock(&listen_state.mutex);
 
  359 static uint64_t get_listen_disabled_num(
void) {
 
  361     pthread_mutex_lock(&listen_state.mutex);
 
  362     ret = listen_state.num_disable;
 
  363     pthread_mutex_unlock(&listen_state.mutex);
 
  367 static void disable_listen(
void) {
 
  368     pthread_mutex_lock(&listen_state.mutex);
 
  369     listen_state.disabled = 
true;
 
  370     listen_state.count = 10;
 
  371     ++listen_state.num_disable;
 
  372     pthread_mutex_unlock(&listen_state.mutex);
 
  375     for (next = listen_conn; next; next = next->next) {
 
  376         update_event(next, 0);
 
  377         if (listen(next->sfd, 1) != 0) {
 
  378             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
  385 void safe_close(SOCKET sfd) {
 
  386     if (sfd != INVALID_SOCKET) {
 
  388         while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
 
  389                (errno == EINTR || errno == EAGAIN)) {
 
  393         if (rval == SOCKET_ERROR) {
 
  394             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
  395                                             "Failed to close socket %d (%s)!!\n", (
int)sfd,
 
  402             if (is_listen_disabled()) {
 
  426 static bool conn_reset_buffersize(conn *c) {
 
  429     if (c->
rsize != DATA_BUFFER_SIZE) {
 
  430         void *ptr = malloc(DATA_BUFFER_SIZE);
 
  434             c->
rsize = DATA_BUFFER_SIZE;
 
  440     if (c->wsize != DATA_BUFFER_SIZE) {
 
  441         void *ptr = malloc(DATA_BUFFER_SIZE);
 
  445             c->wsize = DATA_BUFFER_SIZE;
 
  507 static int conn_constructor(
void *buffer, 
void *unused1, 
int unused2) {
 
  508     (void)unused1; (void)unused2;
 
  511     memset(c, 0, 
sizeof(*c));
 
  512     MEMCACHED_CONN_CREATE(c);
 
  514     if (!conn_reset_buffersize(c)) {
 
  521         settings.extensions.logger->
log(EXTENSION_LOG_WARNING,
 
  523                                         "Failed to allocate buffers for connection\n");
 
  528     stats.conn_structs++;
 
  540 static void conn_destructor(
void *buffer, 
void *unused) {
 
  551     stats.conn_structs--;
 
  555 conn *conn_new(
const SOCKET sfd, STATE_FUNC init_state,
 
  556                const int event_flags,
 
  557                const int read_buffer_size, 
enum network_transport transport,
 
  559     conn *c = cache_alloc(conn_cache);
 
  564     assert(c->thread == NULL);
 
  566     if (c->
rsize < read_buffer_size) {
 
  567         void *mem = malloc(read_buffer_size);
 
  569             c->
rsize = read_buffer_size;
 
  573             assert(c->thread == NULL);
 
  574             cache_free(conn_cache, c);
 
  579     c->transport = transport;
 
  580     c->protocol = 
settings.binding_protocol;
 
  586         c->request_addr_size = 
sizeof(c->request_addr);
 
  588         c->request_addr_size = 0;
 
  592         if (init_state == conn_listening) {
 
  593             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  594                                             "<%d server listening (%s)\n", sfd,
 
  595                                             prot_text(c->protocol));
 
  596         } 
else if (IS_UDP(transport)) {
 
  597             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  598                                             "<%d server listening (udp)\n", sfd);
 
  599         } 
else if (c->protocol == negotiating_prot) {
 
  600             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  601                                             "<%d new auto-negotiating client connection\n",
 
  603         } 
else if (c->protocol == ascii_prot) {
 
  604             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  605                                             "<%d new ascii client connection.\n", sfd);
 
  606         } 
else if (c->protocol == binary_prot) {
 
  607             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  608                                             "<%d new binary client connection.\n", sfd);
 
  610             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
  611                                             "<%d new unknown (%d) client connection\n",
 
  618     c->state = init_state;
 
  622     c->
rbytes = c->wbytes = 0;
 
  627     c->suffixcurr = c->suffixlist;
 
  637     c->write_and_free = 0;
 
  642     event_set(&c->event, sfd, event_flags, event_handler, (
void *)c);
 
  643     event_base_set(base, &c->event);
 
  644     c->ev_flags = event_flags;
 
  646     if (!register_event(c, timeout)) {
 
  647         assert(c->thread == NULL);
 
  648         cache_free(conn_cache, c);
 
  656     c->aiostat = ENGINE_SUCCESS;
 
  657     c->ewouldblock = 
false;
 
  660     MEMCACHED_CONN_ALLOCATE(c->sfd);
 
  662     perform_callbacks(ON_CONNECT, NULL, c);
 
  667 static void conn_cleanup(conn *c) {
 
  676         for (; c->ileft > 0; c->ileft--,c->icurr++) {
 
  681     if (c->suffixleft != 0) {
 
  682         for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
 
  683             cache_free(c->thread->suffix_cache, *(c->suffixcurr));
 
  687     if (c->write_and_free) {
 
  688         free(c->write_and_free);
 
  689         c->write_and_free = 0;
 
  693         sasl_dispose(&c->sasl_conn);
 
  697     if (c->engine_storage) {
 
  702     c->engine_storage = NULL;
 
  703     c->tap_iterator = NULL;
 
  705     assert(c->next == NULL);
 
  707     c->sfd = INVALID_SOCKET;
 
  708     c->tap_nack_mode = 
false;
 
  711 void conn_close(conn *c) {
 
  713     assert(c->sfd == INVALID_SOCKET);
 
  720     LOCK_THREAD(c->thread);
 
  722     if (
settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
 
  723         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
  724                                         "Current connection was in the pending-io list.. Nuking it\n");
 
  726     c->thread->pending_io = list_remove(c->thread->pending_io, c);
 
  727     c->thread->pending_close = list_remove(c->thread->pending_close, c);
 
  728     UNLOCK_THREAD(c->thread);
 
  737     conn_reset_buffersize(c);
 
  738     assert(c->thread == NULL);
 
  739     cache_free(conn_cache, c);
 
  750 static void conn_shrink(conn *c) {
 
  753     if (IS_UDP(c->transport))
 
  762         newbuf = (
char *)realloc((
void *)c->
rbuf, DATA_BUFFER_SIZE);
 
  766             c->
rsize = DATA_BUFFER_SIZE;
 
  772     if (c->isize > ITEM_LIST_HIGHWAT) {
 
  773         item **newbuf = (item**) realloc((
void *)c->ilist, 
ITEM_LIST_INITIAL * 
sizeof(c->ilist[0]));
 
  781     if (c->msgsize > MSG_LIST_HIGHWAT) {
 
  790     if (c->iovsize > IOV_LIST_HIGHWAT) {
 
  803 const char *state_text(STATE_FUNC state) {
 
  804     if (state == conn_listening) {
 
  805         return "conn_listening";
 
  806     } 
else if (state == conn_new_cmd) {
 
  807         return "conn_new_cmd";
 
  808     } 
else if (state == conn_waiting) {
 
  809         return "conn_waiting";
 
  810     } 
else if (state == conn_read) {
 
  812     } 
else if (state == conn_parse_cmd) {
 
  813         return "conn_parse_cmd";
 
  814     } 
else if (state == conn_write) {
 
  816     } 
else if (state == conn_nread) {
 
  818     } 
else if (state == conn_swallow) {
 
  819         return "conn_swallow";
 
  820     } 
else if (state == conn_closing) {
 
  821         return "conn_closing";
 
  822     } 
else if (state == conn_mwrite) {
 
  823         return "conn_mwrite";
 
  824     } 
else if (state == conn_ship_log) {
 
  825         return "conn_ship_log";
 
  826     } 
else if (state == conn_add_tap_client) {
 
  827         return "conn_add_tap_client";
 
  828     } 
else if (state == conn_setup_tap_stream) {
 
  829         return "conn_setup_tap_stream";
 
  830     } 
else if (state == conn_pending_close) {
 
  831         return "conn_pending_close";
 
  832     } 
else if (state == conn_immediate_close) {
 
  833         return "conn_immediate_close";
 
  844 void conn_set_state(conn *c, STATE_FUNC state) {
 
  847     if (state != c->state) {
 
  854         if (c->thread == tap_thread) {
 
  855             if (state == conn_waiting) {
 
  857                 state = conn_ship_log;
 
  861         if (
settings.verbose > 2 || c->state == conn_closing
 
  862             || c->state == conn_add_tap_client) {
 
  863             settings.extensions.logger->
log(EXTENSION_LOG_DETAIL, c,
 
  864                                             "%d: going from %s to %s\n",
 
  865                                             c->sfd, state_text(c->state),
 
  871         if (state == conn_write || state == conn_mwrite) {
 
  872             MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->
wbuf, c->wbytes);
 
  883 static int ensure_iov_space(conn *c) {
 
  886     if (c->iovused >= c->iovsize) {
 
  888         struct iovec *new_iov = (
struct iovec *)realloc(c->iov,
 
  889                                 (c->iovsize * 2) * 
sizeof(
struct iovec));
 
  896         for (i = 0, iovnum = 0; i < c->msgused; i++) {
 
  897             c->msglist[
i].msg_iov = &c->iov[iovnum];
 
  898             iovnum += c->msglist[
i].msg_iovlen;
 
  913 static int add_iov(conn *c, 
const void *
buf, 
int len) {
 
  921         m = &c->msglist[c->msgused - 1];
 
  927         limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
 
  930         if (m->msg_iovlen == IOV_MAX ||
 
  931             (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
 
  933             m = &c->msglist[c->msgused - 1];
 
  936         if (ensure_iov_space(c) != 0)
 
  940         if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
 
  941             leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
 
  947         m = &c->msglist[c->msgused - 1];
 
  948         m->msg_iov[m->msg_iovlen].iov_base = (
void *)buf;
 
  949         m->msg_iov[m->msg_iovlen].iov_len = len;
 
  955         buf = ((
char *)buf) + len;
 
  957     } 
while (leftover > 0);
 
  966 static int build_udp_headers(conn *c) {
 
  972     if (c->msgused > c->hdrsize) {
 
  975             new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
 
  977             new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
 
  980         c->hdrbuf = (
unsigned char *)new_hdrbuf;
 
  981         c->hdrsize = c->msgused * 2;
 
  985     for (i = 0; i < c->msgused; i++) {
 
  986         c->msglist[
i].msg_iov[0].iov_base = (
void*)hdr;
 
  987         c->msglist[
i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
 
  988         *hdr++ = c->request_id / 256;
 
  989         *hdr++ = c->request_id % 256;
 
  992         *hdr++ = c->msgused / 256;
 
  993         *hdr++ = c->msgused % 256;
 
  996         assert((
void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
 
 1003 static void out_string(conn *c, 
const char *str) {
 
 1010             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 1011                                             ">%d NOREPLY %s\n", c->sfd, str);
 
 1014         if (c->sbytes > 0) {
 
 1015             conn_set_state(c, conn_swallow);
 
 1017             conn_set_state(c, conn_new_cmd);
 
 1023         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 1024                                         ">%d %s\n", c->sfd, str);
 
 1034     if ((len + 2) > c->wsize) {
 
 1036         str = 
"SERVER_ERROR output line too long";
 
 1040     memcpy(c->
wbuf, str, len);
 
 1041     memcpy(c->
wbuf + len, 
"\r\n", 2);
 
 1042     c->wbytes = len + 2;
 
 1045     conn_set_state(c, conn_write);
 
 1047     if (c->sbytes > 0) {
 
 1060 static void complete_update_ascii(conn *c) {
 
 1067         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 1068                                         "%d: Failed to get item info\n",
 
 1070         out_string(c, 
"SERVER_ERROR failed to get item details");
 
 1075     ENGINE_ERROR_CODE ret = c->aiostat;
 
 1076     c->aiostat = ENGINE_SUCCESS;
 
 1077     if (ret == ENGINE_SUCCESS) {
 
 1082 #ifdef ENABLE_DTRACE 
 1083     switch (c->store_op) {
 
 1085         MEMCACHED_COMMAND_ADD(c->sfd, info.
key, info.
nkey,
 
 1086                               (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1088     case OPERATION_REPLACE:
 
 1089         MEMCACHED_COMMAND_REPLACE(c->sfd, info.
key, info.
nkey,
 
 1090                                   (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1092     case OPERATION_APPEND:
 
 1093         MEMCACHED_COMMAND_APPEND(c->sfd, info.
key, info.
nkey,
 
 1094                                  (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1096     case OPERATION_PREPEND:
 
 1097         MEMCACHED_COMMAND_PREPEND(c->sfd, info.
key, info.
nkey,
 
 1098                                   (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1101         MEMCACHED_COMMAND_SET(c->sfd, info.
key, info.
nkey,
 
 1102                               (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1105         MEMCACHED_COMMAND_CAS(c->sfd, info.
key, info.
nkey, info.
nbytes, c->cas);
 
 1111     case ENGINE_SUCCESS:
 
 1112         out_string(c, 
"STORED");
 
 1114     case ENGINE_KEY_EEXISTS:
 
 1115         out_string(c, 
"EXISTS");
 
 1117     case ENGINE_KEY_ENOENT:
 
 1118         out_string(c, 
"NOT_FOUND");
 
 1120     case ENGINE_NOT_STORED:
 
 1121         out_string(c, 
"NOT_STORED");
 
 1123     case ENGINE_DISCONNECT:
 
 1124         c->state = conn_closing;
 
 1126     case ENGINE_ENOTSUP:
 
 1127         out_string(c, 
"SERVER_ERROR not supported");
 
 1130         out_string(c, 
"SERVER_ERROR out of memory");
 
 1132     case ENGINE_TMPFAIL:
 
 1133         out_string(c, 
"SERVER_ERROR temporary failure");
 
 1136         out_string(c, 
"CLIENT_ERROR invalid arguments");
 
 1139         out_string(c, 
"CLIENT_ERROR value too big");
 
 1141     case ENGINE_EACCESS:
 
 1142         out_string(c, 
"CLIENT_ERROR access control violation");
 
 1144     case ENGINE_NOT_MY_VBUCKET:
 
 1145         out_string(c, 
"SERVER_ERROR not my vbucket");
 
 1148         out_string(c, 
"SERVER_ERROR failure");
 
 1150     case ENGINE_EWOULDBLOCK:
 
 1151         c->ewouldblock = 
true;
 
 1153     case ENGINE_WANT_MORE:
 
 1155         c->state = conn_closing;
 
 1159         out_string(c, 
"SERVER_ERROR internal");
 
 1162     if (c->store_op == OPERATION_CAS) {
 
 1164         case ENGINE_SUCCESS:
 
 1165             SLAB_INCR(c, cas_hits, info.
key, info.
nkey);
 
 1167         case ENGINE_KEY_EEXISTS:
 
 1168             SLAB_INCR(c, cas_badval, info.
key, info.
nkey);
 
 1170         case ENGINE_KEY_ENOENT:
 
 1171             STATS_NOKEY(c, cas_misses);
 
 1177         SLAB_INCR(c, cmd_set, info.
key, info.
nkey);
 
 1180     if (!c->ewouldblock) {
 
 1190 static void* binary_get_request(conn *c) {
 
 1191     char *ret = c->
rcurr;
 
 1192     ret -= (
sizeof(c->binary_header) + c->binary_header.request.keylen +
 
 1193             c->binary_header.request.extlen);
 
 1195     assert(ret >= c->
rbuf);
 
 1202 static char* binary_get_key(conn *c) {
 
 1203     return c->
rcurr - (c->binary_header.request.keylen);
 
 1219 static ssize_t key_to_printable_buffer(
char *dest, 
size_t destsz,
 
 1220                                        int client, 
bool from_client,
 
 1225     ssize_t nw = snprintf(dest, destsz, 
"%c%d %s ", from_client ? 
'>' : 
'<',
 
 1231     char *ptr = dest + nw;
 
 1233     if (nkey > destsz) {
 
 1237     for (ssize_t ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
 
 1238         if (isgraph(*key)) {
 
 1261 static ssize_t bytes_to_output_string(
char *dest, 
size_t destsz,
 
 1262                                       int client, 
bool from_client,
 
 1267     ssize_t nw = snprintf(dest, destsz, 
"%c%d %s", from_client ? 
'>' : 
'<',
 
 1274     for (ssize_t ii = 0; ii < 
size; ++ii) {
 
 1276             if ((nw = snprintf(dest + offset, destsz - offset, 
"\n%c%d  ",
 
 1277                                from_client ? 
'>' : 
'<', client)) == -1) {
 
 1282         if ((nw = snprintf(dest + offset, destsz - offset,
 
 1283                            " 0x%02x", (
unsigned char)data[ii])) == -1) {
 
 1289     if ((nw = snprintf(dest + offset, destsz - offset, 
"\n")) == -1) {
 
 1296 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
 
 1304     if (add_msghdr(c) != 0) {
 
 1306         out_string(c, 
"SERVER_ERROR out of memory");
 
 1312     header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
 
 1313     header->response.opcode = c->binary_header.request.opcode;
 
 1314     header->response.keylen = (uint16_t)htons(key_len);
 
 1316     header->response.extlen = (uint8_t)hdr_len;
 
 1317     header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
 
 1318     header->response.status = (uint16_t)htons(err);
 
 1320     header->response.bodylen = htonl(body_len);
 
 1321     header->response.opaque = c->opaque;
 
 1322     header->response.cas = htonll(c->cas);
 
 1326         if (bytes_to_output_string(buffer, 
sizeof(buffer), c->sfd, 
false,
 
 1327                                    "Writing bin response:",
 
 1328                                    (
const char*)header->bytes,
 
 1329                                    sizeof(header->bytes)) != -1) {
 
 1330             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 1335     add_iov(c, c->
wbuf, 
sizeof(header->response));
 
 1348     case ENGINE_SUCCESS:
 
 1349         return PROTOCOL_BINARY_RESPONSE_SUCCESS;
 
 1350     case ENGINE_KEY_ENOENT:
 
 1351         return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
 
 1352     case ENGINE_KEY_EEXISTS:
 
 1353         return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
 
 1355         return PROTOCOL_BINARY_RESPONSE_ENOMEM;
 
 1356     case ENGINE_TMPFAIL:
 
 1357         return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
 
 1358     case ENGINE_NOT_STORED:
 
 1359         return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
 
 1361         return PROTOCOL_BINARY_RESPONSE_EINVAL;
 
 1362     case ENGINE_ENOTSUP:
 
 1363         return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
 
 1365         return PROTOCOL_BINARY_RESPONSE_E2BIG;
 
 1366     case ENGINE_NOT_MY_VBUCKET:
 
 1367         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
 
 1369         ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
 
 1377     char buffer[1024] = { [
sizeof(buffer) - 1] = 
'\0' };
 
 1380     case PROTOCOL_BINARY_RESPONSE_SUCCESS:
 
 1383     case PROTOCOL_BINARY_RESPONSE_ENOMEM:
 
 1384         len = snprintf(buffer, 
sizeof(buffer), 
"Out of memory");
 
 1386     case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
 
 1387         len = snprintf(buffer, 
sizeof(buffer), 
"Temporary failure");
 
 1389     case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
 
 1390         len = snprintf(buffer, 
sizeof(buffer), 
"Unknown command");
 
 1392     case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
 
 1393         len = snprintf(buffer, 
sizeof(buffer), 
"Not found");
 
 1395     case PROTOCOL_BINARY_RESPONSE_EINVAL:
 
 1396         len = snprintf(buffer, 
sizeof(buffer), 
"Invalid arguments");
 
 1398     case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
 
 1399         len = snprintf(buffer, 
sizeof(buffer), 
"Data exists for key");
 
 1401     case PROTOCOL_BINARY_RESPONSE_E2BIG:
 
 1402         len = snprintf(buffer, 
sizeof(buffer), 
"Too large");
 
 1404     case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
 
 1405         len = snprintf(buffer, 
sizeof(buffer),
 
 1406                        "Non-numeric server-side value for incr or decr");
 
 1408     case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
 
 1409         len = snprintf(buffer, 
sizeof(buffer), 
"Not stored");
 
 1411     case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
 
 1412         len = snprintf(buffer, 
sizeof(buffer), 
"Auth failure");
 
 1414     case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
 
 1415         len = snprintf(buffer, 
sizeof(buffer), 
"Not supported");
 
 1417     case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
 
 1418         len = snprintf(buffer, 
sizeof(buffer),
 
 1419                        "I'm not responsible for this vbucket");
 
 1423         len = snprintf(buffer, 
sizeof(buffer), 
"UNHANDLED ERROR (%d)", err);
 
 1424         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 1425                                         ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
 
 1431                                                   sizeof(buffer) - len - 3);
 
 1434             memcpy(buffer + len, 
": ", 2);
 
 1439     if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS && 
settings.verbose > 1) {
 
 1440         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 1441                                         ">%d Writing an error: %s\n", c->sfd,
 
 1445     add_bin_header(c, err, 0, 0, len);
 
 1447         add_iov(c, buffer, len);
 
 1449     conn_set_state(c, conn_mwrite);
 
 1451         c->sbytes = swallow;
 
 1459 static void write_bin_response(conn *c, 
void *d, 
int hlen, 
int keylen, 
int dlen) {
 
 1460     if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
 
 1461         c->cmd == PROTOCOL_BINARY_CMD_GETK) {
 
 1462         add_bin_header(c, 0, hlen, keylen, dlen);
 
 1464             add_iov(c, d, dlen);
 
 1466         conn_set_state(c, conn_mwrite);
 
 1469         conn_set_state(c, conn_new_cmd);
 
 1474 static void complete_incr_bin(conn *c) {
 
 1479     assert(c->wsize >= 
sizeof(*rsp));
 
 1482     uint64_t delta = ntohll(req->message.body.delta);
 
 1483     uint64_t initial = ntohll(req->message.body.initial);
 
 1484     rel_time_t expiration = ntohl(req->message.body.expiration);
 
 1485     char *key = binary_get_key(c);
 
 1486     size_t nkey = c->binary_header.request.keylen;
 
 1487     bool incr = (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT ||
 
 1488                  c->cmd == PROTOCOL_BINARY_CMD_INCREMENTQ);
 
 1493         nw = key_to_printable_buffer(buffer, 
sizeof(buffer), c->sfd, 
true,
 
 1494                                      incr ? 
"INCR" : 
"DECR", key, nkey);
 
 1496             if (snprintf(buffer + nw, 
sizeof(buffer) - nw,
 
 1497                          " %" PRIu64 
", %" PRIu64 
", %" PRIu64 
"\n",
 
 1498                          delta, initial, (uint64_t)expiration) != -1) {
 
 1499                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c, 
"%s",
 
 1505     ENGINE_ERROR_CODE ret = c->aiostat;
 
 1506     c->aiostat = ENGINE_SUCCESS;
 
 1507     if (ret == ENGINE_SUCCESS) {
 
 1510                                              req->message.body.expiration != 0xffffffff,
 
 1511                                              delta, initial, expiration,
 
 1513                                              &rsp->message.body.value,
 
 1514                                              c->binary_header.request.vbucket);
 
 1518     case ENGINE_SUCCESS:
 
 1519         rsp->message.body.value = htonll(rsp->message.body.value);
 
 1520         write_bin_response(c, &rsp->message.body, 0, 0,
 
 1521                            sizeof (rsp->message.body.value));
 
 1523             STATS_INCR(c, incr_hits, key, nkey);
 
 1525             STATS_INCR(c, decr_hits, key, nkey);
 
 1528     case ENGINE_KEY_EEXISTS:
 
 1529         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
 
 1531     case ENGINE_KEY_ENOENT:
 
 1532         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1533         if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
 
 1534             STATS_INCR(c, incr_misses, key, nkey);
 
 1536             STATS_INCR(c, decr_misses, key, nkey);
 
 1540         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
 
 1542     case ENGINE_TMPFAIL:
 
 1543         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
 
 1546         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
 
 1548     case ENGINE_NOT_STORED:
 
 1549         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
 
 1551     case ENGINE_DISCONNECT:
 
 1552         c->state = conn_closing;
 
 1554     case ENGINE_ENOTSUP:
 
 1555         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 1557     case ENGINE_NOT_MY_VBUCKET:
 
 1558         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
 
 1560     case ENGINE_EWOULDBLOCK:
 
 1561         c->ewouldblock = 
true;
 
 1568 static void complete_update_bin(conn *c) {
 
 1576         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 1577                                         "%d: Failed to get item info\n",
 
 1579         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
 
 1583     ENGINE_ERROR_CODE ret = c->aiostat;
 
 1584     c->aiostat = ENGINE_SUCCESS;
 
 1585     if (ret == ENGINE_SUCCESS) {
 
 1587                                         it, &c->cas, c->store_op,
 
 1588                                         c->binary_header.request.vbucket);
 
 1591 #ifdef ENABLE_DTRACE 
 1594         MEMCACHED_COMMAND_ADD(c->sfd, info.
key, info.
nkey,
 
 1595                               (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1597     case OPERATION_REPLACE:
 
 1598         MEMCACHED_COMMAND_REPLACE(c->sfd, info.
key, info.
nkey,
 
 1599                                   (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1601     case OPERATION_APPEND:
 
 1602         MEMCACHED_COMMAND_APPEND(c->sfd, info.
key, info.
nkey,
 
 1603                                  (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1605     case OPERATION_PREPEND:
 
 1606         MEMCACHED_COMMAND_PREPEND(c->sfd, info.
key, info.
nkey,
 
 1607                                   (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1610         MEMCACHED_COMMAND_SET(c->sfd, info.
key, info.
nkey,
 
 1611                               (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
 
 1617     case ENGINE_SUCCESS:
 
 1619         write_bin_response(c, NULL, 0, 0, 0);
 
 1621     case ENGINE_KEY_EEXISTS:
 
 1622         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
 
 1624     case ENGINE_KEY_ENOENT:
 
 1625         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1628         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
 
 1630     case ENGINE_TMPFAIL:
 
 1631         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
 
 1633     case ENGINE_EWOULDBLOCK:
 
 1634         c->ewouldblock = 
true;
 
 1636     case ENGINE_DISCONNECT:
 
 1637         c->state = conn_closing;
 
 1639     case ENGINE_ENOTSUP:
 
 1640         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 1642     case ENGINE_NOT_MY_VBUCKET:
 
 1643         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
 
 1646         if (c->store_op == OPERATION_ADD) {
 
 1647             eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
 
 1648         } 
else if(c->store_op == OPERATION_REPLACE) {
 
 1649             eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
 
 1651             eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
 
 1653         write_bin_packet(c, eno, 0);
 
 1656     if (c->store_op == OPERATION_CAS) {
 
 1658         case ENGINE_SUCCESS:
 
 1659             SLAB_INCR(c, cas_hits, info.
key, info.
nkey);
 
 1661         case ENGINE_KEY_EEXISTS:
 
 1662             SLAB_INCR(c, cas_badval, info.
key, info.
nkey);
 
 1664         case ENGINE_KEY_ENOENT:
 
 1665             STATS_NOKEY(c, cas_misses);
 
 1671         SLAB_INCR(c, cmd_set, info.
key, info.
nkey);
 
 1674     if (!c->ewouldblock) {
 
 1681 static void process_bin_get(conn *c) {
 
 1685     char* key = binary_get_key(c);
 
 1686     size_t nkey = c->binary_header.request.keylen;
 
 1690         if (key_to_printable_buffer(buffer, 
sizeof(buffer), c->sfd, 
true,
 
 1691                                     "GET", key, nkey) != -1) {
 
 1692             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c, 
"%s\n",
 
 1697     ENGINE_ERROR_CODE ret = c->aiostat;
 
 1698     c->aiostat = ENGINE_SUCCESS;
 
 1699     if (ret == ENGINE_SUCCESS) {
 
 1701                                       c->binary_header.request.vbucket);
 
 1709     case ENGINE_SUCCESS:
 
 1712             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 1713                                             "%d: Failed to get item info\n",
 
 1715             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
 
 1720         bodylen = 
sizeof(rsp->message.body) + info.
nbytes;
 
 1722         STATS_HIT(c, 
get, key, nkey);
 
 1724         if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
 
 1728         add_bin_header(c, 0, 
sizeof(rsp->message.body), keylen, bodylen);
 
 1729         rsp->message.header.response.cas = htonll(info.cas);
 
 1732         rsp->message.body.flags = info.
flags;
 
 1733         add_iov(c, &rsp->message.body, 
sizeof(rsp->message.body));
 
 1735         if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
 
 1736             add_iov(c, info.
key, nkey);
 
 1739         add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
 
 1740         conn_set_state(c, conn_mwrite);
 
 1744     case ENGINE_KEY_ENOENT:
 
 1745         STATS_MISS(c, 
get, key, nkey);
 
 1747         MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
 
 1750             conn_set_state(c, conn_new_cmd);
 
 1752             if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
 
 1754                 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
 
 1756                 memcpy(ofs, key, nkey);
 
 1757                 add_iov(c, ofs, nkey);
 
 1758                 conn_set_state(c, conn_mwrite);
 
 1760                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1764     case ENGINE_EWOULDBLOCK:
 
 1765         c->ewouldblock = 
true;
 
 1767     case ENGINE_DISCONNECT:
 
 1768         c->state = conn_closing;
 
 1770     case ENGINE_TMPFAIL:
 
 1772     case ENGINE_ENOTSUP:
 
 1773         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 1775     case ENGINE_NOT_MY_VBUCKET:
 
 1776         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
 
 1780         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 1781                                         "Unknown error code: %d\n", ret);
 
 1785     if (
settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
 
 1786         stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
 
 1790 static void append_bin_stats(
const char *key, 
const uint16_t klen,
 
 1791                              const char *val, 
const uint32_t vlen,
 
 1793     char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
 
 1794     uint32_t bodylen = klen + vlen;
 
 1796         .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
 
 1797         .response.opcode = PROTOCOL_BINARY_CMD_STAT,
 
 1798         .response.keylen = (uint16_t)htons(klen),
 
 1799         .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
 
 1800         .response.bodylen = htonl(bodylen),
 
 1801         .response.opaque = c->opaque
 
 1804     memcpy(buf, header.bytes, 
sizeof(header.response));
 
 1805     buf += 
sizeof(header.response);
 
 1808         memcpy(buf, key, klen);
 
 1812             memcpy(buf, val, vlen);
 
 1816     c->dynamic_buffer.offset += 
sizeof(header.response) + bodylen;
 
 1824 static void append_ascii_stats(
const char *key, 
const uint16_t klen,
 
 1825                                const char *val, 
const uint32_t vlen,
 
 1827     char *pos = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
 
 1828     uint32_t nbytes = 5; 
 
 1830     if (klen == 0 && vlen == 0) {
 
 1831         memcpy(pos, 
"END\r\n", 5);
 
 1833         memcpy(pos, 
"STAT ", 5);
 
 1834         memcpy(pos + nbytes, key, klen);
 
 1839             memcpy(pos + nbytes, val, vlen);
 
 1842         memcpy(pos + nbytes, 
"\r\n", 2);
 
 1846     c->dynamic_buffer.offset += nbytes;
 
 1849 static bool grow_dynamic_buffer(conn *c, 
size_t needed) {
 
 1850     size_t nsize = c->dynamic_buffer.size;
 
 1851     size_t available = nsize - c->dynamic_buffer.offset;
 
 1855     if (c->dynamic_buffer.buffer == NULL) {
 
 1857         available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
 
 1860     while (needed > available) {
 
 1863         available = nsize - c->dynamic_buffer.offset;
 
 1866     if (nsize != c->dynamic_buffer.size) {
 
 1867         char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
 
 1869             c->dynamic_buffer.buffer = ptr;
 
 1870             c->dynamic_buffer.size = nsize;
 
 1879 static void append_stats(
const char *key, 
const uint16_t klen,
 
 1880                          const char *val, 
const uint32_t vlen,
 
 1884     if (klen == 0 && vlen > 0) {
 
 1888     conn *c = (conn*)cookie;
 
 1890     if (c->protocol == binary_prot) {
 
 1892         if (!grow_dynamic_buffer(c, needed)) {
 
 1895         append_bin_stats(key, klen, val, vlen, c);
 
 1897         size_t needed = vlen + klen + 10; 
 
 1898         if (!grow_dynamic_buffer(c, needed)) {
 
 1901         append_ascii_stats(key, klen, val, vlen, c);
 
 1904     assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
 
 1907 static void process_bin_stat(conn *c) {
 
 1908     char *subcommand = binary_get_key(c);
 
 1909     size_t nkey = c->binary_header.request.keylen;
 
 1913         if (key_to_printable_buffer(buffer, 
sizeof(buffer), c->sfd, 
true,
 
 1914                                     "STATS", subcommand, nkey) != -1) {
 
 1915             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c, 
"%s\n",
 
 1920     ENGINE_ERROR_CODE ret = c->aiostat;
 
 1921     c->aiostat = ENGINE_SUCCESS;
 
 1922     c->ewouldblock = 
false;
 
 1924     if (ret == ENGINE_SUCCESS) {
 
 1928             if (ret == ENGINE_SUCCESS) {
 
 1929                 server_stats(&append_stats, c, 
false);
 
 1931         } 
else if (strncmp(subcommand, 
"reset", 5) == 0) {
 
 1934         } 
else if (strncmp(subcommand, 
"settings", 8) == 0) {
 
 1935             process_stat_settings(&append_stats, c);
 
 1936         } 
else if (strncmp(subcommand, 
"detail", 6) == 0) {
 
 1937             char *subcmd_pos = subcommand + 6;
 
 1939                 if (strncmp(subcmd_pos, 
" dump", 5) == 0) {
 
 1941                     char *dump_buf = stats_prefix_dump(&len);
 
 1942                     if (dump_buf == NULL || len <= 0) {
 
 1943                         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
 
 1946                         append_stats(
"detailed", strlen(
"detailed"), dump_buf, len, c);
 
 1949                 } 
else if (strncmp(subcmd_pos, 
" on", 3) == 0) {
 
 1951                 } 
else if (strncmp(subcmd_pos, 
" off", 4) == 0) {
 
 1954                     write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1958                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
 
 1961         } 
else if (strncmp(subcommand, 
"aggregate", 9) == 0) {
 
 1962             server_stats(&append_stats, c, 
true);
 
 1963         } 
else if (strncmp(subcommand, 
"topkeys", 7) == 0) {
 
 1964             topkeys_t *tk = get_independent_stats(c)->topkeys;
 
 1966                 topkeys_stats(tk, c, current_time, append_stats);
 
 1968                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1979     case ENGINE_SUCCESS:
 
 1980         append_stats(NULL, 0, NULL, 0, c);
 
 1981         write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
 
 1982         c->dynamic_buffer.buffer = NULL;
 
 1985         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
 
 1987     case ENGINE_TMPFAIL:
 
 1988         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
 
 1990     case ENGINE_KEY_ENOENT:
 
 1991         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 1993     case ENGINE_DISCONNECT:
 
 1994         c->state = conn_closing;
 
 1996     case ENGINE_ENOTSUP:
 
 1997         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 1999     case ENGINE_EWOULDBLOCK:
 
 2000         c->ewouldblock = 
true;
 
 2003         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
 
 2007 static void bin_read_chunk(conn *c, 
enum bin_substates next_substate, uint32_t chunk) {
 
 2009     c->substate = next_substate;
 
 2015         size_t nsize = c->
rsize;
 
 2018         while (size > nsize) {
 
 2022         if (nsize != c->
rsize) {
 
 2024                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2025                         "%d: Need to grow buffer from %lu to %lu\n",
 
 2026                         c->sfd, (
unsigned long)c->
rsize, (
unsigned long)nsize);
 
 2028             char *newm = realloc(c->
rbuf, nsize);
 
 2031                     settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2032                             "%d: Failed to grow buffer.. closing connection\n",
 
 2035                 conn_set_state(c, conn_closing);
 
 2048                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2049                                                 "%d: Repack input buffer\n",
 
 2057     conn_set_state(c, conn_nread);
 
 2060 static void bin_read_key(conn *c, 
enum bin_substates next_substate, 
int extra) {
 
 2061     bin_read_chunk(c, next_substate, c->keylen + extra);
 
 2066 static void handle_binary_protocol_error(conn *c) {
 
 2067     write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
 
 2069         settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2070                 "%d: Protocol error (opcode %02x), close connection\n",
 
 2071                 c->sfd, c->binary_header.request.opcode);
 
 2076 static void init_sasl_conn(conn *c) {
 
 2078     if (!c->sasl_conn) {
 
 2079         int result=sasl_server_new(
"memcached",
 
 2080                                    NULL, NULL, NULL, NULL,
 
 2081                                    NULL, 0, &c->sasl_conn);
 
 2082         if (result != SASL_OK) {
 
 2084                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2085                          "%d: Failed to initialize SASL conn.\n",
 
 2088             c->sasl_conn = NULL;
 
 2093 static void get_auth_data(
const void *cookie, 
auth_data_t *data) {
 
 2094     conn *c = (conn*)cookie;
 
 2096         sasl_getprop(c->sasl_conn, SASL_USERNAME, (
void*)&data->username);
 
 2098         sasl_getprop(c->sasl_conn, ISASL_CONFIG, (
void*)&data->config);
 
 2104 static void bin_list_sasl_mechs(conn *c) {
 
 2106     const char *result_string = NULL;
 
 2107     unsigned int string_length = 0;
 
 2108     int result=sasl_listmech(c->sasl_conn, NULL,
 
 2112                              &result_string, &string_length,
 
 2114     if (result != SASL_OK) {
 
 2117             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2118                      "%d: Failed to list SASL mechanisms.\n",
 
 2121         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
 
 2124     write_bin_response(c, (
char*)result_string, 0, 0, string_length);
 
 2134 static void process_bin_sasl_auth(conn *c) {
 
 2135     assert(c->binary_header.request.extlen == 0);
 
 2137     int nkey = c->binary_header.request.keylen;
 
 2138     int vlen = c->binary_header.request.bodylen - nkey;
 
 2140     if (nkey > MAX_SASL_MECH_LEN) {
 
 2141         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
 
 2146     char *key = binary_get_key(c);
 
 2149     size_t buffer_size = 
sizeof(
struct sasl_tmp) + nkey + vlen + 2;
 
 2152         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
 
 2159     memcpy(data->data, key, nkey);
 
 2162     c->
ritem = data->data + nkey;
 
 2164     conn_set_state(c, conn_nread);
 
 2165     c->substate = bin_reading_sasl_auth_data;
 
 2168 static void process_bin_complete_sasl_auth(conn *c) {
 
 2169     const char *out = NULL;
 
 2170     unsigned int outlen = 0;
 
 2175     int nkey = c->binary_header.request.keylen;
 
 2176     int vlen = c->binary_header.request.bodylen - nkey;
 
 2180     memcpy(mech, stmp->data, nkey);
 
 2184         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2185                 "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
 
 2188     const char *challenge = vlen == 0 ? NULL : (stmp->data + nkey);
 
 2193     case PROTOCOL_BINARY_CMD_SASL_AUTH:
 
 2194         result = sasl_server_start(c->sasl_conn, mech,
 
 2198     case PROTOCOL_BINARY_CMD_SASL_STEP:
 
 2199         result = sasl_server_step(c->sasl_conn,
 
 2208             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2209                     "%d: Unhandled command %d with challenge %s\n",
 
 2210                     c->sfd, c->cmd, challenge);
 
 2220         settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2221                                         "%d: sasl result code:  %d\n",
 
 2227         write_bin_response(c, 
"Authenticated", 0, 0, strlen(
"Authenticated"));
 
 2229         get_auth_data(c, &data);
 
 2230         perform_callbacks(ON_AUTH, (
const void*)&data, c);
 
 2231         STATS_NOKEY(c, auth_cmds);
 
 2234         add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
 
 2236             add_iov(c, out, outlen);
 
 2238         conn_set_state(c, conn_mwrite);
 
 2243             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2244                                             "%d: Unknown sasl response:  %d\n",
 
 2247         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
 
 2248         STATS_NOKEY2(c, auth_cmds, auth_errors);
 
 2252 static bool authenticated(conn *c) {
 
 2256     case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: 
 
 2257     case PROTOCOL_BINARY_CMD_SASL_AUTH:       
 
 2258     case PROTOCOL_BINARY_CMD_SASL_STEP:       
 
 2259     case PROTOCOL_BINARY_CMD_VERSION:         
 
 2264             const void *uname = NULL;
 
 2265             sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
 
 2271         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2272                 "%d: authenticated() in cmd 0x%02x is %s\n",
 
 2273                 c->sfd, c->cmd, rv ? 
"true" : 
"false");
 
 2279 static bool binary_response_handler(
const void *key, uint16_t keylen,
 
 2280                                     const void *ext, uint8_t extlen,
 
 2281                                     const void *body, uint32_t bodylen,
 
 2282                                     uint8_t datatype, uint16_t status,
 
 2283                                     uint64_t cas, 
const void *cookie)
 
 2285     conn *c = (conn*)cookie;
 
 2288     if (!grow_dynamic_buffer(c, needed)) {
 
 2290             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 2291                     "<%d ERROR: Failed to allocate memory for response\n",
 
 2297     char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
 
 2299         .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
 
 2300         .response.opcode = c->binary_header.request.opcode,
 
 2301         .response.keylen = (uint16_t)htons(keylen),
 
 2302         .response.extlen = extlen,
 
 2303         .response.datatype = datatype,
 
 2304         .response.status = (uint16_t)htons(status),
 
 2305         .response.bodylen = htonl(bodylen + keylen + extlen),
 
 2306         .response.opaque = c->opaque,
 
 2307         .response.cas = htonll(cas),
 
 2310     memcpy(buf, header.bytes, 
sizeof(header.response));
 
 2311     buf += 
sizeof(header.response);
 
 2314         memcpy(buf, ext, extlen);
 
 2319         memcpy(buf, key, keylen);
 
 2324         memcpy(buf, body, bodylen);
 
 2327     c->dynamic_buffer.offset += needed;
 
 2339     uint64_t checkpoint_start;
 
 2340     uint64_t checkpoint_end;
 
 2344     uint64_t vbucket_set;
 
 2348     pthread_mutex_t mutex;
 
 2351 } 
tap_stats = { .mutex = PTHREAD_MUTEX_INITIALIZER };
 
 2353 static void ship_tap_log(conn *c) {
 
 2354     assert(c->thread->type == TAP);
 
 2358     if (add_msghdr(c) != 0) {
 
 2360             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2361                                             "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
 
 2363         conn_set_state(c, conn_closing);
 
 2369     bool more_data = 
true;
 
 2370     bool send_data = 
false;
 
 2371     bool disconnect = 
false;
 
 2376     c->icurr = c->ilist;
 
 2390         tap_event_t 
event = c->tap_iterator(
settings.engine.v0, c, &it,
 
 2391                                             &engine, &nengine, &ttl,
 
 2392                                             &tap_flags, &seqno, &vbucket);
 
 2400             .mutation.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ,
 
 2403         msg.opaque.message.header.request.opaque = htonl(seqno);
 
 2404         msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
 
 2405         msg.opaque.message.body.tap.ttl = ttl;
 
 2406         msg.opaque.message.body.tap.flags = htons(tap_flags);
 
 2407         msg.opaque.message.header.request.extlen = 8;
 
 2408         msg.opaque.message.header.request.vbucket = htons(vbucket);
 
 2414             msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
 
 2415             msg.noop.message.header.request.extlen = 0;
 
 2416             msg.noop.message.header.request.bodylen = htonl(0);
 
 2417             memcpy(c->wcurr, msg.noop.bytes, 
sizeof(msg.noop.bytes));
 
 2418             add_iov(c, c->wcurr, 
sizeof(msg.noop.bytes));
 
 2419             c->wcurr += 
sizeof(msg.noop.bytes);
 
 2420             c->wbytes += 
sizeof(msg.noop.bytes);
 
 2425         case TAP_CHECKPOINT_START:
 
 2426         case TAP_CHECKPOINT_END:
 
 2430                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2431                                                 "%d: Failed to get item info\n", c->sfd);
 
 2435             c->ilist[c->ileft++] = it;
 
 2437             if (
event == TAP_CHECKPOINT_START) {
 
 2438                 msg.mutation.message.header.request.opcode =
 
 2439                     PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
 
 2443             } 
else if (
event == TAP_CHECKPOINT_END) {
 
 2444                 msg.mutation.message.header.request.opcode =
 
 2445                     PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
 
 2449             } 
else if (
event == TAP_MUTATION) {
 
 2450                 msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
 
 2456             msg.mutation.message.header.request.cas = htonll(info.cas);
 
 2457             msg.mutation.message.header.request.keylen = htons(info.
nkey);
 
 2458             msg.mutation.message.header.request.extlen = 16;
 
 2460             bodylen = 16 + info.
nkey + nengine;
 
 2461             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
 
 2464             msg.mutation.message.header.request.bodylen = htonl(bodylen);
 
 2465             msg.mutation.message.body.item.flags = htonl(info.
flags);
 
 2466             msg.mutation.message.body.item.expiration = htonl(info.
exptime);
 
 2467             msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
 
 2468             msg.mutation.message.body.tap.ttl = ttl;
 
 2469             msg.mutation.message.body.tap.flags = htons(tap_flags);
 
 2470             memcpy(c->wcurr, msg.mutation.bytes, 
sizeof(msg.mutation.bytes));
 
 2472             add_iov(c, c->wcurr, 
sizeof(msg.mutation.bytes));
 
 2473             c->wcurr += 
sizeof(msg.mutation.bytes);
 
 2474             c->wbytes += 
sizeof(msg.mutation.bytes);
 
 2477                 memcpy(c->wcurr, engine, nengine);
 
 2478                 add_iov(c, c->wcurr, nengine);
 
 2479                 c->wcurr += nengine;
 
 2480                 c->wbytes += nengine;
 
 2483             add_iov(c, info.
key, info.
nkey);
 
 2484             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
 
 2485                 add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
 
 2493                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2494                                                 "%d: Failed to get item info\n", c->sfd);
 
 2498             c->ilist[c->ileft++] = it;
 
 2499             msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
 
 2500             msg.delete.message.header.request.cas = htonll(info.cas);
 
 2501             msg.delete.message.header.request.keylen = htons(info.
nkey);
 
 2503             bodylen = 8 + info.
nkey + nengine;
 
 2504             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
 
 2507             msg.delete.message.header.request.bodylen = htonl(bodylen);
 
 2509             memcpy(c->wcurr, msg.delete.bytes, 
sizeof(msg.delete.bytes));
 
 2510             add_iov(c, c->wcurr, 
sizeof(msg.delete.bytes));
 
 2511             c->wcurr += 
sizeof(msg.delete.bytes);
 
 2512             c->wbytes += 
sizeof(msg.delete.bytes);
 
 2515                 memcpy(c->wcurr, engine, nengine);
 
 2516                 add_iov(c, c->wcurr, nengine);
 
 2517                 c->wcurr += nengine;
 
 2518                 c->wbytes += nengine;
 
 2521             add_iov(c, info.
key, info.
nkey);
 
 2522             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
 
 2523                 add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
 
 2531         case TAP_DISCONNECT:
 
 2535         case TAP_VBUCKET_SET:
 
 2540             if (
event == TAP_OPAQUE) {
 
 2541                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
 
 2546             } 
else if (
event == TAP_FLUSH) {
 
 2547                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
 
 2551             } 
else if (
event == TAP_VBUCKET_SET) {
 
 2552                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
 
 2553                 msg.flush.message.body.tap.flags = htons(tap_flags);
 
 2559             msg.flush.message.header.request.bodylen = htonl(8 + nengine);
 
 2560             memcpy(c->wcurr, msg.flush.bytes, 
sizeof(msg.flush.bytes));
 
 2561             add_iov(c, c->wcurr, 
sizeof(msg.flush.bytes));
 
 2562             c->wcurr += 
sizeof(msg.flush.bytes);
 
 2563             c->wbytes += 
sizeof(msg.flush.bytes);
 
 2565                 memcpy(c->wcurr, engine, nengine);
 
 2566                 add_iov(c, c->wcurr, nengine);
 
 2567                 c->wcurr += nengine;
 
 2568                 c->wbytes += nengine;
 
 2574     } 
while (more_data);
 
 2576     c->ewouldblock = 
false;
 
 2578         conn_set_state(c, conn_mwrite);
 
 2586             conn_set_state(c, conn_closing);
 
 2590                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2591                                                 "%d: No more items in tap log.. waiting\n",
 
 2594             c->ewouldblock = 
true;
 
 2599 static void process_bin_unknown_packet(conn *c) {
 
 2600     void *packet = c->
rcurr - (c->binary_header.request.bodylen +
 
 2601                                sizeof(c->binary_header));
 
 2603     ENGINE_ERROR_CODE ret = c->aiostat;
 
 2604     c->aiostat = ENGINE_SUCCESS;
 
 2605     c->ewouldblock = 
false;
 
 2607     if (ret == ENGINE_SUCCESS) {
 
 2609                                                   binary_response_handler);
 
 2612     if (ret == ENGINE_SUCCESS) {
 
 2613         if (c->dynamic_buffer.buffer != NULL) {
 
 2614             write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
 
 2615             c->dynamic_buffer.buffer = NULL;
 
 2617             conn_set_state(c, conn_new_cmd);
 
 2619     } 
else if (ret == ENGINE_ENOTSUP) {
 
 2620         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0);
 
 2621     } 
else if (ret == ENGINE_EWOULDBLOCK) {
 
 2622         c->ewouldblock = 
true;
 
 2625         conn_set_state(c, conn_closing);
 
 2629 static void process_bin_tap_connect(conn *c) {
 
 2630     char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
 
 2631                                 sizeof(c->binary_header)));
 
 2633     const char *key = packet + 
sizeof(req->bytes);
 
 2634     const char *data = key + c->binary_header.request.keylen;
 
 2636     size_t ndata = c->binary_header.request.bodylen -
 
 2637         c->binary_header.request.extlen -
 
 2638         c->binary_header.request.keylen;
 
 2640     if (c->binary_header.request.extlen == 4) {
 
 2641         flags = ntohl(req->message.body.flags);
 
 2643         if (flags & TAP_CONNECT_FLAG_BACKFILL) {
 
 2646                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2647                                                 "%d: ERROR: Invalid tap connect message\n",
 
 2649                 conn_set_state(c, conn_closing);
 
 2658     if (
settings.verbose && c->binary_header.request.keylen > 0) {
 
 2660         int len = c->binary_header.request.keylen;
 
 2661         if (len >= 
sizeof(buffer)) {
 
 2662             len = 
sizeof(buffer) - 1;
 
 2664         memcpy(buffer, key, len);
 
 2666         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 2667                                         "%d: Trying to connect with named tap connection: <%s>\n",
 
 2672         settings.engine.v0, c, key, c->binary_header.request.keylen,
 
 2673         flags, data, ndata);
 
 2675     if (iterator == NULL) {
 
 2676         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 2677                                         "%d: FATAL: The engine does not support tap\n",
 
 2679         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 2682         c->tap_iterator = iterator;
 
 2683         c->which = EV_WRITE;
 
 2684         conn_set_state(c, conn_ship_log);
 
 2688 static void process_bin_tap_packet(tap_event_t 
event, conn *c) {
 
 2690     char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
 
 2691                                 sizeof(c->binary_header)));
 
 2693     uint16_t nengine = ntohs(tap->message.body.tap.enginespecific_length);
 
 2694     uint16_t tap_flags = ntohs(tap->message.body.tap.flags);
 
 2695     uint32_t seqno = ntohl(tap->message.header.request.opaque);
 
 2696     uint8_t ttl = tap->message.body.tap.ttl;
 
 2698     char *engine_specific = packet + 
sizeof(tap->bytes);
 
 2699     char *key = engine_specific + nengine;
 
 2700     uint16_t nkey = c->binary_header.request.keylen;
 
 2701     char *data = key + nkey;
 
 2703     uint32_t exptime = 0;
 
 2704     uint32_t ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
 
 2706     if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
 
 2707         event == TAP_CHECKPOINT_END) {
 
 2709         flags = ntohl(mutation->message.body.item.flags);
 
 2710         exptime = ntohl(mutation->message.body.item.expiration);
 
 2716     ENGINE_ERROR_CODE ret = c->aiostat;
 
 2717     if (ret == ENGINE_SUCCESS) {
 
 2719                                              engine_specific, nengine,
 
 2724                                              ntohll(tap->message.header.request.cas),
 
 2726                                              c->binary_header.request.vbucket);
 
 2730     case ENGINE_DISCONNECT:
 
 2731         conn_set_state(c, conn_closing);
 
 2733     case ENGINE_EWOULDBLOCK:
 
 2734         c->ewouldblock = 
true;
 
 2737         if ((tap_flags & TAP_FLAG_ACK) ||
 
 2738             (ret != ENGINE_SUCCESS && c->tap_nack_mode))
 
 2740             write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
 
 2742             conn_set_state(c, conn_new_cmd);
 
 2747 static void process_bin_tap_ack(conn *c) {
 
 2749     char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
 
 2750                                 sizeof(c->binary_header)));
 
 2752     uint32_t seqno = ntohl(rsp->message.header.response.opaque);
 
 2753     uint16_t status = ntohs(rsp->message.header.response.status);
 
 2754     char *key = packet + 
sizeof(rsp->bytes);
 
 2756     ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
 
 2759                                              TAP_ACK, seqno, key,
 
 2760                                              c->binary_header.request.keylen, 0, 0,
 
 2764     if (ret == ENGINE_DISCONNECT) {
 
 2765         conn_set_state(c, conn_closing);
 
 2767         conn_set_state(c, conn_ship_log);
 
 2774 static void process_bin_noop_response(conn *c) {
 
 2776     conn_set_state(c, conn_new_cmd);
 
 2779 static void process_bin_verbosity(conn *c) {
 
 2780     char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
 
 2781                                 sizeof(c->binary_header)));
 
 2783     uint32_t 
level = (uint32_t)ntohl(req->message.body.level);
 
 2784     if (level > MAX_VERBOSITY_LEVEL) {
 
 2785         level = MAX_VERBOSITY_LEVEL;
 
 2788     perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
 
 2789     write_bin_response(c, NULL, 0, 0, 0);
 
 2792 static void process_bin_packet(conn *c) {
 
 2794     switch (c->binary_header.request.opcode) {
 
 2795     case PROTOCOL_BINARY_CMD_TAP_CONNECT:
 
 2799         conn_set_state(c, conn_add_tap_client);
 
 2801     case PROTOCOL_BINARY_CMD_TAP_MUTATION:
 
 2805         process_bin_tap_packet(TAP_MUTATION, c);
 
 2807     case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
 
 2811         process_bin_tap_packet(TAP_CHECKPOINT_START, c);
 
 2813     case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
 
 2817         process_bin_tap_packet(TAP_CHECKPOINT_END, c);
 
 2819     case PROTOCOL_BINARY_CMD_TAP_DELETE:
 
 2823         process_bin_tap_packet(TAP_DELETION, c);
 
 2825     case PROTOCOL_BINARY_CMD_TAP_FLUSH:
 
 2829         process_bin_tap_packet(TAP_FLUSH, c);
 
 2831     case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
 
 2835         process_bin_tap_packet(TAP_OPAQUE, c);
 
 2837     case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
 
 2841         process_bin_tap_packet(TAP_VBUCKET_SET, c);
 
 2843     case PROTOCOL_BINARY_CMD_VERBOSITY:
 
 2844         process_bin_verbosity(c);
 
 2847         process_bin_unknown_packet(c);
 
 2853 typedef void (*RESPONSE_HANDLER)(conn*);
 
 2858 static RESPONSE_HANDLER response_handlers[256] = {
 
 2859     [PROTOCOL_BINARY_CMD_NOOP] = process_bin_noop_response,
 
 2860     [PROTOCOL_BINARY_CMD_TAP_MUTATION] = process_bin_tap_ack,
 
 2861     [PROTOCOL_BINARY_CMD_TAP_DELETE] = process_bin_tap_ack,
 
 2862     [PROTOCOL_BINARY_CMD_TAP_FLUSH] = process_bin_tap_ack,
 
 2863     [PROTOCOL_BINARY_CMD_TAP_OPAQUE] = process_bin_tap_ack,
 
 2864     [PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET] = process_bin_tap_ack,
 
 2865     [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START] = process_bin_tap_ack,
 
 2866     [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END] = process_bin_tap_ack
 
 2869 static void dispatch_bin_command(conn *c) {
 
 2870     int protocol_error = 0;
 
 2872     int extlen = c->binary_header.request.extlen;
 
 2873     uint16_t keylen = c->binary_header.request.keylen;
 
 2874     uint32_t bodylen = c->binary_header.request.bodylen;
 
 2876     if (
settings.require_sasl && !authenticated(c)) {
 
 2877         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
 
 2882     MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->
rcurr, c->
rbytes);
 
 2887         handle_binary_protocol_error(c);
 
 2892     case PROTOCOL_BINARY_CMD_SETQ:
 
 2893         c->cmd = PROTOCOL_BINARY_CMD_SET;
 
 2895     case PROTOCOL_BINARY_CMD_ADDQ:
 
 2896         c->cmd = PROTOCOL_BINARY_CMD_ADD;
 
 2898     case PROTOCOL_BINARY_CMD_REPLACEQ:
 
 2899         c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
 
 2901     case PROTOCOL_BINARY_CMD_DELETEQ:
 
 2902         c->cmd = PROTOCOL_BINARY_CMD_DELETE;
 
 2904     case PROTOCOL_BINARY_CMD_INCREMENTQ:
 
 2905         c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
 
 2907     case PROTOCOL_BINARY_CMD_DECREMENTQ:
 
 2908         c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
 
 2910     case PROTOCOL_BINARY_CMD_QUITQ:
 
 2911         c->cmd = PROTOCOL_BINARY_CMD_QUIT;
 
 2913     case PROTOCOL_BINARY_CMD_FLUSHQ:
 
 2914         c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
 
 2916     case PROTOCOL_BINARY_CMD_APPENDQ:
 
 2917         c->cmd = PROTOCOL_BINARY_CMD_APPEND;
 
 2919     case PROTOCOL_BINARY_CMD_PREPENDQ:
 
 2920         c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
 
 2922     case PROTOCOL_BINARY_CMD_GETQ:
 
 2923         c->cmd = PROTOCOL_BINARY_CMD_GET;
 
 2925     case PROTOCOL_BINARY_CMD_GETKQ:
 
 2926         c->cmd = PROTOCOL_BINARY_CMD_GETK;
 
 2933         case PROTOCOL_BINARY_CMD_VERSION:
 
 2934             if (extlen == 0 && keylen == 0 && bodylen == 0) {
 
 2935                 write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
 
 2940         case PROTOCOL_BINARY_CMD_FLUSH:
 
 2941             if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
 
 2942                 bin_read_key(c, bin_read_flush_exptime, extlen);
 
 2947         case PROTOCOL_BINARY_CMD_NOOP:
 
 2948             if (extlen == 0 && keylen == 0 && bodylen == 0) {
 
 2949                 write_bin_response(c, NULL, 0, 0, 0);
 
 2954         case PROTOCOL_BINARY_CMD_SET: 
 
 2955         case PROTOCOL_BINARY_CMD_ADD: 
 
 2956         case PROTOCOL_BINARY_CMD_REPLACE:
 
 2957             if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
 
 2958                 bin_read_key(c, bin_reading_set_header, 8);
 
 2963         case PROTOCOL_BINARY_CMD_GETQ:  
 
 2964         case PROTOCOL_BINARY_CMD_GET:   
 
 2965         case PROTOCOL_BINARY_CMD_GETKQ: 
 
 2966         case PROTOCOL_BINARY_CMD_GETK:
 
 2967             if (extlen == 0 && bodylen == keylen && keylen > 0) {
 
 2968                 bin_read_key(c, bin_reading_get_key, 0);
 
 2973         case PROTOCOL_BINARY_CMD_DELETE:
 
 2974             if (keylen > 0 && extlen == 0 && bodylen == keylen) {
 
 2975                 bin_read_key(c, bin_reading_del_header, extlen);
 
 2980         case PROTOCOL_BINARY_CMD_INCREMENT:
 
 2981         case PROTOCOL_BINARY_CMD_DECREMENT:
 
 2982             if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
 
 2983                 bin_read_key(c, bin_reading_incr_header, 20);
 
 2988         case PROTOCOL_BINARY_CMD_APPEND:
 
 2989         case PROTOCOL_BINARY_CMD_PREPEND:
 
 2990             if (keylen > 0 && extlen == 0) {
 
 2991                 bin_read_key(c, bin_reading_set_header, 0);
 
 2996         case PROTOCOL_BINARY_CMD_STAT:
 
 2998                 bin_read_key(c, bin_reading_stat, 0);
 
 3003         case PROTOCOL_BINARY_CMD_QUIT:
 
 3004             if (keylen == 0 && extlen == 0 && bodylen == 0) {
 
 3005                 write_bin_response(c, NULL, 0, 0, 0);
 
 3008                     conn_set_state(c, conn_closing);
 
 3014        case PROTOCOL_BINARY_CMD_TAP_CONNECT:
 
 3016                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
 
 3018                 bin_read_chunk(c, bin_reading_packet,
 
 3019                                c->binary_header.request.bodylen);
 
 3022        case PROTOCOL_BINARY_CMD_TAP_MUTATION:
 
 3023        case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
 
 3024        case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
 
 3025        case PROTOCOL_BINARY_CMD_TAP_DELETE:
 
 3026        case PROTOCOL_BINARY_CMD_TAP_FLUSH:
 
 3027        case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
 
 3028        case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
 
 3030                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
 
 3032                 bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
 
 3036         case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
 
 3037             if (extlen == 0 && keylen == 0 && bodylen == 0) {
 
 3038                 bin_list_sasl_mechs(c);
 
 3043         case PROTOCOL_BINARY_CMD_SASL_AUTH:
 
 3044         case PROTOCOL_BINARY_CMD_SASL_STEP:
 
 3045             if (extlen == 0 && keylen != 0) {
 
 3046                 bin_read_key(c, bin_reading_sasl_auth, 0);
 
 3052         case PROTOCOL_BINARY_CMD_VERBOSITY:
 
 3053             if (extlen == 4 && keylen == 0 && bodylen == 4) {
 
 3054                 bin_read_chunk(c, bin_reading_packet,
 
 3055                                c->binary_header.request.bodylen);
 
 3062                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
 
 3065                 bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
 
 3070         handle_binary_protocol_error(c);
 
 3073 static void process_bin_update(conn *c) {
 
 3082     key = binary_get_key(c);
 
 3083     nkey = c->binary_header.request.keylen;
 
 3086     req->message.body.flags = req->message.body.flags;
 
 3087     rel_time_t expiration = ntohl(req->message.body.expiration);
 
 3089     vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
 
 3094         if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
 
 3096         } 
else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
 
 3103         nw = key_to_printable_buffer(buffer, 
sizeof(buffer), c->sfd, 
true,
 
 3107             if (snprintf(buffer + nw, 
sizeof(buffer) - nw,
 
 3108                          " Value len is %d\n", vlen)) {
 
 3109                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c, 
"%s",
 
 3116         stats_prefix_record_set(key, nkey);
 
 3119     ENGINE_ERROR_CODE ret = c->aiostat;
 
 3120     c->aiostat = ENGINE_SUCCESS;
 
 3121     c->ewouldblock = 
false;
 
 3124     if (ret == ENGINE_SUCCESS) {
 
 3128                                            req->message.body.flags,
 
 3133             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
 
 3139     case ENGINE_SUCCESS:
 
 3140         item_set_cas(c, it, c->binary_header.request.cas);
 
 3143         case PROTOCOL_BINARY_CMD_ADD:
 
 3144             c->store_op = OPERATION_ADD;
 
 3146         case PROTOCOL_BINARY_CMD_SET:
 
 3147             c->store_op = OPERATION_SET;
 
 3149         case PROTOCOL_BINARY_CMD_REPLACE:
 
 3150             c->store_op = OPERATION_REPLACE;
 
 3156         if (c->binary_header.request.cas != 0) {
 
 3157             c->store_op = OPERATION_CAS;
 
 3161         c->
ritem = info.value[0].iov_base;
 
 3163         conn_set_state(c, conn_nread);
 
 3164         c->substate = bin_read_set_value;
 
 3166     case ENGINE_EWOULDBLOCK:
 
 3167         c->ewouldblock = 
true;
 
 3169     case ENGINE_DISCONNECT:
 
 3170         c->state = conn_closing;
 
 3173         if (ret == ENGINE_E2BIG) {
 
 3174             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
 
 3176             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
 
 3184         if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
 
 3187                                        ntohll(req->message.header.request.cas),
 
 3188                                        c->binary_header.request.vbucket);
 
 3196 static void process_bin_append_prepend(conn *c) {
 
 3204     key = binary_get_key(c);
 
 3205     nkey = c->binary_header.request.keylen;
 
 3206     vlen = c->binary_header.request.bodylen - nkey;
 
 3209         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 3210                                         "Value len is %d\n", vlen);
 
 3214         stats_prefix_record_set(key, nkey);
 
 3217     ENGINE_ERROR_CODE ret = c->aiostat;
 
 3218     c->aiostat = ENGINE_SUCCESS;
 
 3219     c->ewouldblock = 
false;
 
 3222     if (ret == ENGINE_SUCCESS) {
 
 3229             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
 
 3235     case ENGINE_SUCCESS:
 
 3236         item_set_cas(c, it, c->binary_header.request.cas);
 
 3239         case PROTOCOL_BINARY_CMD_APPEND:
 
 3240             c->store_op = OPERATION_APPEND;
 
 3242         case PROTOCOL_BINARY_CMD_PREPEND:
 
 3243             c->store_op = OPERATION_PREPEND;
 
 3250         c->
ritem = info.value[0].iov_base;
 
 3252         conn_set_state(c, conn_nread);
 
 3253         c->substate = bin_read_set_value;
 
 3255     case ENGINE_EWOULDBLOCK:
 
 3256         c->ewouldblock = 
true;
 
 3258     case ENGINE_DISCONNECT:
 
 3259         c->state = conn_closing;
 
 3262         if (ret == ENGINE_E2BIG) {
 
 3263             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
 
 3265             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
 
 3272 static void process_bin_flush(conn *c) {
 
 3276     if (c->binary_header.request.extlen == 
sizeof(req->message.body)) {
 
 3277         exptime = ntohl(req->message.body.expiration);
 
 3281         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 3282                                         "%d: flush %ld", c->sfd,
 
 3286     ENGINE_ERROR_CODE 
ret;
 
 3289     if (ret == ENGINE_SUCCESS) {
 
 3290         write_bin_response(c, NULL, 0, 0, 0);
 
 3291     } 
else if (ret == ENGINE_ENOTSUP) {
 
 3292         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
 
 3294         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
 
 3296     STATS_NOKEY(c, cmd_flush);
 
 3299 static void process_bin_delete(conn *c) {
 
 3302     char* key = binary_get_key(c);
 
 3303     size_t nkey = c->binary_header.request.keylen;
 
 3309         if (key_to_printable_buffer(buffer, 
sizeof(buffer), c->sfd, 
true,
 
 3310                                     "DELETE", key, nkey) != -1) {
 
 3311             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c, 
"%s\n",
 
 3316     ENGINE_ERROR_CODE ret = c->aiostat;
 
 3317     c->aiostat = ENGINE_SUCCESS;
 
 3318     c->ewouldblock = 
false;
 
 3320     if (ret == ENGINE_SUCCESS) {
 
 3322             stats_prefix_record_delete(key, nkey);
 
 3325                                          ntohll(req->message.header.request.cas),
 
 3326                                          c->binary_header.request.vbucket);
 
 3332     case ENGINE_SUCCESS:
 
 3333         write_bin_response(c, NULL, 0, 0, 0);
 
 3334         SLAB_INCR(c, delete_hits, key, nkey);
 
 3336     case ENGINE_KEY_EEXISTS:
 
 3337         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
 
 3339     case ENGINE_KEY_ENOENT:
 
 3340         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
 
 3341         STATS_INCR(c, delete_misses, key, nkey);
 
 3343     case ENGINE_NOT_MY_VBUCKET:
 
 3344         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
 
 3346     case ENGINE_EWOULDBLOCK:
 
 3347         c->ewouldblock = 
true;
 
 3350         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
 
 3354 static void complete_nread_binary(conn *c) {
 
 3356     assert(c->cmd >= 0);
 
 3358     switch(c->substate) {
 
 3359     case bin_reading_set_header:
 
 3360         if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
 
 3361                 c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
 
 3362             process_bin_append_prepend(c);
 
 3364             process_bin_update(c);
 
 3367     case bin_read_set_value:
 
 3368         complete_update_bin(c);
 
 3370     case bin_reading_get_key:
 
 3373     case bin_reading_stat:
 
 3374         process_bin_stat(c);
 
 3376     case bin_reading_del_header:
 
 3377         process_bin_delete(c);
 
 3379     case bin_reading_incr_header:
 
 3380         complete_incr_bin(c);
 
 3382     case bin_read_flush_exptime:
 
 3383         process_bin_flush(c);
 
 3385     case bin_reading_sasl_auth:
 
 3386         process_bin_sasl_auth(c);
 
 3388     case bin_reading_sasl_auth_data:
 
 3389         process_bin_complete_sasl_auth(c);
 
 3391     case bin_reading_packet:
 
 3392         if (c->binary_header.request.magic == PROTOCOL_BINARY_RES) {
 
 3394             handler = response_handlers[c->binary_header.request.opcode];
 
 3398                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 3399                        "%d: ERROR: Unsupported response packet received: %u\n",
 
 3400                         c->sfd, (
unsigned int)c->binary_header.request.opcode);
 
 3401                 conn_set_state(c, conn_closing);
 
 3404             process_bin_packet(c);
 
 3408         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 3409                 "Not handling substate %d\n", c->substate);
 
 3414 static void reset_cmd_handler(conn *c) {
 
 3418     c->substate = bin_no_state;
 
 3419     if(c->
item != NULL) {
 
 3425         conn_set_state(c, conn_parse_cmd);
 
 3427         conn_set_state(c, conn_waiting);
 
 3431 static ENGINE_ERROR_CODE ascii_response_handler(
const void *cookie,
 
 3435     conn *c = (conn*)cookie;
 
 3436     if (!grow_dynamic_buffer(c, nbytes)) {
 
 3438             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 3439                     "<%d ERROR: Failed to allocate memory for response\n",
 
 3442         return ENGINE_ENOMEM;
 
 3445     char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
 
 3446     memcpy(buf, dta, nbytes);
 
 3447     c->dynamic_buffer.offset += nbytes;
 
 3449     return ENGINE_SUCCESS;
 
 3452 static void complete_nread_ascii(conn *c) {
 
 3454         c->ewouldblock = 
false;
 
 3456                                       ascii_response_handler)) {
 
 3457         case ENGINE_SUCCESS:
 
 3458             if (c->dynamic_buffer.buffer != NULL) {
 
 3459                 write_and_free(c, c->dynamic_buffer.buffer,
 
 3460                                c->dynamic_buffer.offset);
 
 3461                 c->dynamic_buffer.buffer = NULL;
 
 3463                 conn_set_state(c, conn_new_cmd);
 
 3466         case ENGINE_EWOULDBLOCK:
 
 3467             c->ewouldblock = 
true;
 
 3469         case ENGINE_DISCONNECT:
 
 3471             conn_set_state(c, conn_closing);
 
 3474         complete_update_ascii(c);
 
 3478 static void complete_nread(conn *c) {
 
 3480     assert(c->protocol == ascii_prot
 
 3481            || c->protocol == binary_prot);
 
 3483     if (c->protocol == ascii_prot) {
 
 3484         complete_nread_ascii(c);
 
 3485     } 
else if (c->protocol == binary_prot) {
 
 3486         complete_nread_binary(c);
 
 3490 #define COMMAND_TOKEN 0 
 3491 #define SUBCOMMAND_TOKEN 1 
 3494 #define MAX_TOKENS 30 
 3513 static size_t tokenize_command(
char *command, 
token_t *tokens, 
const size_t max_tokens) {
 
 3517     assert(command != NULL && tokens != NULL && max_tokens > 1);
 
 3519     for (s = e = command; ntokens < max_tokens - 1; ++e) {
 
 3522                 tokens[ntokens].value = s;
 
 3523                 tokens[ntokens].length = e - s;
 
 3529         else if (*e == 
'\0') {
 
 3531                 tokens[ntokens].value = s;
 
 3532                 tokens[ntokens].length = e - s;
 
 3544     tokens[ntokens].value =  *e == 
'\0' ? NULL : e;
 
 3545     tokens[ntokens].length = 0;
 
 3551 static void detokenize(
token_t *tokens, 
int ntokens, 
char **out, 
int *nbytes) {
 
 3556     for (i = 0; i < ntokens; ++
i) {
 
 3557         nb += tokens[
i].length;
 
 3560     buf = malloc(nb * 
sizeof(
char));
 
 3563         for (i = 0; i < ntokens; ++
i) {
 
 3564             memcpy(p, tokens[i].value, tokens[i].length);
 
 3565             p += tokens[
i].length;
 
 3577 static void write_and_free(conn *c, 
char *buf, 
int bytes) {
 
 3579         c->write_and_free = 
buf;
 
 3582         conn_set_state(c, conn_write);
 
 3585         out_string(c, 
"SERVER_ERROR out of memory writing stats");
 
 3589 static inline bool set_noreply_maybe(conn *c, 
token_t *tokens, 
size_t ntokens)
 
 3591     int noreply_index = ntokens - 2;
 
 3600     if (tokens[noreply_index].value
 
 3601         && strcmp(tokens[noreply_index].value, 
"noreply") == 0) {
 
 3607 void append_stat(
const char *
name, 
ADD_STAT add_stats, conn *c,
 
 3608                  const char *
fmt, ...) {
 
 3609     char val_str[STAT_VAL_LEN];
 
 3619     vlen = vsnprintf(val_str, 
sizeof(val_str) - 1, fmt, ap);
 
 3622     add_stats(name, strlen(name), val_str, vlen, c);
 
 3625 inline static void process_stats_detail(conn *c, 
const char *command) {
 
 3629         if (strcmp(command, 
"on") == 0) {
 
 3631             out_string(c, 
"OK");
 
 3633         else if (strcmp(command, 
"off") == 0) {
 
 3635             out_string(c, 
"OK");
 
 3637         else if (strcmp(command, 
"dump") == 0) {
 
 3639             char *
stats = stats_prefix_dump(&len);
 
 3640             write_and_free(c, stats, len);
 
 3643             out_string(c, 
"CLIENT_ERROR usage: stats detail on|off|dump");
 
 3647         out_string(c, 
"CLIENT_ERROR detailed stats disabled");
 
 3651 static void aggregate_callback(
void *in, 
void *out) {
 
 3654     threadlocal_stats_aggregate(in_independent_stats->thread_stats,
 
 3659 static void server_stats(
ADD_STAT add_stats, conn *c, 
bool aggregate) {
 
 3660     pid_t pid = getpid();
 
 3661     rel_time_t now = current_time;
 
 3672         threadlocal_stats_aggregate(get_independent_stats(c)->
thread_stats,
 
 3681     getrusage(RUSAGE_SELF, &usage);
 
 3688     APPEND_STAT(
"time", 
"%ld", now + (
long)process_started);
 
 3690     APPEND_STAT(
"libevent", 
"%s", event_get_version());
 
 3691     APPEND_STAT(
"pointer_size", 
"%d", (
int)(8 * 
sizeof(
void *)));
 
 3694     append_stat(
"rusage_user", add_stats, c, 
"%ld.%06ld",
 
 3695                 (
long)usage.ru_utime.tv_sec,
 
 3696                 (
long)usage.ru_utime.tv_usec);
 
 3697     append_stat(
"rusage_system", add_stats, c, 
"%ld.%06ld",
 
 3698                 (
long)usage.ru_stime.tv_sec,
 
 3699                 (
long)usage.ru_stime.tv_usec);
 
 3725     APPEND_STAT(
"accepting_conns", 
"%u",  is_listen_disabled() ? 0 : 1);
 
 3726     APPEND_STAT(
"listen_disabled_num", 
"%"PRIu64, get_listen_disabled_num());
 
 3727     APPEND_STAT(
"rejected_conns", 
"%" PRIu64, (
unsigned long long)
stats.rejected_conns);
 
 3740     if (ts.sent.connect) {
 
 3741         APPEND_STAT(
"tap_connect_sent", 
"%"PRIu64, ts.sent.connect);
 
 3743     if (ts.sent.mutation) {
 
 3744         APPEND_STAT(
"tap_mutation_sent", 
"%"PRIu64, ts.sent.mutation);
 
 3746     if (ts.sent.checkpoint_start) {
 
 3747         APPEND_STAT(
"tap_checkpoint_start_sent", 
"%"PRIu64, ts.sent.checkpoint_start);
 
 3749     if (ts.sent.checkpoint_end) {
 
 3750         APPEND_STAT(
"tap_checkpoint_end_sent", 
"%"PRIu64, ts.sent.checkpoint_end);
 
 3752     if (ts.sent.delete) {
 
 3753         APPEND_STAT(
"tap_delete_sent", 
"%"PRIu64, ts.sent.delete);
 
 3755     if (ts.sent.flush) {
 
 3756         APPEND_STAT(
"tap_flush_sent", 
"%"PRIu64, ts.sent.flush);
 
 3758     if (ts.sent.opaque) {
 
 3759         APPEND_STAT(
"tap_opaque_sent", 
"%"PRIu64, ts.sent.opaque);
 
 3761     if (ts.sent.vbucket_set) {
 
 3763                     ts.sent.vbucket_set);
 
 3765     if (ts.received.connect) {
 
 3766         APPEND_STAT(
"tap_connect_received", 
"%"PRIu64, ts.received.connect);
 
 3768     if (ts.received.mutation) {
 
 3769         APPEND_STAT(
"tap_mutation_received", 
"%"PRIu64, ts.received.mutation);
 
 3771     if (ts.received.checkpoint_start) {
 
 3772         APPEND_STAT(
"tap_checkpoint_start_received", 
"%"PRIu64, ts.received.checkpoint_start);
 
 3774     if (ts.received.checkpoint_end) {
 
 3775         APPEND_STAT(
"tap_checkpoint_end_received", 
"%"PRIu64, ts.received.checkpoint_end);
 
 3777     if (ts.received.delete) {
 
 3778         APPEND_STAT(
"tap_delete_received", 
"%"PRIu64, ts.received.delete);
 
 3780     if (ts.received.flush) {
 
 3781         APPEND_STAT(
"tap_flush_received", 
"%"PRIu64, ts.received.flush);
 
 3783     if (ts.received.opaque) {
 
 3784         APPEND_STAT(
"tap_opaque_received", 
"%"PRIu64, ts.received.opaque);
 
 3786     if (ts.received.vbucket_set) {
 
 3787         APPEND_STAT(
"tap_vbucket_set_received", 
"%"PRIu64,
 
 3788                     ts.received.vbucket_set);
 
 3792 static void process_stat_settings(
ADD_STAT add_stats, 
void *c) {
 
 3811                 settings.detail_enabled ? 
"yes" : 
"no");
 
 3813                 settings.allow_detailed ? 
"yes" : 
"no");
 
 3819                 prot_text(
settings.binding_protocol));
 
 3828 #elif defined(ENABLE_SASL) 
 3848         APPEND_STAT(
"ascii_extension", 
"%s", ptr->get_name(ptr->cookie));
 
 3852 static char *process_stat(conn *c, 
token_t *tokens, 
const size_t ntokens) {
 
 3853     const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
 
 3854     c->dynamic_buffer.offset = 0;
 
 3857         ENGINE_ERROR_CODE ret = c->aiostat;
 
 3858         c->aiostat = ENGINE_SUCCESS;
 
 3859         c->ewouldblock = 
false;
 
 3860         if (ret == ENGINE_SUCCESS) {
 
 3861             server_stats(&append_stats, c, 
false);
 
 3863                                                 NULL, 0, &append_stats);
 
 3864             if (ret == ENGINE_EWOULDBLOCK) {
 
 3865                 c->ewouldblock = 
true;
 
 3866                 return c->
rcurr + 5;
 
 3869     } 
else if (strcmp(subcommand, 
"reset") == 0) {
 
 3871         out_string(c, 
"RESET");
 
 3873     } 
else if (strcmp(subcommand, 
"detail") == 0) {
 
 3876             process_stats_detail(c, 
"");  
 
 3878             process_stats_detail(c, tokens[2].value);
 
 3882     } 
else if (strcmp(subcommand, 
"settings") == 0) {
 
 3883         process_stat_settings(&append_stats, c);
 
 3884     } 
else if (strcmp(subcommand, 
"cachedump") == 0) {
 
 3886         unsigned int bytes = 0, 
id, 
limit = 0;
 
 3889             out_string(c, 
"CLIENT_ERROR bad command line");
 
 3893         if (!safe_strtoul(tokens[2].value, &
id) ||
 
 3894             !safe_strtoul(tokens[3].value, &limit)) {
 
 3895             out_string(c, 
"CLIENT_ERROR bad command line format");
 
 3899         if (
id >= POWER_LARGEST) {
 
 3900             out_string(c, 
"CLIENT_ERROR Illegal slab id");
 
 3905         buf = item_cachedump(
id, limit, &bytes);
 
 3907         write_and_free(c, buf, bytes);
 
 3909     } 
else if (strcmp(subcommand, 
"aggregate") == 0) {
 
 3910         server_stats(&append_stats, c, 
true);
 
 3911     } 
else if (strcmp(subcommand, 
"topkeys") == 0) {
 
 3912         topkeys_t *tk = get_independent_stats(c)->topkeys;
 
 3914             topkeys_stats(tk, c, current_time, append_stats);
 
 3916             out_string(c, 
"ERROR");
 
 3922         ENGINE_ERROR_CODE ret = c->aiostat;
 
 3923         c->aiostat = ENGINE_SUCCESS;
 
 3924         c->ewouldblock = 
false;
 
 3925         if (ret == ENGINE_SUCCESS) {
 
 3928             detokenize(&tokens[1], ntokens - 2, &buf, &nb);
 
 3935         case ENGINE_SUCCESS:
 
 3936             append_stats(NULL, 0, NULL, 0, c);
 
 3937             write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
 
 3938             c->dynamic_buffer.buffer = NULL;
 
 3941             out_string(c, 
"SERVER_ERROR out of memory writing stats");
 
 3943         case ENGINE_DISCONNECT:
 
 3944             c->state = conn_closing;
 
 3946         case ENGINE_ENOTSUP:
 
 3947             out_string(c, 
"SERVER_ERROR not supported");
 
 3949         case ENGINE_EWOULDBLOCK:
 
 3950             c->ewouldblock = 
true;
 
 3951             return tokens[SUBCOMMAND_TOKEN].value;
 
 3953             out_string(c, 
"ERROR");
 
 3961     append_stats(NULL, 0, NULL, 0, c);
 
 3963     if (c->dynamic_buffer.buffer == NULL) {
 
 3964         out_string(c, 
"SERVER_ERROR out of memory writing stats");
 
 3966         write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
 
 3967         c->dynamic_buffer.buffer = NULL;
 
 3978 static char *get_suffix_buffer(conn *c) {
 
 3979     if (c->suffixleft == c->suffixsize) {
 
 3980         char **new_suffix_list;
 
 3981         size_t sz = 
sizeof(
char*) * c->suffixsize * 2;
 
 3983         new_suffix_list = realloc(c->suffixlist, sz);
 
 3984         if (new_suffix_list) {
 
 3986             c->suffixlist = new_suffix_list;
 
 3989                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 3990                         "=%d Failed to resize suffix buffer\n", c->sfd);
 
 3997     char *suffix = cache_alloc(c->thread->suffix_cache);
 
 3998     if (suffix != NULL) {
 
 3999         *(c->suffixlist + c->suffixleft) = suffix;
 
 4007 static inline char* process_get_command(conn *c, 
token_t *tokens, 
size_t ntokens, 
bool return_cas) {
 
 4012     token_t *key_token = &tokens[KEY_TOKEN];
 
 4016         while(key_token->length != 0) {
 
 4018             key = key_token->value;
 
 4019             nkey = key_token->length;
 
 4022                 out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4026             ENGINE_ERROR_CODE ret = c->aiostat;
 
 4027             c->aiostat = ENGINE_SUCCESS;
 
 4029             if (ret == ENGINE_SUCCESS) {
 
 4034             case ENGINE_EWOULDBLOCK:
 
 4035                 c->ewouldblock = 
true;
 
 4039             case ENGINE_SUCCESS:
 
 4041             case ENGINE_KEY_ENOENT:
 
 4048                 stats_prefix_record_get(key, nkey, NULL != it);
 
 4056                     out_string(c, 
"SERVER_ERROR error getting item data");
 
 4060                 if (i >= c->isize) {
 
 4061                     item **new_list = realloc(c->ilist, 
sizeof(item *) * c->isize * 2);
 
 4064                         c->ilist = new_list;
 
 4072                 char *suffix = get_suffix_buffer(c);
 
 4073                 if (suffix == NULL) {
 
 4074                     out_string(c, 
"SERVER_ERROR out of memory rebuilding suffix");
 
 4078                 int suffix_len = snprintf(suffix, SUFFIX_SIZE,
 
 4079                                           " %u %u\r\n", htonl(info.
flags),
 
 4090                 MEMCACHED_COMMAND_GET(c->sfd, info.
key, info.
nkey,
 
 4095                   char *cas = get_suffix_buffer(c);
 
 4097                     out_string(c, 
"SERVER_ERROR out of memory making CAS suffix");
 
 4101                   int cas_len = snprintf(cas, SUFFIX_SIZE, 
" %"PRIu64
"\r\n",
 
 4103                   if (add_iov(c, 
"VALUE ", 6) != 0 ||
 
 4104                       add_iov(c, info.
key, info.
nkey) != 0 ||
 
 4105                       add_iov(c, suffix, suffix_len - 2) != 0 ||
 
 4106                       add_iov(c, cas, cas_len) != 0 ||
 
 4107                       add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
 
 4108                       add_iov(c, 
"\r\n", 2) != 0)
 
 4116                   if (add_iov(c, 
"VALUE ", 6) != 0 ||
 
 4117                       add_iov(c, info.
key, info.
nkey) != 0 ||
 
 4118                       add_iov(c, suffix, suffix_len) != 0 ||
 
 4119                       add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
 
 4120                       add_iov(c, 
"\r\n", 2) != 0)
 
 4129                     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4130                                                     ">%d sending key %s\n",
 
 4135                 STATS_HIT(c, 
get, key, nkey);
 
 4136                 *(c->ilist + 
i) = it;
 
 4140                 STATS_MISS(c, 
get, key, nkey);
 
 4141                 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
 
 4151         if(key_token->value != NULL) {
 
 4152             ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
 
 4156     } 
while(key_token->value != NULL);
 
 4158     c->icurr = c->ilist;
 
 4160     c->suffixcurr = c->suffixlist;
 
 4163         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4164                                         ">%d END\n", c->sfd);
 
 4172     if (key_token->value != NULL || add_iov(c, 
"END\r\n", 5) != 0
 
 4173         || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
 
 4174         out_string(c, 
"SERVER_ERROR out of memory writing get response");
 
 4177         conn_set_state(c, conn_mwrite);
 
 4184 static void process_update_command(conn *c, 
token_t *tokens, 
const size_t ntokens, ENGINE_STORE_OPERATION store_op, 
bool handle_cas) {
 
 4188     int32_t exptime_int = 0;
 
 4191     uint64_t req_cas_id=0;
 
 4196     set_noreply_maybe(c, tokens, ntokens);
 
 4199         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4203     key = tokens[KEY_TOKEN].value;
 
 4204     nkey = tokens[KEY_TOKEN].length;
 
 4206     if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
 
 4207            && safe_strtol(tokens[3].value, &exptime_int)
 
 4208            && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
 
 4209         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4214     exptime = exptime_int;
 
 4218         if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
 
 4219             out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4225         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4230         stats_prefix_record_set(key, nkey);
 
 4233     ENGINE_ERROR_CODE ret = c->aiostat;
 
 4234     c->aiostat = ENGINE_SUCCESS;
 
 4235     c->ewouldblock = 
false;
 
 4237     if (ret == ENGINE_SUCCESS) {
 
 4240                                            vlen, htonl(flags), exptime);
 
 4245     case ENGINE_SUCCESS:
 
 4246         item_set_cas(c, it, req_cas_id);
 
 4249             out_string(c, 
"SERVER_ERROR error getting item data");
 
 4253         c->
ritem = info.value[0].iov_base;
 
 4255         c->store_op = store_op;
 
 4256         conn_set_state(c, conn_nread);
 
 4258     case ENGINE_EWOULDBLOCK:
 
 4259         c->ewouldblock = 
true;
 
 4261     case ENGINE_DISCONNECT:
 
 4262         c->state = conn_closing;
 
 4265         if (ret == ENGINE_E2BIG) {
 
 4266             out_string(c, 
"SERVER_ERROR object too large for cache");
 
 4268             out_string(c, 
"SERVER_ERROR out of memory storing object");
 
 4272         c->sbytes = vlen + 2;
 
 4276         if (store_op == OPERATION_SET) {
 
 4282 static char* process_arithmetic_command(conn *c, 
token_t *tokens, 
const size_t ntokens, 
const bool incr) {
 
 4290     set_noreply_maybe(c, tokens, ntokens);
 
 4293         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4297     key = tokens[KEY_TOKEN].value;
 
 4298     nkey = tokens[KEY_TOKEN].length;
 
 4300     if (!safe_strtoull(tokens[2].value, &delta)) {
 
 4301         out_string(c, 
"CLIENT_ERROR invalid numeric delta argument");
 
 4305     ENGINE_ERROR_CODE ret = c->aiostat;
 
 4306     c->aiostat = ENGINE_SUCCESS;
 
 4309     if (ret == ENGINE_SUCCESS) {
 
 4311                                              incr, 
false, delta, 0, 0, &cas,
 
 4317     case ENGINE_SUCCESS:
 
 4319             STATS_INCR(c, incr_hits, key, nkey);
 
 4321             STATS_INCR(c, decr_hits, key, nkey);
 
 4323         snprintf(temp, 
sizeof(temp), 
"%"PRIu64, result);
 
 4324         out_string(c, temp);
 
 4326     case ENGINE_KEY_ENOENT:
 
 4328             STATS_INCR(c, incr_misses, key, nkey);
 
 4330             STATS_INCR(c, decr_misses, key, nkey);
 
 4332         out_string(c, 
"NOT_FOUND");
 
 4335         out_string(c, 
"SERVER_ERROR out of memory");
 
 4337     case ENGINE_TMPFAIL:
 
 4338         out_string(c, 
"SERVER_ERROR temporary failure");
 
 4341         out_string(c, 
"CLIENT_ERROR cannot increment or decrement non-numeric value");
 
 4343     case ENGINE_NOT_STORED:
 
 4344         out_string(c, 
"SERVER_ERROR failed to store item");
 
 4346     case ENGINE_DISCONNECT:
 
 4347         c->state = conn_closing;
 
 4349     case ENGINE_ENOTSUP:
 
 4350         out_string(c, 
"SERVER_ERROR not supported");
 
 4352     case ENGINE_EWOULDBLOCK:
 
 4353         c->ewouldblock = 
true;
 
 4362 static char *process_delete_command(conn *c, 
token_t *tokens,
 
 4363                                     const size_t ntokens) {
 
 4370         bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, 
"0") == 0;
 
 4371         bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
 
 4372         bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
 
 4373             || (ntokens == 5 && hold_is_zero && sets_noreply);
 
 4375             out_string(c, 
"CLIENT_ERROR bad command line format.  " 
 4376                        "Usage: delete <key> [noreply]");
 
 4381     key = tokens[KEY_TOKEN].value;
 
 4382     nkey = tokens[KEY_TOKEN].length;
 
 4385         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4389     ENGINE_ERROR_CODE ret = c->aiostat;
 
 4390     c->aiostat = ENGINE_SUCCESS;
 
 4391     c->ewouldblock = 
false;
 
 4392     if (ret == ENGINE_SUCCESS) {
 
 4400     case ENGINE_SUCCESS:
 
 4401         out_string(c, 
"DELETED");
 
 4402         SLAB_INCR(c, delete_hits, key, nkey);
 
 4404     case ENGINE_EWOULDBLOCK:
 
 4405         c->ewouldblock = 
true;
 
 4407     case ENGINE_TMPFAIL:
 
 4408         out_string(c, 
"SERVER_ERROR temporary failure");
 
 4411         out_string(c, 
"NOT_FOUND");
 
 4412         STATS_INCR(c, delete_misses, key, nkey);
 
 4415     if (ret != ENGINE_EWOULDBLOCK && 
settings.detail_enabled) {
 
 4416         stats_prefix_record_delete(key, nkey);
 
 4421 static char *process_bind_command(conn *c, 
token_t *tokens,
 
 4422                                   const size_t ntokens) {
 
 4429         out_string(c, 
"CLIENT_ERROR bad command line format.  " 
 4430                       "Usage: bind <table_id_name>");
 
 4434     name = tokens[KEY_TOKEN].value;
 
 4435     name_len = tokens[KEY_TOKEN].length;
 
 4438         out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4442     ENGINE_ERROR_CODE ret = c->aiostat;
 
 4443     c->aiostat = ENGINE_SUCCESS;
 
 4444     c->ewouldblock = 
false;
 
 4445     if (ret == ENGINE_SUCCESS) {
 
 4453     case ENGINE_SUCCESS:
 
 4454         out_string(c, 
"SUCCEED");
 
 4456     case ENGINE_EWOULDBLOCK:
 
 4457         c->ewouldblock = 
true;
 
 4459     case ENGINE_TMPFAIL:
 
 4461         out_string(c, 
"NOT_FOUND");
 
 4467 static void process_verbosity_command(conn *c, 
token_t *tokens, 
const size_t ntokens) {
 
 4472     set_noreply_maybe(c, tokens, ntokens);
 
 4473     if (c->noreply && ntokens == 3) {
 
 4476         out_string(c, 
"ERROR");
 
 4480     if (safe_strtoul(tokens[1].value, &level)) {
 
 4481         settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : 
level;
 
 4482         perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
 
 4483         out_string(c, 
"OK");
 
 4485         out_string(c, 
"ERROR");
 
 4489 static char* process_command(conn *c, 
char *command) {
 
 4498     MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->
rcurr, c->
rbytes);
 
 4501         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4502                                         "<%d %s\n", c->sfd, command);
 
 4510     if (c->ewouldblock) {
 
 4515         c->ewouldblock = 
false;
 
 4520         if (add_msghdr(c) != 0) {
 
 4521             out_string(c, 
"SERVER_ERROR out of memory preparing response");
 
 4526     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
 
 4528         ((strcmp(tokens[COMMAND_TOKEN].value, 
"get") == 0) ||
 
 4529          (strcmp(tokens[COMMAND_TOKEN].value, 
"bget") == 0))) {
 
 4531         ret = process_get_command(c, tokens, ntokens, 
false);
 
 4533     } 
else if ((ntokens == 6 || ntokens == 7) &&
 
 4534                ((strcmp(tokens[COMMAND_TOKEN].value, 
"add") == 0 && (comm = (
int)OPERATION_ADD)) ||
 
 4535                 (strcmp(tokens[COMMAND_TOKEN].value, 
"set") == 0 && (comm = (
int)OPERATION_SET)) ||
 
 4536                 (strcmp(tokens[COMMAND_TOKEN].value, 
"replace") == 0 && (comm = (
int)OPERATION_REPLACE)) ||
 
 4537                 (strcmp(tokens[COMMAND_TOKEN].value, 
"prepend") == 0 && (comm = (
int)OPERATION_PREPEND)) ||
 
 4538                 (strcmp(tokens[COMMAND_TOKEN].value, 
"append") == 0 && (comm = (
int)OPERATION_APPEND)) )) {
 
 4540         process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, 
false);
 
 4542     } 
else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, 
"cas") == 0 && (comm = (
int)OPERATION_CAS))) {
 
 4544         process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, 
true);
 
 4546     } 
else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, 
"incr") == 0)) {
 
 4548         ret = process_arithmetic_command(c, tokens, ntokens, 1);
 
 4550     } 
else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, 
"gets") == 0)) {
 
 4552         ret = process_get_command(c, tokens, ntokens, 
true);
 
 4554     } 
else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, 
"decr") == 0)) {
 
 4556         ret = process_arithmetic_command(c, tokens, ntokens, 0);
 
 4558     } 
else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, 
"delete") == 0)) {
 
 4560         ret = process_delete_command(c, tokens, ntokens);
 
 4562     } 
else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, 
"bind") == 0)) {
 
 4564         ret = process_bind_command(c, tokens, ntokens);
 
 4566     } 
else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, 
"stats") == 0)) {
 
 4568         ret = process_stat(c, tokens, ntokens);
 
 4570     } 
else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, 
"flush_all") == 0)) {
 
 4573         set_noreply_maybe(c, tokens, ntokens);
 
 4575         if (ntokens == (c->noreply ? 3 : 2)) {
 
 4578             exptime = strtol(tokens[1].value, NULL, 10);
 
 4579             if(errno == ERANGE) {
 
 4580                 out_string(c, 
"CLIENT_ERROR bad command line format");
 
 4585         ENGINE_ERROR_CODE ret = c->aiostat;
 
 4586         c->aiostat = ENGINE_SUCCESS;
 
 4587         c->ewouldblock = 
false;
 
 4588         if (ret == ENGINE_SUCCESS) {
 
 4593         case  ENGINE_SUCCESS:
 
 4594             out_string(c, 
"OK");
 
 4596         case ENGINE_ENOTSUP:
 
 4597             out_string(c, 
"SERVER_ERROR not supported");
 
 4599         case ENGINE_EWOULDBLOCK:
 
 4600             c->ewouldblock = 
true;
 
 4601             return c->
rcurr + 9;
 
 4603             out_string(c, 
"SERVER_ERROR failed to flush cache");
 
 4606         if (ret != ENGINE_EWOULDBLOCK) {
 
 4607             STATS_NOKEY(c, cmd_flush);
 
 4611     } 
else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, 
"version") == 0)) {
 
 4613         out_string(c, 
"VERSION " VERSION);
 
 4615     } 
else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, 
"quit") == 0)) {
 
 4617         conn_set_state(c, conn_closing);
 
 4619     } 
else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, 
"verbosity") == 0)) {
 
 4620         process_verbosity_command(c, tokens, ntokens);
 
 4621     } 
else if (
settings.extensions.ascii != NULL) {
 
 4627             if (ntokens == MAX_TOKENS) {
 
 4628                 out_string(c, 
"ERROR too many arguments");
 
 4632             if (tokens[ntokens - 1].length == 0) {
 
 4637         for (cmd = 
settings.extensions.ascii; cmd != NULL; cmd = cmd->
next) {
 
 4638             if (cmd->
accept(cmd->
cookie, c, ntokens, tokens, &nbytes, &ptr)) {
 
 4644             out_string(c, 
"ERROR unknown command");
 
 4645         } 
else if (nbytes == 0) {
 
 4647                                  ascii_response_handler)) {
 
 4648             case ENGINE_SUCCESS:
 
 4649                 if (c->dynamic_buffer.buffer != NULL) {
 
 4650                     write_and_free(c, c->dynamic_buffer.buffer,
 
 4651                                    c->dynamic_buffer.offset);
 
 4652                     c->dynamic_buffer.buffer = NULL;
 
 4654                     conn_set_state(c, conn_new_cmd);
 
 4657             case ENGINE_EWOULDBLOCK:
 
 4658                 c->ewouldblock = 
true;
 
 4659                 ret = tokens[KEY_TOKEN].value;;
 
 4661             case ENGINE_DISCONNECT:
 
 4663                 conn_set_state(c, conn_closing);
 
 4671             conn_set_state(c, conn_nread);
 
 4674         out_string(c, 
"ERROR");
 
 4682 static int try_read_command(conn *c) {
 
 4687     if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
 
 4688         if ((
unsigned char)c->
rbuf[0] == (
unsigned char)PROTOCOL_BINARY_REQ) {
 
 4689             c->protocol = binary_prot;
 
 4691             c->protocol = ascii_prot;
 
 4695             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4696                     "%d: Client using the %s protocol\n", c->sfd,
 
 4697                     prot_text(c->protocol));
 
 4701     if (c->protocol == binary_prot) {
 
 4703         if (c->
rbytes < 
sizeof(c->binary_header)) {
 
 4708             if (((
long)(c->
rcurr)) % 8 != 0) {
 
 4713                     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4714                              "%d: Realign input buffer\n", c->sfd);
 
 4725                 nw = bytes_to_output_string(buffer, 
sizeof(buffer), c->sfd,
 
 4726                                             true, 
"Read binary protocol data:",
 
 4727                                             (
const char*)req->bytes,
 
 4728                                             sizeof(req->bytes));
 
 4730                     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 4735             c->binary_header = *req;
 
 4736             c->binary_header.request.keylen = ntohs(req->request.keylen);
 
 4737             c->binary_header.request.bodylen = ntohl(req->request.bodylen);
 
 4738             c->binary_header.request.vbucket = ntohs(req->request.vbucket);
 
 4739             c->binary_header.request.cas = ntohll(req->request.cas);
 
 4742             if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ &&
 
 4743                 !(c->binary_header.request.magic == PROTOCOL_BINARY_RES &&
 
 4744                   response_handlers[c->binary_header.request.opcode])) {
 
 4746                     if (c->binary_header.request.magic != PROTOCOL_BINARY_RES) {
 
 4747                         settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 4748                               "%d: Invalid magic:  %x\n", c->sfd,
 
 4749                               c->binary_header.request.magic);
 
 4751                         settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 4752                               "%d: ERROR: Unsupported response packet received: %u\n",
 
 4753                               c->sfd, (
unsigned int)c->binary_header.request.opcode);
 
 4757                 conn_set_state(c, conn_closing);
 
 4764             if (add_msghdr(c) != 0) {
 
 4765                 out_string(c, 
"SERVER_ERROR out of memory");
 
 4769             c->cmd = c->binary_header.request.opcode;
 
 4770             c->keylen = c->binary_header.request.keylen;
 
 4771             c->opaque = c->binary_header.request.opaque;
 
 4775             dispatch_bin_command(c);
 
 4777             c->
rbytes -= 
sizeof(c->binary_header);
 
 4778             c->
rcurr += 
sizeof(c->binary_header);
 
 4781         char *el, *cont, *left, lb;
 
 4794                 char *ptr = c->
rcurr;
 
 4795                 while (*ptr == 
' ') { 
 
 4799                 if (ptr - c->
rcurr > 100 ||
 
 4800                     (strncmp(ptr, 
"get ", 4) && strncmp(ptr, 
"gets ", 5))) {
 
 4802                     conn_set_state(c, conn_closing);
 
 4810         if ((el - c->
rcurr) > 1 && *(el - 1) == 
'\r') {
 
 4819         LOCK_THREAD(thread);
 
 4820         left = process_command(c, c->
rcurr);
 
 4821         if (c->ewouldblock) {
 
 4822             unregister_event(c);
 
 4824         UNLOCK_THREAD(thread);
 
 4832             assert (left <= el);
 
 4834             int count = strlen(c->
rcurr);
 
 4835             if ((c->
rcurr + count) == left) {
 
 4839                 left -= (count + 1);
 
 4841                 assert(cont >= c->
rcurr);
 
 4842                 if (cont > c->
rcurr) {
 
 4843                     memmove(cont, c->
rcurr, count);
 
 4848             while ((left = memchr(left, 
'\0', el - left)) != NULL) {
 
 4866 static enum try_read_result try_read_udp(conn *c) {
 
 4871     c->request_addr_size = 
sizeof(c->request_addr);
 
 4872     res = recvfrom(c->sfd, c->
rbuf, c->
rsize,
 
 4873                    0, (
struct sockaddr *)&c->request_addr, &c->request_addr_size);
 
 4875         unsigned char *buf = (
unsigned char *)c->
rbuf;
 
 4876         STATS_ADD(c, bytes_read, res);
 
 4879         c->request_id = buf[0] * 256 + buf[1];
 
 4882         if (buf[4] != 0 || buf[5] != 1) {
 
 4883             out_string(c, 
"SERVER_ERROR multi-packet request not supported");
 
 4884             return READ_NO_DATA_RECEIVED;
 
 4889         memmove(c->
rbuf, c->
rbuf + 8, res);
 
 4893         return READ_DATA_RECEIVED;
 
 4895     return READ_NO_DATA_RECEIVED;
 
 4910 static enum try_read_result try_read_network(conn *c) {
 
 4911     enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
 
 4924             if (num_allocs == 4) {
 
 4928             char *new_rbuf = realloc(c->
rbuf, c->
rsize * 2);
 
 4931                  settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 4932                           "Couldn't realloc input buffer\n");
 
 4935                 out_string(c, 
"SERVER_ERROR out of memory reading request");
 
 4937                 return READ_MEMORY_ERROR;
 
 4944         res = recv(c->sfd, c->
rbuf + c->
rbytes, avail, 0);
 
 4946             STATS_ADD(c, bytes_read, res);
 
 4947             gotdata = READ_DATA_RECEIVED;
 
 4959             if (errno == EAGAIN || errno == EWOULDBLOCK) {
 
 4968 bool register_event(conn *c, 
struct timeval *timeout) {
 
 4970     assert(!c->registered_in_libevent);
 
 4973     if (event_add(&c->event, timeout) == -1) {
 
 4974         settings.extensions.logger->
log(EXTENSION_LOG_WARNING,
 
 4976                                         "Failed to add connection to libevent: %s",
 
 4982     c->registered_in_libevent = 
true;
 
 4988 bool unregister_event(conn *c) {
 
 4990     assert(c->registered_in_libevent);
 
 4993     if (event_del(&c->event) == -1) {
 
 4998     c->registered_in_libevent = 
false;
 
 5005 bool update_event(conn *c, 
const int new_flags) {
 
 5009     if (c->ev_flags == new_flags)
 
 5012     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
 
 5013                                     "Updated event for %d to read=%s, write=%s\n",
 
 5014                                     c->sfd, (new_flags & EV_READ ? 
"yes" : 
"no"),
 
 5015                                     (new_flags & EV_WRITE ? 
"yes" : 
"no"));
 
 5017     if (!unregister_event(c)) {
 
 5021     event_set(&c->event, c->sfd, new_flags, event_handler, (
void *)c);
 
 5022     event_base_set(base, &c->event);
 
 5023     c->ev_flags = new_flags;
 
 5025     return register_event(c, NULL);
 
 5037 static enum transmit_result transmit(conn *c) {
 
 5040     if (c->msgcurr < c->msgused &&
 
 5041             c->msglist[c->msgcurr].msg_iovlen == 0) {
 
 5045     if (c->msgcurr < c->msgused) {
 
 5047         struct msghdr *m = &c->msglist[c->msgcurr];
 
 5049         res = sendmsg(c->sfd, m, 0);
 
 5051             STATS_ADD(c, bytes_written, res);
 
 5055             while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
 
 5056                 res -= m->msg_iov->iov_len;
 
 5064                 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
 
 5065                 m->msg_iov->iov_len -= res;
 
 5067             return TRANSMIT_INCOMPLETE;
 
 5069         if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
 
 5070             if (!update_event(c, EV_WRITE | EV_PERSIST)) {
 
 5072                     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 5073                             "Couldn't update event\n");
 
 5075                 conn_set_state(c, conn_closing);
 
 5076                 return TRANSMIT_HARD_ERROR;
 
 5078             return TRANSMIT_SOFT_ERROR;
 
 5083             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 5084                                             "Failed to write, and not due to blocking: %s",
 
 5088         if (IS_UDP(c->transport))
 
 5089             conn_set_state(c, conn_read);
 
 5091             conn_set_state(c, conn_closing);
 
 5092         return TRANSMIT_HARD_ERROR;
 
 5094         return TRANSMIT_COMPLETE;
 
 5098 bool conn_listening(conn *c)
 
 5101     struct sockaddr_storage addr;
 
 5102     socklen_t addrlen = 
sizeof(addr);
 
 5104     if ((sfd = accept(c->sfd, (
struct sockaddr *)&addr, &addrlen)) == -1) {
 
 5105         if (errno == EMFILE) {
 
 5107                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5108                                                 "Too many open connections\n");
 
 5111         } 
else if (errno != EAGAIN && errno != EWOULDBLOCK) {
 
 5112             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 5113                                             "Failed to accept new client: %s\n",
 
 5121     int curr_conns = ++
stats.curr_conns;
 
 5124     if (curr_conns >= 
settings.maxconns) {
 
 5126         ++
stats.rejected_conns;
 
 5130             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5131                                             "Too many open connections\n");
 
 5138     if (evutil_make_socket_nonblocking(sfd) == -1) {
 
 5143     dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
 
 5144                       DATA_BUFFER_SIZE, tcp_transport);
 
 5160 bool conn_ship_log(conn *c) {
 
 5163     if (c->sfd == INVALID_SOCKET) {
 
 5167     short mask = EV_READ | EV_PERSIST | EV_WRITE;
 
 5169     if (c->which & EV_READ || c->
rbytes > 0) {
 
 5171             if (try_read_command(c) == 0) {
 
 5172                 conn_set_state(c, conn_read);
 
 5175             conn_set_state(c, conn_read);
 
 5189         c->nevents = 
settings.reqs_per_tap_event;
 
 5190     } 
else if (c->which & EV_WRITE) {
 
 5192         if (c->nevents >= 0) {
 
 5193             LOCK_THREAD(c->thread);
 
 5194             c->ewouldblock = 
false;
 
 5196             if (c->ewouldblock) {
 
 5197                 mask = EV_READ | EV_PERSIST;
 
 5201             UNLOCK_THREAD(c->thread);
 
 5205     if (!update_event(c, mask)) {
 
 5207             settings.extensions.logger->
log(EXTENSION_LOG_INFO,
 
 5208                                             c, 
"Couldn't update event\n");
 
 5210         conn_set_state(c, conn_closing);
 
 5216 bool conn_waiting(conn *c) {
 
 5217     if (!update_event(c, EV_READ | EV_PERSIST)) {
 
 5219             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5220                                             "Couldn't update event\n");
 
 5222         conn_set_state(c, conn_closing);
 
 5225     conn_set_state(c, conn_read);
 
 5229 bool conn_read(conn *c) {
 
 5230     int res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
 
 5232     case READ_NO_DATA_RECEIVED:
 
 5233         conn_set_state(c, conn_waiting);
 
 5235     case READ_DATA_RECEIVED:
 
 5236         conn_set_state(c, conn_parse_cmd);
 
 5239         conn_set_state(c, conn_closing);
 
 5241     case READ_MEMORY_ERROR: 
 
 5249 bool conn_parse_cmd(conn *c) {
 
 5250     if (try_read_command(c) == 0) {
 
 5252         conn_set_state(c, conn_waiting);
 
 5255     return !c->ewouldblock;
 
 5258 bool conn_new_cmd(conn *c) {
 
 5261     if (c->nevents >= 0) {
 
 5262         reset_cmd_handler(c);
 
 5264         STATS_NOKEY(c, conn_yields);
 
 5272             if (!update_event(c, EV_WRITE | EV_PERSIST)) {
 
 5274                     settings.extensions.logger->
log(EXTENSION_LOG_INFO,
 
 5275                                                     c, 
"Couldn't update event\n");
 
 5277                 conn_set_state(c, conn_closing);
 
 5288 bool conn_swallow(conn *c) {
 
 5291     if (c->sbytes == 0) {
 
 5292         conn_set_state(c, conn_new_cmd);
 
 5298         uint32_t tocopy = c->
rbytes > c->sbytes ? c->sbytes : c->
rbytes;
 
 5299         c->sbytes -= tocopy;
 
 5306     res = recv(c->sfd, c->
rbuf, c->
rsize > c->sbytes ? c->sbytes : c->
rsize, 0);
 
 5308         STATS_ADD(c, bytes_read, res);
 
 5313         conn_set_state(c, conn_closing);
 
 5316     if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
 
 5317         if (!update_event(c, EV_READ | EV_PERSIST)) {
 
 5319                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5320                                                 "Couldn't update event\n");
 
 5322             conn_set_state(c, conn_closing);
 
 5328     if (errno != ENOTCONN && errno != ECONNRESET) {
 
 5330         settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5331                                         "Failed to read, and not due to blocking (%s)\n",
 
 5335     conn_set_state(c, conn_closing);
 
 5341 bool conn_nread(conn *c) {
 
 5347         bool block = c->ewouldblock = 
false;
 
 5354         if (c->ewouldblock) {
 
 5355             unregister_event(c);
 
 5379         STATS_ADD(c, bytes_read, res);
 
 5388         conn_set_state(c, conn_closing);
 
 5392 #ifdef INNODB_MEMCACHED 
 5397     if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || !errno)) {
 
 5399     if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
 
 5401         if (!update_event(c, EV_READ | EV_PERSIST)) {
 
 5403                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5404                                                 "Couldn't update event\n");
 
 5406             conn_set_state(c, conn_closing);
 
 5412     if (errno != ENOTCONN && errno != ECONNRESET) {
 
 5414         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 5415                                         "Failed to read, and not due to blocking:\n" 
 5417                                         "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
 
 5418                                         errno, strerror(errno),
 
 5422     conn_set_state(c, conn_closing);
 
 5426 bool conn_write(conn *c) {
 
 5432     if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
 
 5433         if (add_iov(c, c->wcurr, c->wbytes) != 0) {
 
 5435                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5436                                                 "Couldn't build response\n");
 
 5438             conn_set_state(c, conn_closing);
 
 5443     return conn_mwrite(c);
 
 5446 bool conn_mwrite(conn *c) {
 
 5447     if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
 
 5449             settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5450                                             "Failed to build UDP headers\n");
 
 5452         conn_set_state(c, conn_closing);
 
 5456     switch (transmit(c)) {
 
 5457     case TRANSMIT_COMPLETE:
 
 5458         if (c->state == conn_mwrite) {
 
 5459             while (c->ileft > 0) {
 
 5460                 item *it = *(c->icurr);
 
 5465             while (c->suffixleft > 0) {
 
 5466                 char *suffix = *(c->suffixcurr);
 
 5467                 cache_free(c->thread->suffix_cache, suffix);
 
 5472             if(c->protocol == binary_prot) {
 
 5475                 conn_set_state(c, conn_new_cmd);
 
 5477         } 
else if (c->state == conn_write) {
 
 5478             if (c->write_and_free) {
 
 5479                 free(c->write_and_free);
 
 5480                 c->write_and_free = 0;
 
 5485                 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
 
 5486                                                 "Unexpected state %d\n", c->state);
 
 5488             conn_set_state(c, conn_closing);
 
 5492     case TRANSMIT_INCOMPLETE:
 
 5493     case TRANSMIT_HARD_ERROR:
 
 5496     case TRANSMIT_SOFT_ERROR:
 
 5503 bool conn_pending_close(conn *c) {
 
 5504     assert(c->sfd == INVALID_SOCKET);
 
 5505     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 5506                                     "Awaiting clients to release the cookie (pending close for %p)",
 
 5508     LOCK_THREAD(c->thread);
 
 5509     c->thread->pending_io = list_remove(c->thread->pending_io, c);
 
 5510     if (!list_contains(c->thread->pending_close, c)) {
 
 5511         enlist_conn(c, &c->thread->pending_close);
 
 5513     UNLOCK_THREAD(c->thread);
 
 5519     perform_callbacks(ON_DISCONNECT, NULL, c);
 
 5525     return c->state != conn_pending_close;
 
 5528 bool conn_immediate_close(conn *c) {
 
 5529     settings.extensions.logger->
log(EXTENSION_LOG_DETAIL, c,
 
 5530                                     "Immediate close of %p",
 
 5532     perform_callbacks(ON_DISCONNECT, NULL, c);
 
 5538 bool conn_closing(conn *c) {
 
 5539     if (IS_UDP(c->transport)) {
 
 5545     unregister_event(c);
 
 5547     c->sfd = INVALID_SOCKET;
 
 5549     if (c->refcount > 1) {
 
 5550         conn_set_state(c, conn_pending_close);
 
 5552         conn_set_state(c, conn_immediate_close);
 
 5557 bool conn_add_tap_client(conn *c) {
 
 5561     assert(orig_thread);
 
 5562     assert(orig_thread != tp);
 
 5564     c->ewouldblock = 
true;
 
 5566     unregister_event(c);
 
 5568     LOCK_THREAD(orig_thread);
 
 5570     orig_thread->pending_io = list_remove(orig_thread->pending_io, c);
 
 5571     orig_thread->pending_close = list_remove(orig_thread->pending_close, c);
 
 5575     conn_set_state(c, conn_setup_tap_stream);
 
 5576     settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
 
 5577                                     "Moving %d conn from %p to %p\n",
 
 5578                                     c->sfd, c->thread, tp);
 
 5580     c->event.ev_base = tp->base;
 
 5581     assert(c->next == NULL);
 
 5582     assert(c->list_state == 0);
 
 5583     enlist_conn(c, &tp->pending_io);
 
 5587     UNLOCK_THREAD(orig_thread);
 
 5594 bool conn_setup_tap_stream(conn *c) {
 
 5595     process_bin_tap_connect(c);
 
 5599 void event_handler(
const int fd, 
const short which, 
void *arg) {
 
 5605     if (memcached_shutdown) {
 
 5606         event_base_loopbreak(c->event.ev_base);
 
 5615             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
 
 5616                     "Catastrophic: event fd doesn't match conn fd!\n");
 
 5622     perform_callbacks(ON_SWITCH_CONN, c, c);
 
 5624     c->nevents = 
settings.reqs_per_event;
 
 5625     if (c->state == conn_ship_log) {
 
 5626         c->nevents = 
settings.reqs_per_tap_event;
 
 5632     const size_t max_items = 256;
 
 5633     conn *pending_close[max_items];
 
 5634     size_t n_pending_close = 0;
 
 5637         if (thr->pending_close && thr->last_checked != current_time) {
 
 5638             assert(!has_cycle(thr->pending_close));
 
 5639             thr->last_checked = current_time;
 
 5641             n_pending_close = list_to_array(pending_close, max_items,
 
 5642                                             &thr->pending_close);
 
 5649             settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
 
 5650                                             "%d - Running task: (%s)\n",
 
 5651                                             c->sfd, state_text(c->state));
 
 5652         } 
while (c->state(c));
 
 5654         while (c->state(c)) {
 
 5660     if (n_pending_close > 0) {
 
 5661         for (
size_t i = 0; i < n_pending_close; ++
i) {
 
 5662             conn *ce = pending_close[
i];
 
 5663             if (ce->refcount == 1) {
 
 5664                 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
 
 5665                                                 "OK, time to nuke: %p\n",
 
 5669                 LOCK_THREAD(ce->thread);
 
 5670                 enlist_conn(ce, &ce->thread->pending_close);
 
 5671                 UNLOCK_THREAD(ce->thread);
 
 5678         finalize_list(pending_close, n_pending_close);
 
 5683 static void dispatch_event_handler(
int fd, 
short which, 
void *arg) {
 
 5685     ssize_t nr = recv(fd, buffer, 
sizeof(buffer), 0);
 
 5687     if (nr != -1 && is_listen_disabled()) {
 
 5688         bool enable = 
false;
 
 5689         pthread_mutex_lock(&listen_state.mutex);
 
 5690         listen_state.count -= nr;
 
 5691         if (listen_state.count <= 0) {
 
 5693             listen_state.disabled = 
false;
 
 5695         pthread_mutex_unlock(&listen_state.mutex);
 
 5698             for (next = listen_conn; next; next = next->next) {
 
 5699                 update_event(next, EV_READ | EV_PERSIST);
 
 5700                 if (listen(next->sfd, 
settings.backlog) != 0) {
 
 5701                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5712 static SOCKET new_socket(
struct addrinfo *ai) {
 
 5715     sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
 
 5716     if (sfd == INVALID_SOCKET) {
 
 5717         return INVALID_SOCKET;
 
 5720     if (evutil_make_socket_nonblocking(sfd) == -1) {
 
 5722         return INVALID_SOCKET;
 
 5732 static void maximize_sndbuf(
const int sfd) {
 
 5733     socklen_t intsize = 
sizeof(int);
 
 5739     if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (
void *)&old_size, &intsize) != 0) {
 
 5741             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5742                                             "getsockopt(SO_SNDBUF): %s",
 
 5751     max = MAX_SENDBUF_SIZE;
 
 5753     while (min <= max) {
 
 5754         avg = ((
unsigned int)(min + max)) / 2;
 
 5755         if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (
void *)&avg, intsize) == 0) {
 
 5764         settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
 
 5765                  "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
 
 5780 static int server_socket(
const char *interface,
 
 5782                          enum network_transport transport,
 
 5783                          FILE *portnumber_file) {
 
 5785     struct linger ling = {0, 0};
 
 5788     struct addrinfo hints = { .ai_flags = AI_PASSIVE,
 
 5789                               .ai_family = AF_UNSPEC };
 
 5790     char port_buf[NI_MAXSERV];
 
 5796     hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
 
 5801     snprintf(port_buf, 
sizeof(port_buf), 
"%d", port);
 
 5802     error= getaddrinfo(interface, port_buf, &hints, &ai);
 
 5804         if (error != EAI_SYSTEM) {
 
 5805             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5806                      "getaddrinfo(): %s\n", gai_strerror(error));
 
 5808             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5809                      "getaddrinfo(): %s\n", strerror(error));
 
 5814     for (next= ai; next; next= next->ai_next) {
 
 5815         conn *listen_conn_add;
 
 5816         if ((sfd = new_socket(next)) == INVALID_SOCKET) {
 
 5824         if (next->ai_family == AF_INET6) {
 
 5825             error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (
char *) &flags, 
sizeof(flags));
 
 5827                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5828                                                 "setsockopt(IPV6_V6ONLY): %s",
 
 5836         setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (
void *)&flags, 
sizeof(flags));
 
 5837         if (IS_UDP(transport)) {
 
 5838             maximize_sndbuf(sfd);
 
 5839             udp_socket[num_udp_socket] = sfd;
 
 5842             error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (
void *)&flags, 
sizeof(flags));
 
 5844                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5845                                                 "setsockopt(SO_KEEPALIVE): %s",
 
 5849             error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (
void *)&ling, 
sizeof(ling));
 
 5851                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5852                                                 "setsockopt(SO_LINGER): %s",
 
 5856             error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (
void *)&flags, 
sizeof(flags));
 
 5858                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5859                                                 "setsockopt(TCP_NODELAY): %s",
 
 5864         if (bind(sfd, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) {
 
 5865             if (errno != EADDRINUSE) {
 
 5866                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5877             if (!IS_UDP(transport) && listen(sfd, 
settings.backlog) == SOCKET_ERROR) {
 
 5878                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5885             if (portnumber_file != NULL &&
 
 5886                 (next->ai_addr->sa_family == AF_INET ||
 
 5887                  next->ai_addr->sa_family == AF_INET6)) {
 
 5889                     struct sockaddr_in in;
 
 5890                     struct sockaddr_in6 in6;
 
 5892                 socklen_t len = 
sizeof(my_sockaddr);
 
 5893                 if (getsockname(sfd, (
struct sockaddr*)&my_sockaddr, &len)==0) {
 
 5894                     if (next->ai_addr->sa_family == AF_INET) {
 
 5895                         fprintf(portnumber_file, 
"%s INET: %u\n",
 
 5896                                 IS_UDP(transport) ? 
"UDP" : 
"TCP",
 
 5897                                 ntohs(my_sockaddr.in.sin_port));
 
 5899                         fprintf(portnumber_file, 
"%s INET6: %u\n",
 
 5900                                 IS_UDP(transport) ? 
"UDP" : 
"TCP",
 
 5901                                 ntohs(my_sockaddr.in6.sin6_port));
 
 5907         if (IS_UDP(transport)) {
 
 5910             for (c = 0; c < 
settings.num_threads_per_udp; c++) {
 
 5912                 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
 
 5913                                   UDP_READ_BUFFER_SIZE, transport);
 
 5916                 ++
stats.daemon_conns;
 
 5920             if (!(listen_conn_add = conn_new(sfd, conn_listening,
 
 5921                                              EV_READ | EV_PERSIST, 1,
 
 5922                                              transport, main_base, NULL))) {
 
 5923                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5924                         "failed to create listening connection\n");
 
 5927             listen_conn_add->next = listen_conn;
 
 5928             listen_conn = listen_conn_add;
 
 5931             ++
stats.daemon_conns;
 
 5939     return success == 0;
 
 5942 static int server_sockets(
int port, 
enum network_transport transport,
 
 5943                           FILE *portnumber_file) {
 
 5945         return server_socket(
settings.inter, port, transport, portnumber_file);
 
 5950         char *list = strdup(
settings.inter);
 
 5953             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5954                                             "Failed to allocate memory for parsing server interface string\n");
 
 5957         for (
char *p = strtok_r(list, 
";,", &b);
 
 5959              p = strtok_r(NULL, 
";,", &b)) {
 
 5960             int the_port = port;
 
 5962             char *s = strchr(p, 
':');
 
 5966                 if (!safe_strtol(s, &the_port)) {
 
 5967                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5968                                                     "Invalid port number: \"%s\"", s);
 
 5972             if (strcmp(p, 
"*") == 0) {
 
 5975             ret |= server_socket(p, the_port, transport, portnumber_file);
 
 5982 static int new_socket_unix(
void) {
 
 5985     if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == INVALID_SOCKET) {
 
 5986         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 5987                                         "socket(AF_UNIX, SOCK_STREAM, 0): %s",
 
 5989         return INVALID_SOCKET;
 
 5992     if (evutil_make_socket_nonblocking(sfd) == -1) {
 
 5994         return INVALID_SOCKET;
 
 6000 static int server_socket_unix(
const char *path, 
int access_mask) {
 
 6002     struct linger ling = {0, 0};
 
 6003     struct sockaddr_un addr;
 
 6012     if ((sfd = new_socket_unix()) == -1) {
 
 6019     if (lstat(path, &tstat) == 0) {
 
 6020         if (S_ISSOCK(tstat.st_mode))
 
 6024     setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (
void *)&flags, 
sizeof(flags));
 
 6025     setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (
void *)&flags, 
sizeof(flags));
 
 6026     setsockopt(sfd, SOL_SOCKET, SO_LINGER, (
void *)&ling, 
sizeof(ling));
 
 6032     memset(&addr, 0, 
sizeof(addr));
 
 6034     addr.sun_family = AF_UNIX;
 
 6035     strncpy(addr.sun_path, path, 
sizeof(addr.sun_path) - 1);
 
 6036     assert(strcmp(addr.sun_path, path) == 0);
 
 6037     old_umask = umask( ~(access_mask&0777));
 
 6038     if (bind(sfd, (
struct sockaddr *)&addr, 
sizeof(addr)) == -1) {
 
 6039         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6047     if (listen(sfd, 
settings.backlog) == -1) {
 
 6048         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6054     if (!(listen_conn = conn_new(sfd, conn_listening,
 
 6055                                  EV_READ | EV_PERSIST, 1,
 
 6056                                  local_transport, main_base, NULL))) {
 
 6057         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6058                  "failed to create listening connection\n");
 
 6062     ++
stats.daemon_conns;
 
 6068 static struct event clockevent;
 
 6071 static void set_current_time(
void) {
 
 6074     gettimeofday(&timer, NULL);
 
 6075     current_time = (rel_time_t) (timer.tv_sec - process_started);
 
 6078 static void clock_handler(
const int fd, 
const short which, 
void *arg) {
 
 6079     struct timeval t = {.tv_sec = 1, .tv_usec = 0};
 
 6080     static bool initialized = 
false;
 
 6082     if (memcached_shutdown) {
 
 6083         event_base_loopbreak(main_base);
 
 6095     event_base_set(main_base, &clockevent);
 
 6101 static void usage(
void) {
 
 6102     printf(PACKAGE 
" " VERSION 
"\n");
 
 6103     printf(
"-p <num>      TCP port number to listen on (default: 11211)\n" 
 6104            "-U <num>      UDP port number to listen on (default: 11211, 0 is off)\n" 
 6105            "-s <file>     UNIX socket path to listen on (disables network support)\n" 
 6106            "-a <mask>     access mask for UNIX socket, in octal (default: 0700)\n" 
 6107            "-l <addr>     interface to listen on (default: INADDR_ANY, all addresses)\n" 
 6108            "              <addr> may be specified as host:port. If you don't specify\n" 
 6109            "              a port number, the value you specified with -p or -U is\n" 
 6110            "              used. You may specify multiple addresses separated by comma\n" 
 6111            "              or by using -l multiple times\n" 
 6112            "-d            run as a daemon\n" 
 6113            "-r            maximize core file limit\n" 
 6114            "-u <username> assume identity of <username> (only when run as root)\n" 
 6115            "-m <num>      max memory to use for items in megabytes (default: 64 MB)\n" 
 6116            "-M            return error on memory exhausted (rather than removing items)\n" 
 6117            "-c <num>      max simultaneous connections (default: 1000)\n" 
 6118            "-k            lock down all paged memory.  Note that there is a\n" 
 6119            "              limit on how much memory you may lock.  Trying to\n" 
 6120            "              allocate more than that would fail, so be sure you\n" 
 6121            "              set the limit correctly for the user you started\n" 
 6122            "              the daemon with (not for -u <username> user;\n" 
 6123            "              under sh this is done with 'ulimit -S -l NUM_KB').\n" 
 6124            "-v            verbose (print errors/warnings while in event loop)\n" 
 6125            "-vv           very verbose (also print client commands/reponses)\n" 
 6126            "-vvv          extremely verbose (also print internal state transitions)\n" 
 6127            "-h            print this help and exit\n" 
 6128            "-i            print memcached and libevent license\n" 
 6129            "-P <file>     save PID in <file>, only used with -d option\n" 
 6130            "-f <factor>   chunk size growth factor (default: 1.25)\n" 
 6131            "-n <bytes>    minimum space allocated for key+value+flags (default: 48)\n");
 
 6132     printf(
"-L            Try to use large memory pages (if available). Increasing\n" 
 6133            "              the memory page size could reduce the number of TLB misses\n" 
 6134            "              and improve the performance. In order to get large pages\n" 
 6135            "              from the OS, memcached will allocate the total item-cache\n" 
 6136            "              in one large chunk.\n");
 
 6137     printf(
"-D <char>     Use <char> as the delimiter between key prefixes and IDs.\n" 
 6138            "              This is used for per-prefix stats reporting. The default is\n" 
 6139            "              \":\" (colon). If this option is specified, stats collection\n" 
 6140            "              is turned on automatically; if not, then it may be turned on\n" 
 6141            "              by sending the \"stats detail on\" command to the server.\n");
 
 6142     printf(
"-t <num>      number of threads to use (default: 4)\n");
 
 6143     printf(
"-R            Maximum number of requests per event, limits the number of\n" 
 6144            "              requests process for a given connection to prevent \n" 
 6145            "              starvation (default: 20)\n");
 
 6146     printf(
"-C            Disable use of CAS\n");
 
 6147     printf(
"-b            Set the backlog queue limit (default: 1024)\n");
 
 6148     printf(
"-B            Binding protocol - one of ascii, binary, or auto (default)\n");
 
 6149     printf(
"-I            Override the size of each slab page. Adjusts max item size\n" 
 6150            "              (default: 1mb, min: 1k, max: 128m)\n");
 
 6151     printf(
"-q            Disable detailed stats commands\n");
 
 6153     printf(
"-S            Require SASL authentication\n");
 
 6155     printf(
"-X module,cfg Load the module and initialize it with the config\n");
 
 6156     printf(
"-E engine     Load engine as the storage engine\n");
 
 6157     printf(
"-e config     Pass config as configuration options to the storage engine\n");
 
 6158     printf(
"\nEnvironment variables:\n" 
 6159            "MEMCACHED_PORT_FILENAME   File to write port information to\n" 
 6160            "MEMCACHED_TOP_KEYS        Number of top keys to keep track of\n" 
 6161            "MEMCACHED_REQS_TAP_EVENT  Similar to -R but for tap_ship_log\n");
 
 6163 static void usage_license(
void) {
 
 6164     printf(PACKAGE 
" " VERSION 
"\n\n");
 
 6166     "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n" 
 6167     "All rights reserved.\n" 
 6169     "Redistribution and use in source and binary forms, with or without\n" 
 6170     "modification, are permitted provided that the following conditions are\n" 
 6173     "    * Redistributions of source code must retain the above copyright\n" 
 6174     "notice, this list of conditions and the following disclaimer.\n" 
 6176     "    * Redistributions in binary form must reproduce the above\n" 
 6177     "copyright notice, this list of conditions and the following disclaimer\n" 
 6178     "in the documentation and/or other materials provided with the\n" 
 6181     "    * Neither the name of the Danga Interactive nor the names of its\n" 
 6182     "contributors may be used to endorse or promote products derived from\n" 
 6183     "this software without specific prior written permission.\n" 
 6185     "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n" 
 6186     "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n" 
 6187     "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n" 
 6188     "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n" 
 6189     "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n" 
 6190     "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n" 
 6191     "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n" 
 6192     "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n" 
 6193     "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n" 
 6194     "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n" 
 6195     "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" 
 6198     "This product includes software developed by Niels Provos.\n" 
 6202     "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n" 
 6203     "All rights reserved.\n" 
 6205     "Redistribution and use in source and binary forms, with or without\n" 
 6206     "modification, are permitted provided that the following conditions\n" 
 6208     "1. Redistributions of source code must retain the above copyright\n" 
 6209     "   notice, this list of conditions and the following disclaimer.\n" 
 6210     "2. Redistributions in binary form must reproduce the above copyright\n" 
 6211     "   notice, this list of conditions and the following disclaimer in the\n" 
 6212     "   documentation and/or other materials provided with the distribution.\n" 
 6213     "3. All advertising materials mentioning features or use of this software\n" 
 6214     "   must display the following acknowledgement:\n" 
 6215     "      This product includes software developed by Niels Provos.\n" 
 6216     "4. The name of the author may not be used to endorse or promote products\n" 
 6217     "   derived from this software without specific prior written permission.\n" 
 6219     "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n" 
 6220     "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n" 
 6221     "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n" 
 6222     "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n" 
 6223     "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n" 
 6224     "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n" 
 6225     "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n" 
 6226     "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n" 
 6227     "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n" 
 6228     "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" 
 6234 static void save_pid(
const char *pid_file) {
 
 6237     if (access(pid_file, F_OK) == 0) {
 
 6238         if ((fp = fopen(pid_file, 
"r")) != NULL) {
 
 6240             if (fgets(buffer, 
sizeof(buffer), fp) != NULL) {
 
 6242                 if (safe_strtoul(buffer, &pid) && 
kill((pid_t)pid, 0) == 0) {
 
 6243                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6244                                "WARNING: The pid file contained the following (running) pid: %u\n", pid);
 
 6251     if ((fp = fopen(pid_file, 
"w")) == NULL) {
 
 6252         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6253                  "Could not open the pid file %s for writing: %s\n",
 
 6254                  pid_file, strerror(errno));
 
 6258     fprintf(fp,
"%ld\n", (
long)getpid());
 
 6259     if (fclose(fp) == -1) {
 
 6260         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6261                 "Could not close the pid file %s: %s\n",
 
 6262                 pid_file, strerror(errno));
 
 6266 static void remove_pidfile(
const char *pid_file) {
 
 6267     if (pid_file != NULL) {
 
 6268         if (unlink(pid_file) != 0) {
 
 6269             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6270                     "Could not remove the pid file %s: %s\n",
 
 6271                     pid_file, strerror(errno));
 
 6276 #ifndef HAVE_SIGIGNORE 
 6277 static int sigignore(
int sig) {
 
 6278     struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
 
 6280     if (sigemptyset(&sa.sa_mask) == -1 || 
sigaction(sig, &sa, 0) == -1) {
 
 6287 static void sigterm_handler(
int sig) {
 
 6288     assert(sig == SIGTERM || sig == SIGINT);
 
 6289     memcached_shutdown = 1;
 
 6292 static int install_sigterm_handler(
void) {
 
 6293     struct sigaction sa = {.sa_handler = sigterm_handler, .sa_flags = 0};
 
 6295     if (sigemptyset(&sa.sa_mask) == -1 || 
sigaction(SIGTERM, &sa, 0) == -1 ||
 
 6307 static int enable_large_pages(
void) {
 
 6308 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL) 
 6311     int avail = getpagesizes(sizes, 32);
 
 6313         size_t max = sizes[0];
 
 6314         struct memcntl_mha arg = {0};
 
 6317         for (ii = 1; ii < avail; ++ii) {
 
 6318             if (max < sizes[ii]) {
 
 6324         arg.mha_pagesize = max;
 
 6325         arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
 
 6327         if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
 
 6328             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6329                   "Failed to set large pages: %s\nWill use default page size\n",
 
 6335         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6336           "Failed to get supported pagesizes: %s\nWill use default page size\n",
 
 6346 static const char* get_server_version(
void) {
 
 6350 static void store_engine_specific(
const void *cookie,
 
 6351                                   void *engine_data) {
 
 6352     conn *c = (conn*)cookie;
 
 6353     c->engine_storage = engine_data;
 
 6356 static void *get_engine_specific(
const void *cookie) {
 
 6357     conn *c = (conn*)cookie;
 
 6358     return c->engine_storage;
 
 6361 static int get_socket_fd(
const void *cookie) {
 
 6362     conn *c = (conn *)cookie;
 
 6366 static void set_tap_nack_mode(
const void *cookie, 
bool enable) {
 
 6367     conn *c = (conn *)cookie;
 
 6368     c->tap_nack_mode = enable;
 
 6371 static void reserve_cookie(
const void *cookie) {
 
 6372     conn *c = (conn *)cookie;
 
 6376 static void release_cookie(
const void *cookie) {
 
 6377     conn *c = (conn *)cookie;
 
 6381 static int num_independent_stats(
void) {
 
 6385 static void *new_independent_stats(
void) {
 
 6387     int nrecords = num_independent_stats();
 
 6390         independent_stats->topkeys = topkeys_init(
settings.topkeys);
 
 6391     for (ii = 0; ii < nrecords; ii++)
 
 6392         pthread_mutex_init(&independent_stats->thread_stats[ii].mutex, NULL);
 
 6393     return independent_stats;
 
 6396 static void release_independent_stats(
void *
stats) {
 
 6398     int nrecords = num_independent_stats();
 
 6399     struct independent_stats *independent_stats = stats;
 
 6400     if (independent_stats->topkeys)
 
 6401         topkeys_free(independent_stats->topkeys);
 
 6402     for (ii = 0; ii < nrecords; ii++)
 
 6403         pthread_mutex_destroy(&independent_stats->thread_stats[ii].mutex);
 
 6404     free(independent_stats);
 
 6407 static inline struct independent_stats *get_independent_stats(conn *c) {
 
 6408     struct independent_stats *independent_stats;
 
 6411         if (independent_stats == NULL)
 
 6412             independent_stats = default_independent_stats;
 
 6414         independent_stats = default_independent_stats;
 
 6416     return independent_stats;
 
 6419 static inline struct thread_stats *get_thread_stats(conn *c) {
 
 6420     struct independent_stats *independent_stats = get_independent_stats(c);
 
 6421     assert(c->thread->index < num_independent_stats());
 
 6422     return &independent_stats->thread_stats[c->thread->index];
 
 6426                               ENGINE_EVENT_TYPE type,
 
 6427                               EVENT_CALLBACK cb, 
const void *cb_data) {
 
 6433     h->cb_data = cb_data;
 
 6434     h->next = engine_event_handlers[
type];
 
 6435     engine_event_handlers[
type] = h;
 
 6438 static rel_time_t get_current_time(
void)
 
 6440     return current_time;
 
 6443 static void count_eviction(
const void *cookie, 
const void *key, 
const int nkey) {
 
 6444     topkeys_t *tk = get_independent_stats((conn*)cookie)->topkeys;
 
 6445     TK(tk, evictions, key, nkey, get_current_time());
 
 6455 static ENGINE_ERROR_CODE internal_arithmetic(
ENGINE_HANDLE* handle,
 
 6459                                              const bool increment,
 
 6461                                              const uint64_t delta,
 
 6462                                              const uint64_t initial,
 
 6463                                              const rel_time_t exptime,
 
 6472     ENGINE_ERROR_CODE 
ret;
 
 6473     ret = e->
get(handle, cookie, &it, key, nkey, vbucket);
 
 6475     if (ret == ENGINE_SUCCESS) {
 
 6479             e->
release(handle, cookie, it);
 
 6480             return ENGINE_FAILED;
 
 6485         if (info.value[0].iov_len > (
sizeof(value) - 1)) {
 
 6486             e->
release(handle, cookie, it);
 
 6487             return ENGINE_EINVAL;
 
 6490         memcpy(value, info.value[0].iov_base, info.value[0].iov_len);
 
 6491         value[info.value[0].iov_len] = 
'\0';
 
 6494         if (!safe_strtoull(value, &val)) {
 
 6495             e->
release(handle, cookie, it);
 
 6496             return ENGINE_EINVAL;
 
 6509         size_t nb = snprintf(value, 
sizeof(value), 
"%"PRIu64, val);
 
 6512         if (e->
allocate(handle, cookie, &nit, key,
 
 6513                         nkey, nb, info.
flags, info.
exptime) != ENGINE_SUCCESS) {
 
 6514             e->
release(handle, cookie, it);
 
 6515             return ENGINE_ENOMEM;
 
 6520             e->
release(handle, cookie, it);
 
 6521             e->
release(handle, cookie, nit);
 
 6522             return ENGINE_FAILED;
 
 6525         memcpy(i2.value[0].iov_base, value, nb);
 
 6527         ret = e->
store(handle, cookie, nit, cas, OPERATION_CAS, vbucket);
 
 6528         e->
release(handle, cookie, it);
 
 6529         e->
release(handle, cookie, nit);
 
 6530     } 
else if (ret == ENGINE_KEY_ENOENT && create) {
 
 6532         size_t nb = snprintf(value, 
sizeof(value), 
"%"PRIu64
"\r\n", initial);
 
 6534         if (e->
allocate(handle, cookie, &it, key, nkey, nb, 0, exptime) != ENGINE_SUCCESS) {
 
 6535             e->
release(handle, cookie, it);
 
 6536             return ENGINE_ENOMEM;
 
 6541             e->
release(handle, cookie, it);
 
 6542             return ENGINE_FAILED;
 
 6545         memcpy(info.value[0].iov_base, value, nb);
 
 6546         ret = e->
store(handle, cookie, it, cas, OPERATION_CAS, vbucket);
 
 6547         e->
release(handle, cookie, it);
 
 6551     if (ret == ENGINE_KEY_EEXISTS) {
 
 6552         return internal_arithmetic(handle, cookie, key, nkey, increment, create, delta,
 
 6553                                    initial, exptime, cas, result, vbucket);
 
 6568     if (extension == NULL) {
 
 6577             if (ptr == extension) {
 
 6582         settings.extensions.daemons = extension;
 
 6585         settings.extensions.logger = extension;
 
 6588         if (
settings.extensions.ascii != NULL) {
 
 6590             for (last = 
settings.extensions.ascii; last->
next != NULL;
 
 6591                  last = last->
next) {
 
 6592                 if (last == extension) {
 
 6596             if (last == extension) {
 
 6599             last->
next = extension;
 
 6602             settings.extensions.ascii = extension;
 
 6626             while (ptr != NULL && ptr != extension) {
 
 6631             if (ptr != NULL && prev != NULL) {
 
 6635             if (
settings.extensions.daemons == ptr) {
 
 6641         if (
settings.extensions.logger == extension) {
 
 6642             if (get_stderr_logger() == extension) {
 
 6643                 settings.extensions.logger = get_null_logger();
 
 6645                 settings.extensions.logger = get_stderr_logger();
 
 6654             while (ptr != NULL && ptr != extension) {
 
 6659             if (ptr != NULL && prev != NULL) {
 
 6663             if (
settings.extensions.ascii == ptr) {
 
 6682         return settings.extensions.daemons;
 
 6695 #ifdef INNODB_MEMCACHED 
 6696 void shutdown_server(
void) {
 
 6698 static void shutdown_server(
void) {
 
 6700 #ifdef INNODB_MEMCACHED 
 6703     while (listen_conn) {
 
 6704         conn_closing(listen_conn);
 
 6705         listen_conn = listen_conn->next;
 
 6708     for (i = 0; i < num_udp_socket; i++) {
 
 6709         safe_close(udp_socket[i]);
 
 6712     memcached_shutdown = 1;
 
 6715 #ifdef INNODB_MEMCACHED 
 6716 bool shutdown_complete(
void)
 
 6718     return(memcached_shutdown == 2);
 
 6727 static EXTENSION_LOG_LEVEL get_log_level(
void)
 
 6729     EXTENSION_LOG_LEVEL 
ret;
 
 6731     case 0: ret = EXTENSION_LOG_WARNING; 
break;
 
 6732     case 1: ret = EXTENSION_LOG_INFO; 
break;
 
 6733     case 2: ret = EXTENSION_LOG_DEBUG; 
break;
 
 6735         ret = EXTENSION_LOG_DETAIL;
 
 6740 static void set_log_level(EXTENSION_LOG_LEVEL severity)
 
 6743     case EXTENSION_LOG_WARNING: 
settings.verbose = 0; 
break;
 
 6744     case EXTENSION_LOG_INFO: 
settings.verbose = 1; 
break;
 
 6745     case EXTENSION_LOG_DEBUG: 
settings.verbose = 2; 
break;
 
 6751 static void get_config_append_stats(
const char *key, 
const uint16_t klen,
 
 6752                                     const char *val, 
const uint32_t vlen,
 
 6755     if (klen == 0  || vlen == 0) {
 
 6759     char *pos = (
char*)cookie;
 
 6760     size_t nbytes = strlen(pos);
 
 6762     if ((nbytes + klen + vlen + 3) > 1024) {
 
 6767     memcpy(pos + nbytes, key, klen);
 
 6771     memcpy(pos + nbytes, val, vlen);
 
 6773     memcpy(pos + nbytes, 
";", 2);
 
 6779     process_stat_settings(get_config_append_stats, config);
 
 6780     int rval = parse_config(config, items, NULL);
 
 6794         .realtime = realtime,
 
 6796         .get_current_time = get_current_time,
 
 6797         .parse_config = parse_config,
 
 6798         .shutdown = shutdown_server,
 
 6799         .get_config = get_config
 
 6804         .store_engine_specific = store_engine_specific,
 
 6805         .get_engine_specific = get_engine_specific,
 
 6806         .get_socket_fd = get_socket_fd,
 
 6807         .set_tap_nack_mode = set_tap_nack_mode,
 
 6808         .notify_io_complete = notify_io_complete,
 
 6809         .reserve = reserve_cookie,
 
 6810         .release = release_cookie
 
 6815         .release_stats = release_independent_stats,
 
 6816         .evicting = count_eviction
 
 6820         .get_logger = get_logger,
 
 6821         .get_level = get_log_level,
 
 6822         .set_level = set_log_level
 
 6826         .unregister_extension = unregister_extension,
 
 6827         .get_extension = get_extension
 
 6832         .perform_callbacks = perform_callbacks,
 
 6838         .stat = &server_stat_api,
 
 6839         .extension = &extension_api,
 
 6840         .callback = &callback_api,
 
 6841         .log = &server_log_api,
 
 6842         .cookie = &server_cookie_api
 
 6845     if (rv.engine == NULL) {
 
 6859 static bool load_extension(
const char *soname, 
const char *
config) {
 
 6860     if (soname == NULL) {
 
 6868     } funky = {.initialize = NULL };
 
 6870     void *handle = dlopen(soname, RTLD_NOW | RTLD_LOCAL);
 
 6871     if (handle == NULL) {
 
 6872         const char *msg = dlerror();
 
 6873         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6874                 "Failed to open library \"%s\": %s\n",
 
 6875                 soname, msg ? msg : 
"unknown error");
 
 6879     void *symbol = dlsym(handle, 
"memcached_extensions_initialize");
 
 6880     if (symbol == NULL) {
 
 6881         const char *msg = dlerror();
 
 6882         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6883                 "Could not find symbol \"memcached_extensions_initialize\" in %s: %s\n",
 
 6884                 soname, msg ? msg : 
"unknown error");
 
 6887     funky.voidptr = symbol;
 
 6892         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6893                 "Failed to initalize extensions from %s. Error code: %d\n",
 
 6900         settings.extensions.logger->
log(EXTENSION_LOG_INFO, NULL,
 
 6901                 "Loaded extensions from: %s\n", soname);
 
 6911 static bool sanitycheck(
void) {
 
 6913     const char *ever = event_get_version();
 
 6915         if (strncmp(ever, 
"1.", 2) == 0) {
 
 6917             if ((ever[2] == 
'1' || ever[2] == 
'2') && !isdigit(ever[3])) {
 
 6918                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 6919                         "You are using libevent %s.\nPlease upgrade to" 
 6920                         " a more recent version (1.3 or newer)\n",
 
 6921                         event_get_version());
 
 6930 #ifdef INNODB_MEMCACHED 
 6933 my_strdupl(
const char* str, 
int len)
 
 6935         char*   s = (
char*) malloc(len + 1);
 
 6937         return((
char*) memcpy(s, str, len));
 
 6944 daemon_memcached_make_option(
char* 
option, 
int* option_argc,
 
 6945                              char*** option_argv)
 
 6947         static const char*      sep = 
" ";
 
 6954         my_str = my_strdupl(option, strlen(option));
 
 6956         for (opt_str = strtok_r(my_str, sep, &last);
 
 6958              opt_str = strtok_r(NULL, sep, &last)) {
 
 6966         *option_argv = (
char**) malloc((num_arg + 1)
 
 6967                                        * 
sizeof(**option_argv));
 
 6969         for (opt_str = strtok_r(my_str, sep, &last);
 
 6971              opt_str = strtok_r(NULL, sep, &last)) {
 
 6972                 (*option_argv)[
i] = my_strdupl(opt_str, strlen(opt_str));
 
 6976         assert(i == num_arg + 1);
 
 6978         *option_argc = (num_arg + 1);
 
 6988         unsigned int    eng_r_batch_size;
 
 6989         unsigned int    eng_w_batch_size;
 
 6994 #ifdef INNODB_MEMCACHED 
 6995 void* daemon_memcached_main(
void *p) {
 
 6997 int main (
int argc, 
char **argv) {
 
 7000     bool lock_memory = 
false;
 
 7001     bool do_daemonize = 
false;
 
 7002     bool preallocate = 
false;
 
 7004     char *username = NULL;
 
 7005     char *pid_file = NULL;
 
 7011     bool protocol_specified = 
false;
 
 7012     bool tcp_specified = 
false;
 
 7013     bool udp_specified = 
false;
 
 7016     const char *engine_config = NULL;
 
 7017     char old_options[1024] = { [0] = 
'\0' };
 
 7018     char *old_opts = old_options;
 
 7019 #ifdef INNODB_MEMCACHED 
 7020     int option_argc = 0;
 
 7021     char** option_argv = NULL;
 
 7024     if (m_config->m_engine_library) {
 
 7025         engine = m_config->m_engine_library;
 
 7030         my_eng_config.
cb_ptr = m_config->m_innodb_api_cb;
 
 7031         my_eng_config.eng_r_batch_size = m_config->m_r_batch_size;
 
 7032         my_eng_config.eng_w_batch_size = m_config->m_w_batch_size;
 
 7033         my_eng_config.enable_binlog = m_config->m_enable_binlog;
 
 7035         engine_config = (
const char *) (&my_eng_config);
 
 7038         engine = 
"default_engine.so";
 
 7041     engine = 
"default_engine.so";
 
 7044     memcached_shutdown = 0;
 
 7046     if (!sanitycheck()) {
 
 7054     process_started = time(0) - 2;
 
 7058     initialize_sockets();
 
 7064         fprintf(stderr, 
"Failed to initialize log system\n");
 
 7068     if (m_config->m_mem_option) {
 
 7069         daemon_memcached_make_option(m_config->m_mem_option,
 
 7074 #ifdef INNODB_MEMCACHED 
 7076     if (option_argc > 0 && option_argv) {
 
 7080             while (-1 != (c = getopt(option_argc, option_argv,
 
 7115                     settings.access= strtol(optarg,NULL,8);
 
 7120                     udp_specified = 
true;
 
 7124                     tcp_specified = 
true;
 
 7130                     settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
 
 7131                      old_opts += sprintf(old_opts, 
"cache_size=%lu;",
 
 7136                     old_opts += sprintf(old_opts, 
"eviction=false;");
 
 7152                     perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
 
 7158                     do_daemonize = 
true;
 
 7164                     settings.reqs_per_event = atoi(optarg);
 
 7165                     if (
settings.reqs_per_event <= 0) {
 
 7166                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7167                               "Number of requests per event must be greater than 0\n");
 
 7180                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7181                                 "Factor must be greater than 1\n");
 
 7184                      old_opts += sprintf(old_opts, 
"factor=%f;",
 
 7188                     settings.chunk_size = atoi(optarg);
 
 7190                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7191                                 "Chunk size must be greater than 0\n");
 
 7194                     old_opts += sprintf(old_opts, 
"chunk_size=%u;",
 
 7198                     settings.num_threads = atoi(optarg);
 
 7200                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7201                                 "Number of threads must be greater than 0\n");
 
 7209                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7210                                 "WARNING: Setting a high number of worker" 
 7211                                 "threads is not recommended.\n" 
 7212                                 " Set this value to the number of cores in" 
 7213                                 " your machine or less.\n");
 
 7217                     settings.prefix_delimiter = optarg[0];
 
 7221                     if (enable_large_pages() == 0) {
 
 7223                         old_opts += sprintf(old_opts, 
"preallocate=true;");
 
 7233                     protocol_specified = 
true;
 
 7234                     if (strcmp(optarg, 
"auto") == 0) {
 
 7235                         settings.binding_protocol = negotiating_prot;
 
 7236                     } 
else if (strcmp(optarg, 
"binary") == 0) {
 
 7237                         settings.binding_protocol = binary_prot;
 
 7238                     } 
else if (strcmp(optarg, 
"ascii") == 0) {
 
 7239                         settings.binding_protocol = ascii_prot;
 
 7241                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7242                                 "Invalid value for binding protocol: %s\n" 
 7243                                 " -- should be one of auto, binary, or ascii\n", optarg);
 
 7248                     unit = optarg[strlen(optarg)-1];
 
 7249                     if (unit == 
'k' || unit == 
'm' ||
 
 7250                         unit == 
'K' || unit == 
'M') {
 
 7251                         optarg[strlen(optarg)-1] = 
'\0';
 
 7252                         size_max = atoi(optarg);
 
 7253                         if (unit == 
'k' || unit == 
'K')
 
 7255                         if (unit == 
'm' || unit == 
'M')
 
 7256                             size_max *= 1024 * 1024;
 
 7259                         settings.item_size_max = atoi(optarg);
 
 7261                     if (
settings.item_size_max < 1024) {
 
 7262                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7263                                 "Item max size cannot be less than 1024 bytes.\n");
 
 7266                     if (
settings.item_size_max > 1024 * 1024 * 128) {
 
 7267                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7268                                 "Cannot set item size limit higher than 128 mb.\n");
 
 7271                     if (
settings.item_size_max > 1024 * 1024) {
 
 7272                         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7273                             "WARNING: Setting item max size above 1MB is not" 
 7275                             " Raising this limit increases the minimum memory requirements\n" 
 7276                             " and will decrease your memory efficiency.\n" 
 7280                     old_opts += sprintf(old_opts, 
"item_size_max=%zu;",
 
 7283                     old_opts += sprintf(old_opts, 
"item_size_max=%lu;", (
long unsigned)
 
 7299 # ifdef ENABLE_MEMCACHED_SASL 
 7300 #  ifndef SASL_ENABLED 
 7301                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7302                             "This server is not built with SASL support.\n");
 
 7310                         char *ptr = strchr(optarg, 
',');
 
 7315                         if (!load_extension(optarg, ptr)) {
 
 7324                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7325                             "Illegal argument \"%c\"\n", c);
 
 7334     while (-1 != (c = getopt(argc, argv,
 
 7369             settings.access= strtol(optarg,NULL,8);
 
 7374             udp_specified = 
true;
 
 7378             tcp_specified = 
true;
 
 7384             settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
 
 7385              old_opts += sprintf(old_opts, 
"cache_size=%lu;",
 
 7390             old_opts += sprintf(old_opts, 
"eviction=false;");
 
 7406             perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
 
 7410                 size_t len = strlen(
settings.inter) + strlen(optarg) + 2;
 
 7411                 char *p = malloc(len);
 
 7413                     settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7414                                                     "Failed to allocate memory\n");
 
 7417                 snprintf(p, len, 
"%s,%s", 
settings.inter, optarg);
 
 7425             do_daemonize = 
true;
 
 7431             settings.reqs_per_event = atoi(optarg);
 
 7432             if (
settings.reqs_per_event <= 0) {
 
 7433                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7434                       "Number of requests per event must be greater than 0\n");
 
 7447                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7448                         "Factor must be greater than 1\n");
 
 7451              old_opts += sprintf(old_opts, 
"factor=%f;",
 
 7455             settings.chunk_size = atoi(optarg);
 
 7457                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7458                         "Chunk size must be greater than 0\n");
 
 7461             old_opts += sprintf(old_opts, 
"chunk_size=%u;",
 
 7465             settings.num_threads = atoi(optarg);
 
 7467                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7468                         "Number of threads must be greater than 0\n");
 
 7476                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7477                         "WARNING: Setting a high number of worker" 
 7478                         "threads is not recommended.\n" 
 7479                         " Set this value to the number of cores in" 
 7480                         " your machine or less.\n");
 
 7484             settings.prefix_delimiter = optarg[0];
 
 7488             if (enable_large_pages() == 0) {
 
 7490                 old_opts += sprintf(old_opts, 
"preallocate=true;");
 
 7500             protocol_specified = 
true;
 
 7501             if (strcmp(optarg, 
"auto") == 0) {
 
 7502                 settings.binding_protocol = negotiating_prot;
 
 7503             } 
else if (strcmp(optarg, 
"binary") == 0) {
 
 7504                 settings.binding_protocol = binary_prot;
 
 7505             } 
else if (strcmp(optarg, 
"ascii") == 0) {
 
 7506                 settings.binding_protocol = ascii_prot;
 
 7508                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7509                         "Invalid value for binding protocol: %s\n" 
 7510                         " -- should be one of auto, binary, or ascii\n", optarg);
 
 7515             unit = optarg[strlen(optarg)-1];
 
 7516             if (unit == 
'k' || unit == 
'm' ||
 
 7517                 unit == 
'K' || unit == 
'M') {
 
 7518                 optarg[strlen(optarg)-1] = 
'\0';
 
 7519                 size_max = atoi(optarg);
 
 7520                 if (unit == 
'k' || unit == 
'K')
 
 7522                 if (unit == 
'm' || unit == 
'M')
 
 7523                     size_max *= 1024 * 1024;
 
 7526                 settings.item_size_max = atoi(optarg);
 
 7528             if (
settings.item_size_max < 1024) {
 
 7529                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7530                         "Item max size cannot be less than 1024 bytes.\n");
 
 7533             if (
settings.item_size_max > 1024 * 1024 * 128) {
 
 7534                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7535                         "Cannot set item size limit higher than 128 mb.\n");
 
 7538             if (
settings.item_size_max > 1024 * 1024) {
 
 7539                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7540                     "WARNING: Setting item max size above 1MB is not" 
 7542                     " Raising this limit increases the minimum memory requirements\n" 
 7543                     " and will decrease your memory efficiency.\n" 
 7547             old_opts += sprintf(old_opts, 
"item_size_max=%zu;",
 
 7550             old_opts += sprintf(old_opts, 
"item_size_max=%lu;", (
long unsigned)
 
 7558             engine_config = optarg;
 
 7564 #ifndef SASL_ENABLED 
 7565             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7566                     "This server is not built with SASL support.\n");
 
 7573                 char *ptr = strchr(optarg, 
',');
 
 7578                 if (!load_extension(optarg, ptr)) {
 
 7587             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7588                     "Illegal argument \"%c\"\n", c);
 
 7594     if (getenv(
"MEMCACHED_REQS_TAP_EVENT") != NULL) {
 
 7595         settings.reqs_per_tap_event = atoi(getenv(
"MEMCACHED_REQS_TAP_EVENT"));
 
 7598     if (
settings.reqs_per_tap_event <= 0) {
 
 7599         settings.reqs_per_tap_event = DEFAULT_REQS_PER_TAP_EVENT;
 
 7603     if (install_sigterm_handler() != 0) {
 
 7604         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7605                                         "Failed to install SIGTERM handler\n");
 
 7609     char *topkeys_env = getenv(
"MEMCACHED_TOP_KEYS");
 
 7610     if (topkeys_env != NULL) {
 
 7611         settings.topkeys = atoi(topkeys_env);
 
 7618         if (!protocol_specified) {
 
 7619             settings.binding_protocol = binary_prot;
 
 7621             if (
settings.binding_protocol == negotiating_prot) {
 
 7622                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7623                         "ERROR: You cannot use auto-negotiating protocol while requiring SASL.\n");
 
 7626             if (
settings.binding_protocol == ascii_prot) {
 
 7627                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7628                         "ERROR: You cannot use only ASCII protocol while requiring SASL.\n");
 
 7634     if (tcp_specified && !udp_specified) {
 
 7636     } 
else if (udp_specified && !tcp_specified) {
 
 7650         struct rlimit rlim_new;
 
 7655         if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
 
 7656             rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
 
 7657             if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
 
 7659                 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
 
 7660                 (void)setrlimit(RLIMIT_CORE, &rlim_new);
 
 7669         if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
 
 7670             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7671                     "failed to ensure corefile creation\n");
 
 7681     if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
 
 7682         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7683                 "failed to getrlimit number of files\n");
 
 7687         if (rlim.rlim_cur < maxfiles)
 
 7688             rlim.rlim_cur = maxfiles;
 
 7689         if (rlim.rlim_max < rlim.rlim_cur)
 
 7690             rlim.rlim_max = rlim.rlim_cur;
 
 7691         if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
 
 7692             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7693                     "failed to set rlimit for open files. Try running as" 
 7694                     " root or requesting smaller maxconns value.\n");
 
 7705         nfiles += 
settings.num_threads * 2;
 
 7709         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7710                 "Configuratioin error. \n" 
 7711                 "You specified %d connections, but the system will use at " 
 7712                 "least %d\nconnection structures to start.\n",
 
 7718     if (getuid() == 0 || geteuid() == 0) {
 
 7719         if (username == 0 || *username == 
'\0') {
 
 7720             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7721                     "can't run as root without the -u switch\n");
 
 7724         if ((pw = getpwnam(username)) == 0) {
 
 7725             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7726                     "can't find the user %s to switch to\n", username);
 
 7729         if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
 
 7730             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7731                     "failed to assume identity of user %s: %s\n", username,
 
 7744         if (sigignore(SIGHUP) == -1) {
 
 7745             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7746                     "Failed to ignore SIGHUP: ", strerror(errno));
 
 7748         if (daemonize(maxcore, 
settings.verbose) == -1) {
 
 7749              settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7750                     "failed to daemon() in order to daemonize\n");
 
 7757 #ifdef HAVE_MLOCKALL 
 7758         int res = mlockall(MCL_CURRENT | MCL_FUTURE);
 
 7760             settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7761                     "warning: -k invalid, mlockall() failed: %s\n",
 
 7765         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7766                 "warning: -k invalid, mlockall() not supported on this platform.  proceeding without.\n");
 
 7771     main_base = event_init();
 
 7775     if (!load_engine(engine,get_server_api,
settings.extensions.logger,&engine_handle)) {
 
 7780     if(!init_engine(engine_handle,engine_config,
settings.extensions.logger)) {
 
 7781 #ifdef INNODB_MEMCACHED 
 7790         log_engine_details(engine_handle,
settings.extensions.logger);
 
 7801     if (!(conn_cache = cache_create(
"conn", 
sizeof(conn), 
sizeof(
void*),
 
 7802                                     conn_constructor, conn_destructor))) {
 
 7803         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7804                 "Failed to create connection cache\n");
 
 7808     default_independent_stats = new_independent_stats();
 
 7815     if (sigignore(SIGPIPE) == -1) {
 
 7816         settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7817                 "failed to ignore SIGPIPE; sigaction");
 
 7823     thread_init(
settings.num_threads, main_base, dispatch_event_handler);
 
 7826     clock_handler(0, 0, 0);
 
 7831             vperror(
"failed to listen on UNIX socket: %s", 
settings.socketpath);
 
 7840         const char *portnumber_filename = getenv(
"MEMCACHED_PORT_FILENAME");
 
 7841         char temp_portnumber_filename[PATH_MAX];
 
 7842         FILE *portnumber_file = NULL;
 
 7844         if (portnumber_filename != NULL) {
 
 7845             snprintf(temp_portnumber_filename,
 
 7846                      sizeof(temp_portnumber_filename),
 
 7847                      "%s.lck", portnumber_filename);
 
 7849             portnumber_file = fopen(temp_portnumber_filename, 
"a");
 
 7850             if (portnumber_file == NULL) {
 
 7851                 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
 
 7852                         "Failed to open \"%s\": %s\n",
 
 7853                         temp_portnumber_filename, strerror(errno));
 
 7859                 vperror(
"failed to listen on TCP port %d", 
settings.port);
 
 7860 #ifdef INNODB_MEMCACHED 
 7879             vperror(
"failed to listen on UDP port %d", 
settings.udpport);
 
 7883         if (portnumber_file) {
 
 7884             fclose(portnumber_file);
 
 7885             rename(temp_portnumber_filename, portnumber_filename);
 
 7889     if (pid_file != NULL) {
 
 7897     event_base_loop(main_base, 0);
 
 7900         settings.extensions.logger->
log(EXTENSION_LOG_INFO, NULL,
 
 7901                                         "Initiating shutdown\n");
 
 7913         remove_pidfile(pid_file);
 
 7918     memcached_shutdown = 2;
 
 7920     return EXIT_SUCCESS;