MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
default_engine.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <assert.h>
8 #include <errno.h>
9 #include <ctype.h>
10 #include <unistd.h>
11 #include <stddef.h>
12 #include <inttypes.h>
13 
14 #include "default_engine.h"
15 #include "memcached/util.h"
16 #include "memcached/config_parser.h"
17 
18 static const engine_info* default_get_info(ENGINE_HANDLE* handle);
19 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
20  const char* config_str);
21 static void default_destroy(ENGINE_HANDLE* handle,
22  const bool force);
23 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
24  const void* cookie,
25  item **item,
26  const void* key,
27  const size_t nkey,
28  const size_t nbytes,
29  const int flags,
30  const rel_time_t exptime);
31 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
32  const void* cookie,
33  const void* key,
34  const size_t nkey,
35  uint64_t cas,
36  uint16_t vbucket);
37 
38 static void default_item_release(ENGINE_HANDLE* handle, const void *cookie,
39  item* item);
40 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
41  const void* cookie,
42  item** item,
43  const void* key,
44  const int nkey,
45  uint16_t vbucket);
46 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
47  const void *cookie,
48  const char *stat_key,
49  int nkey,
50  ADD_STAT add_stat);
51 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie);
52 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
53  const void *cookie,
54  item* item,
55  uint64_t *cas,
56  ENGINE_STORE_OPERATION operation,
57  uint16_t vbucket);
58 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
59  const void* cookie,
60  const void* key,
61  const int nkey,
62  const bool increment,
63  const bool create,
64  const uint64_t delta,
65  const uint64_t initial,
66  const rel_time_t exptime,
67  uint64_t *cas,
68  uint64_t *result,
69  uint16_t vbucket);
70 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
71  const void* cookie, time_t when);
72 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
73  const char *cfg_str);
74 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
75  const void* cookie,
77  ADD_RESPONSE response);
78 
79 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
80  const void *cookie,
81  void *engine_specific,
82  uint16_t nengine,
83  uint8_t ttl,
84  uint16_t tap_flags,
85  tap_event_t tap_event,
86  uint32_t tap_seqno,
87  const void *key,
88  size_t nkey,
89  uint32_t flags,
90  uint32_t exptime,
91  uint64_t cas,
92  const void *data,
93  size_t ndata,
94  uint16_t vbucket);
95 
96 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
97  const void* cookie,
98  const void* client,
99  size_t nclient,
100  uint32_t flags,
101  const void* userdata,
102  size_t nuserdata);
103 
104 static void default_handle_disconnect(const void *cookie,
105  ENGINE_EVENT_TYPE type,
106  const void *event_data,
107  const void *cb_data);
108 
110  char c;
111  struct vbucket_info v;
112 };
113 
114 static void set_vbucket_state(struct default_engine *e,
115  uint16_t vbid, vbucket_state_t to) {
116  union vbucket_info_adapter vi;
117  vi.c = e->vbucket_infos[vbid];
118  vi.v.state = to;
119  e->vbucket_infos[vbid] = vi.c;
120 }
121 
122 static vbucket_state_t get_vbucket_state(struct default_engine *e,
123  uint16_t vbid) {
124  union vbucket_info_adapter vi;
125  vi.c = e->vbucket_infos[vbid];
126  return vi.v.state;
127 }
128 
129 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
130  return e->config.ignore_vbucket
131  || (get_vbucket_state(e, vbid) == vbucket_state_active);
132 }
133 
134 /* mechanism for handling bad vbucket requests */
135 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
136 
137 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
138  const item* item, item_info *item_info);
139 
140 static const char const * vbucket_state_name(vbucket_state_t s) {
141  static const char const * vbucket_states[] = {
142  [vbucket_state_active] = "active",
143  [vbucket_state_replica] = "replica",
144  [vbucket_state_pending] = "pending",
145  [vbucket_state_dead] = "dead"
146  };
147  if (is_valid_vbucket_state_t(s)) {
148  return vbucket_states[s];
149  } else {
150  return "Illegal vbucket state";
151  }
152 }
153 
154 ENGINE_ERROR_CODE create_instance(uint64_t interface,
155  GET_SERVER_API get_server_api,
156  ENGINE_HANDLE **handle) {
157  SERVER_HANDLE_V1 *api = get_server_api();
158  if (interface != 1 || api == NULL) {
159  return ENGINE_ENOTSUP;
160  }
161 
162  struct default_engine *engine = malloc(sizeof(*engine));
163  if (engine == NULL) {
164  return ENGINE_ENOMEM;
165  }
166 
167  struct default_engine default_engine = {
168  .engine = {
169  .interface = {
170  .interface = 1
171  },
172  .get_info = default_get_info,
173  .initialize = default_initialize,
174  .destroy = default_destroy,
175  .allocate = default_item_allocate,
176  .remove = default_item_delete,
177  .release = default_item_release,
178  .get = default_get,
179  .get_stats = default_get_stats,
180  .reset_stats = default_reset_stats,
181  .store = default_store,
182  .arithmetic = default_arithmetic,
183  .flush = default_flush,
184  .unknown_command = default_unknown_command,
185  .tap_notify = default_tap_notify,
186  .get_tap_iterator = default_get_tap_iterator,
187  .item_set_cas = item_set_cas,
188  .get_item_info = get_item_info
189  },
190  .server = *api,
191  .get_server_api = get_server_api,
192  .initialized = true,
193  .assoc = {
194  .hashpower = 16,
195  },
196  .slabs = {
197  .lock = PTHREAD_MUTEX_INITIALIZER
198  },
199  .cache_lock = PTHREAD_MUTEX_INITIALIZER,
200  .stats = {
201  .lock = PTHREAD_MUTEX_INITIALIZER,
202  },
203  .config = {
204  .use_cas = true,
205  .verbose = 0,
206  .oldest_live = 0,
207  .evict_to_free = true,
208  .maxbytes = 64 * 1024 * 1024,
209  .preallocate = false,
210  .factor = 1.25,
211  .chunk_size = 48,
212  .item_size_max= 1024 * 1024,
213  },
214  .scrubber = {
215  .lock = PTHREAD_MUTEX_INITIALIZER,
216  },
217  .tap_connections = {
218  .lock = PTHREAD_MUTEX_INITIALIZER,
219  .size = 10,
220  },
221  .info.engine_info = {
222  .description = "Default engine v0.1",
223  .num_features = 1,
224  .features = {
225  [0].feature = ENGINE_FEATURE_LRU
226  }
227  }
228  };
229 
230  *engine = default_engine;
231  engine->tap_connections.clients = calloc(default_engine.tap_connections.size, sizeof(void*));
232  if (engine->tap_connections.clients == NULL) {
233  free(engine);
234  return ENGINE_ENOMEM;
235  }
236  *handle = (ENGINE_HANDLE*)&engine->engine;
237  return ENGINE_SUCCESS;
238 }
239 
240 static inline struct default_engine* get_handle(ENGINE_HANDLE* handle) {
241  return (struct default_engine*)handle;
242 }
243 
244 static inline hash_item* get_real_item(item* item) {
245  return (hash_item*)item;
246 }
247 
248 static const engine_info* default_get_info(ENGINE_HANDLE* handle) {
249  return &get_handle(handle)->info.engine_info;
250 }
251 
252 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
253  const char* config_str) {
254  struct default_engine* se = get_handle(handle);
255 
256  ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
257  if (ret != ENGINE_SUCCESS) {
258  return ret;
259  }
260 
261  /* fixup feature_info */
262  if (se->config.use_cas) {
263  se->info.engine_info.features[se->info.engine_info.num_features++].feature = ENGINE_FEATURE_CAS;
264  }
265 
266  ret = assoc_init(se);
267  if (ret != ENGINE_SUCCESS) {
268  return ret;
269  }
270 
271  ret = slabs_init(se, se->config.maxbytes, se->config.factor,
272  se->config.preallocate);
273  if (ret != ENGINE_SUCCESS) {
274  return ret;
275  }
276 
277  se->server.callback->register_callback(handle, ON_DISCONNECT, default_handle_disconnect, handle);
278 
279  return ENGINE_SUCCESS;
280 }
281 
282 static void default_destroy(ENGINE_HANDLE* handle, const bool force) {
283  (void) force;
284  struct default_engine* se = get_handle(handle);
285 
286  if (se->initialized) {
287  pthread_mutex_destroy(&se->cache_lock);
288  pthread_mutex_destroy(&se->stats.lock);
289  pthread_mutex_destroy(&se->slabs.lock);
290  se->initialized = false;
291  free(se->tap_connections.clients);
292  free(se);
293  }
294 }
295 
296 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
297  const void* cookie,
298  item **item,
299  const void* key,
300  const size_t nkey,
301  const size_t nbytes,
302  const int flags,
303  const rel_time_t exptime) {
304  struct default_engine* engine = get_handle(handle);
305  size_t ntotal = sizeof(hash_item) + nkey + nbytes;
306  if (engine->config.use_cas) {
307  ntotal += sizeof(uint64_t);
308  }
309  unsigned int id = slabs_clsid(engine, ntotal);
310  if (id == 0) {
311  return ENGINE_E2BIG;
312  }
313 
314  hash_item *it;
315  it = item_alloc(engine, key, nkey, flags, engine->server.core->realtime(exptime),
316  nbytes, cookie);
317 
318  if (it != NULL) {
319  *item = it;
320  return ENGINE_SUCCESS;
321  } else {
322  return ENGINE_ENOMEM;
323  }
324 }
325 
326 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
327  const void* cookie,
328  const void* key,
329  const size_t nkey,
330  uint64_t cas,
331  uint16_t vbucket)
332 {
333  struct default_engine* engine = get_handle(handle);
334  VBUCKET_GUARD(engine, vbucket);
335 
336  hash_item *it = item_get(engine, key, nkey);
337  if (it == NULL) {
338  return ENGINE_KEY_ENOENT;
339  }
340 
341  if (cas == 0 || cas == item_get_cas(it)) {
342  item_unlink(engine, it);
343  item_release(engine, it);
344  } else {
345  return ENGINE_KEY_EEXISTS;
346  }
347 
348  return ENGINE_SUCCESS;
349 }
350 
351 static void default_item_release(ENGINE_HANDLE* handle,
352  const void *cookie,
353  item* item) {
354  item_release(get_handle(handle), get_real_item(item));
355 }
356 
357 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
358  const void* cookie,
359  item** item,
360  const void* key,
361  const int nkey,
362  uint16_t vbucket) {
363  struct default_engine *engine = get_handle(handle);
364  VBUCKET_GUARD(engine, vbucket);
365 
366  *item = item_get(engine, key, nkey);
367  if (*item != NULL) {
368  return ENGINE_SUCCESS;
369  } else {
370  return ENGINE_KEY_ENOENT;
371  }
372 }
373 
374 static void stats_vbucket(struct default_engine *e,
375  ADD_STAT add_stat,
376  const void *cookie) {
377  for (int i = 0; i < NUM_VBUCKETS; i++) {
378  vbucket_state_t state = get_vbucket_state(e, i);
379  if (state != vbucket_state_dead) {
380  char buf[16];
381  snprintf(buf, sizeof(buf), "vb_%d", i);
382  const char * state_name = vbucket_state_name(state);
383  add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
384  }
385  }
386 }
387 
388 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
389  const void* cookie,
390  const char* stat_key,
391  int nkey,
392  ADD_STAT add_stat)
393 {
394  struct default_engine* engine = get_handle(handle);
395  ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
396 
397  if (stat_key == NULL) {
398  char val[128];
399  int len;
400 
401  pthread_mutex_lock(&engine->stats.lock);
402  len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.evictions);
403  add_stat("evictions", 9, val, len, cookie);
404  len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_items);
405  add_stat("curr_items", 10, val, len, cookie);
406  len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.total_items);
407  add_stat("total_items", 11, val, len, cookie);
408  len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_bytes);
409  add_stat("bytes", 5, val, len, cookie);
410  len = sprintf(val, "%"PRIu64, engine->stats.reclaimed);
411  add_stat("reclaimed", 9, val, len, cookie);
412  len = sprintf(val, "%"PRIu64, (uint64_t)engine->config.maxbytes);
413  add_stat("engine_maxbytes", 15, val, len, cookie);
414  pthread_mutex_unlock(&engine->stats.lock);
415  } else if (strncmp(stat_key, "slabs", 5) == 0) {
416  slabs_stats(engine, add_stat, cookie);
417  } else if (strncmp(stat_key, "items", 5) == 0) {
418  item_stats(engine, add_stat, cookie);
419  } else if (strncmp(stat_key, "sizes", 5) == 0) {
420  item_stats_sizes(engine, add_stat, cookie);
421  } else if (strncmp(stat_key, "vbucket", 7) == 0) {
422  stats_vbucket(engine, add_stat, cookie);
423  } else if (strncmp(stat_key, "scrub", 5) == 0) {
424  char val[128];
425  int len;
426 
427  pthread_mutex_lock(&engine->scrubber.lock);
428  if (engine->scrubber.running) {
429  add_stat("scrubber:status", 15, "running", 7, cookie);
430  } else {
431  add_stat("scrubber:status", 15, "stopped", 7, cookie);
432  }
433 
434  if (engine->scrubber.started != 0) {
435  if (engine->scrubber.stopped != 0) {
436  time_t diff = engine->scrubber.started - engine->scrubber.stopped;
437  len = sprintf(val, "%"PRIu64, (uint64_t)diff);
438  add_stat("scrubber:last_run", 17, val, len, cookie);
439  }
440 
441  len = sprintf(val, "%"PRIu64, engine->scrubber.visited);
442  add_stat("scrubber:visited", 16, val, len, cookie);
443  len = sprintf(val, "%"PRIu64, engine->scrubber.cleaned);
444  add_stat("scrubber:cleaned", 16, val, len, cookie);
445  }
446  pthread_mutex_unlock(&engine->scrubber.lock);
447  } else {
448  ret = ENGINE_KEY_ENOENT;
449  }
450 
451  return ret;
452 }
453 
454 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
455  const void *cookie,
456  item* item,
457  uint64_t *cas,
458  ENGINE_STORE_OPERATION operation,
459  uint16_t vbucket) {
460  struct default_engine *engine = get_handle(handle);
461  VBUCKET_GUARD(engine, vbucket);
462  return store_item(engine, get_real_item(item), cas, operation,
463  cookie);
464 }
465 
466 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
467  const void* cookie,
468  const void* key,
469  const int nkey,
470  const bool increment,
471  const bool create,
472  const uint64_t delta,
473  const uint64_t initial,
474  const rel_time_t exptime,
475  uint64_t *cas,
476  uint64_t *result,
477  uint16_t vbucket) {
478  struct default_engine *engine = get_handle(handle);
479  VBUCKET_GUARD(engine, vbucket);
480 
481  return arithmetic(engine, cookie, key, nkey, increment,
482  create, delta, initial, engine->server.core->realtime(exptime), cas,
483  result);
484 }
485 
486 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
487  const void* cookie, time_t when) {
488  item_flush_expired(get_handle(handle), when);
489 
490  return ENGINE_SUCCESS;
491 }
492 
493 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie) {
494  struct default_engine *engine = get_handle(handle);
495  item_stats_reset(engine);
496 
497  pthread_mutex_lock(&engine->stats.lock);
498  engine->stats.evictions = 0;
499  engine->stats.reclaimed = 0;
500  engine->stats.total_items = 0;
501  pthread_mutex_unlock(&engine->stats.lock);
502 }
503 
504 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
505  const char *cfg_str) {
506  ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
507 
508  se->config.vb0 = true;
509 
510  if (cfg_str != NULL) {
511  struct config_item items[] = {
512  { .key = "use_cas",
513  .datatype = DT_BOOL,
514  .value.dt_bool = &se->config.use_cas },
515  { .key = "verbose",
516  .datatype = DT_SIZE,
517  .value.dt_size = &se->config.verbose },
518  { .key = "eviction",
519  .datatype = DT_BOOL,
520  .value.dt_bool = &se->config.evict_to_free },
521  { .key = "cache_size",
522  .datatype = DT_SIZE,
523  .value.dt_size = &se->config.maxbytes },
524  { .key = "preallocate",
525  .datatype = DT_BOOL,
526  .value.dt_bool = &se->config.preallocate },
527  { .key = "factor",
528  .datatype = DT_FLOAT,
529  .value.dt_float = &se->config.factor },
530  { .key = "chunk_size",
531  .datatype = DT_SIZE,
532  .value.dt_size = &se->config.chunk_size },
533  { .key = "item_size_max",
534  .datatype = DT_SIZE,
535  .value.dt_size = &se->config.item_size_max },
536  { .key = "ignore_vbucket",
537  .datatype = DT_BOOL,
538  .value.dt_bool = &se->config.ignore_vbucket },
539  { .key = "vb0",
540  .datatype = DT_BOOL,
541  .value.dt_bool = &se->config.vb0 },
542  { .key = "config_file",
543  .datatype = DT_CONFIGFILE },
544  { .key = NULL}
545  };
546 
547  ret = se->server.core->parse_config(cfg_str, items, stderr);
548  }
549 
550  if (se->config.vb0) {
551  set_vbucket_state(se, 0, vbucket_state_active);
552  }
553 
554  return ENGINE_SUCCESS;
555 }
556 
557 static bool set_vbucket(struct default_engine *e,
558  const void* cookie,
560  ADD_RESPONSE response) {
561  size_t bodylen = ntohl(req->message.header.request.bodylen)
562  - ntohs(req->message.header.request.keylen);
563  if (bodylen != sizeof(vbucket_state_t)) {
564  const char *msg = "Incorrect packet format";
565  return response(NULL, 0, NULL, 0, msg, strlen(msg),
566  PROTOCOL_BINARY_RAW_BYTES,
567  PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
568  }
569  vbucket_state_t state;
570  memcpy(&state, &req->message.body.state, sizeof(state));
571  state = ntohl(state);
572 
573  if (!is_valid_vbucket_state_t(state)) {
574  const char *msg = "Invalid vbucket state";
575  return response(NULL, 0, NULL, 0, msg, strlen(msg),
576  PROTOCOL_BINARY_RAW_BYTES,
577  PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
578  }
579 
580  set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
581  return response(NULL, 0, NULL, 0, &state, sizeof(state),
582  PROTOCOL_BINARY_RAW_BYTES,
583  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
584 }
585 
586 static bool get_vbucket(struct default_engine *e,
587  const void* cookie,
589  ADD_RESPONSE response) {
590  vbucket_state_t state;
591  state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
592  state = ntohl(state);
593 
594  return response(NULL, 0, NULL, 0, &state, sizeof(state),
595  PROTOCOL_BINARY_RAW_BYTES,
596  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
597 }
598 
599 static bool rm_vbucket(struct default_engine *e,
600  const void *cookie,
602  ADD_RESPONSE response) {
603  set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
604  return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
605  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
606 }
607 
608 static bool scrub_cmd(struct default_engine *e,
609  const void *cookie,
611  ADD_RESPONSE response) {
612 
613  protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
614  if (!item_start_scrub(e)) {
615  res = PROTOCOL_BINARY_RESPONSE_EBUSY;
616  }
617 
618  return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
619  res, 0, cookie);
620 }
621 
622 static bool touch(struct default_engine *e, const void *cookie,
624  ADD_RESPONSE response) {
625  if (request->request.extlen != 4 || request->request.keylen == 0) {
626  return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
627  PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
628  }
629 
630  protocol_binary_request_touch *t = (void*)request;
631  void *key = t->bytes + sizeof(t->bytes);
632  uint32_t exptime = ntohl(t->message.body.expiration);
633  uint16_t nkey = ntohs(request->request.keylen);
634 
635  hash_item *item = touch_item(e, key, nkey,
636  e->server.core->realtime(exptime));
637  if (item == NULL) {
638  if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
639  return true;
640  } else {
641  return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
642  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
643  }
644  } else {
645  bool ret;
646  if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
647  ret = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
648  PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
649  } else {
650  ret = response(NULL, 0, &item->flags, sizeof(item->flags),
651  item_get_data(item), item->nbytes,
652  PROTOCOL_BINARY_RAW_BYTES,
653  PROTOCOL_BINARY_RESPONSE_SUCCESS,
654  item_get_cas(item), cookie);
655  }
656  item_release(e, item);
657  return ret;
658  }
659 }
660 
661 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
662  const void* cookie,
664  ADD_RESPONSE response)
665 {
666  struct default_engine* e = get_handle(handle);
667  bool sent;
668 
669  switch(request->request.opcode) {
670  case PROTOCOL_BINARY_CMD_SCRUB:
671  sent = scrub_cmd(e, cookie, request, response);
672  break;
673  case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
674  sent = rm_vbucket(e, cookie, request, response);
675  break;
676  case PROTOCOL_BINARY_CMD_SET_VBUCKET:
677  sent = set_vbucket(e, cookie, (void*)request, response);
678  break;
679  case PROTOCOL_BINARY_CMD_GET_VBUCKET:
680  sent = get_vbucket(e, cookie, (void*)request, response);
681  break;
682  case PROTOCOL_BINARY_CMD_TOUCH:
683  case PROTOCOL_BINARY_CMD_GAT:
684  case PROTOCOL_BINARY_CMD_GATQ:
685  sent = touch(e, cookie, request, response);
686  break;
687  default:
688  sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
689  PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
690  break;
691  }
692 
693  if (sent) {
694  return ENGINE_SUCCESS;
695  } else {
696  return ENGINE_FAILED;
697  }
698 }
699 
700 
701 uint64_t item_get_cas(const hash_item* item)
702 {
703  if (item->iflag & ITEM_WITH_CAS) {
704  return *(uint64_t*)(item + 1);
705  }
706  return 0;
707 }
708 
709 void item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
710  item* item, uint64_t val)
711 {
712  hash_item* it = get_real_item(item);
713  if (it->iflag & ITEM_WITH_CAS) {
714  *(uint64_t*)(it + 1) = val;
715  }
716 }
717 
718 const void* item_get_key(const hash_item* item)
719 {
720  char *ret = (void*)(item + 1);
721  if (item->iflag & ITEM_WITH_CAS) {
722  ret += sizeof(uint64_t);
723  }
724 
725  return ret;
726 }
727 
728 char* item_get_data(const hash_item* item)
729 {
730  return ((char*)item_get_key(item)) + item->nkey;
731 }
732 
733 uint8_t item_get_clsid(const hash_item* item)
734 {
735  return 0;
736 }
737 
738 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
739  const item* item, item_info *item_info)
740 {
741  hash_item* it = (hash_item*)item;
742  if (item_info->nvalue < 1) {
743  return false;
744  }
745  item_info->cas = item_get_cas(it);
746  item_info->exptime = it->exptime;
747  item_info->nbytes = it->nbytes;
748  item_info->flags = it->flags;
749  item_info->clsid = it->slabs_clsid;
750  item_info->nkey = it->nkey;
751  item_info->nvalue = 1;
752  item_info->key = item_get_key(it);
753  item_info->value[0].iov_base = item_get_data(it);
754  item_info->value[0].iov_len = it->nbytes;
755  return true;
756 }
757 
758 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
759  const void *cookie,
760  void *engine_specific,
761  uint16_t nengine,
762  uint8_t ttl,
763  uint16_t tap_flags,
764  tap_event_t tap_event,
765  uint32_t tap_seqno,
766  const void *key,
767  size_t nkey,
768  uint32_t flags,
769  uint32_t exptime,
770  uint64_t cas,
771  const void *data,
772  size_t ndata,
773  uint16_t vbucket) {
774  struct default_engine* engine = get_handle(handle);
775  vbucket_state_t state;
776  item *it;
777  ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
778 
779  switch (tap_event) {
780  case TAP_ACK:
781  /* We don't provide a tap stream, so we should never receive this */
782  abort();
783 
784  case TAP_FLUSH:
785  return default_flush(handle, cookie, 0);
786 
787  case TAP_DELETION:
788  return default_item_delete(handle, cookie, key, nkey, cas, vbucket);
789 
790  case TAP_MUTATION:
791  it = engine->server.cookie->get_engine_specific(cookie);
792  if (it == NULL) {
793  ret = default_item_allocate(handle, cookie, &it, key, nkey, ndata, flags, exptime);
794  switch (ret) {
795  case ENGINE_SUCCESS:
796  break;
797  case ENGINE_ENOMEM:
798  return ENGINE_TMPFAIL;
799  default:
800  return ret;
801  }
802  }
803  memcpy(item_get_data(it), data, ndata);
804  engine->server.cookie->store_engine_specific(cookie, NULL);
805  item_set_cas(handle, cookie, it, cas);
806  ret = default_store(handle, cookie, it, &cas, OPERATION_SET, vbucket);
807  if (ret == ENGINE_EWOULDBLOCK) {
808  engine->server.cookie->store_engine_specific(cookie, it);
809  } else {
810  item_release(engine, it);
811  }
812 
813  break;
814 
815  case TAP_VBUCKET_SET:
816  if (nengine != sizeof(vbucket_state_t)) {
817  // illegal size of the vbucket set package...
818  return ENGINE_DISCONNECT;
819  }
820 
821  memcpy(&state, engine_specific, nengine);
822  state = (vbucket_state_t)ntohl(state);
823 
824  if (!is_valid_vbucket_state_t(state)) {
825  return ENGINE_DISCONNECT;
826  }
827 
828  set_vbucket_state(engine, vbucket, state);
829  return ENGINE_SUCCESS;
830 
831  case TAP_OPAQUE:
832  // not supported, ignore
833  default:
834  engine->server.log->get_logger()->log(EXTENSION_LOG_DEBUG, cookie,
835  "Ignoring unknown tap event: %x", tap_event);
836  }
837 
838  return ret;
839 }
840 
841 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
842  const void* cookie,
843  const void* client,
844  size_t nclient,
845  uint32_t flags,
846  const void* userdata,
847  size_t nuserdata) {
848  struct default_engine* engine = get_handle(handle);
849 
850  if ((flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) { /* Not supported */
851  return NULL;
852  }
853 
854  pthread_mutex_lock(&engine->tap_connections.lock);
855  int ii;
856  for (ii = 0; ii < engine->tap_connections.size; ++ii) {
857  if (engine->tap_connections.clients[ii] == NULL) {
858  engine->tap_connections.clients[ii] = cookie;
859  break;
860  }
861  }
862  pthread_mutex_unlock(&engine->tap_connections.lock);
863  if (ii == engine->tap_connections.size) {
864  // @todo allow more connections :)
865  return NULL;
866  }
867 
868  if (!initialize_item_tap_walker(engine, cookie)) {
869  /* Failed to create */
870  pthread_mutex_lock(&engine->tap_connections.lock);
871  engine->tap_connections.clients[ii] = NULL;
872  pthread_mutex_unlock(&engine->tap_connections.lock);
873  return NULL;
874  }
875 
876  return item_tap_walker;
877  }
878 
879 static void default_handle_disconnect(const void *cookie,
880  ENGINE_EVENT_TYPE type,
881  const void *event_data,
882  const void *cb_data) {
883  struct default_engine *engine = (struct default_engine*)cb_data;
884  pthread_mutex_lock(&engine->tap_connections.lock);
885  int ii;
886  for (ii = 0; ii < engine->tap_connections.size; ++ii) {
887  if (engine->tap_connections.clients[ii] == cookie) {
888  free(engine->server.cookie->get_engine_specific(cookie));
889  break;
890  }
891  }
892  pthread_mutex_unlock(&engine->tap_connections.lock);
893 }