MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
items.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11 
12 #include "default_engine.h"
13 
14 /* Forward Declarations */
15 static void item_link_q(struct default_engine *engine, hash_item *it);
16 static void item_unlink_q(struct default_engine *engine, hash_item *it);
17 static hash_item *do_item_alloc(struct default_engine *engine,
18  const void *key, const size_t nkey,
19  const int flags, const rel_time_t exptime,
20  const int nbytes,
21  const void *cookie);
22 static hash_item *do_item_get(struct default_engine *engine,
23  const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29  hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31 
32 /*
33  * We only reposition items in the LRU queue if they haven't been repositioned
34  * in this many seconds. That saves us from churning on frequently-accessed
35  * items.
36  */
37 #define ITEM_UPDATE_INTERVAL 60
38 /*
39  * To avoid scanning through the complete cache in some circumstances we'll
40  * just give up and return an error after inspecting a fixed number of objects.
41  */
42 static const int search_items = 50;
43 
44 void item_stats_reset(struct default_engine *engine) {
45  pthread_mutex_lock(&engine->cache_lock);
46  memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47  pthread_mutex_unlock(&engine->cache_lock);
48 }
49 
50 
51 /* warning: don't use these macros with a function, as it evals its arg twice */
52 static inline size_t ITEM_ntotal(struct default_engine *engine,
53  const hash_item *item) {
54  size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55  if (engine->config.use_cas) {
56  ret += sizeof(uint64_t);
57  }
58 
59  return ret;
60 }
61 
/*
 * Hand out the next CAS identifier: 1 on the first call, then one higher on
 * every call. The counter is a plain static, so callers must serialize; every
 * caller in this file runs with engine->cache_lock held.
 */
static uint64_t get_cas_id(void) {
    static uint64_t last_issued;   /* static storage: starts at zero */

    last_issued += 1;
    return last_issued;
}
67 
/*
 * Reference-count tracing. Change the `#if 0` below to `#if 1` to print one
 * line per refcount event: '*' on allocation, '+' on get, '-' on release and
 * 'F' on free, followed by L/S markers for the ITEM_LINKED / ITEM_SLABBED
 * flags.
 *
 * Both variants expand to a single statement (do { ... } while (0)), so the
 * macro is safe in unbraced if/else bodies. The disabled variant does not
 * evaluate its arguments.
 */
#if 0
# define DEBUG_REFCNT(it, op)                                           \
    do {                                                                \
        fprintf(stderr, "item %p refcnt(%c) %d %c%c\n",                 \
                (void *)(it), (op), (int)(it)->refcount,                \
                ((it)->iflag & ITEM_LINKED) ? 'L' : ' ',                \
                ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ');              \
    } while (0)
#else
# define DEBUG_REFCNT(it, op) do { } while (0)
#endif
78 
79 
80 /*@null@*/
81 hash_item *do_item_alloc(struct default_engine *engine,
82  const void *key,
83  const size_t nkey,
84  const int flags,
85  const rel_time_t exptime,
86  const int nbytes,
87  const void *cookie) {
88  hash_item *it = NULL;
89  size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90  if (engine->config.use_cas) {
91  ntotal += sizeof(uint64_t);
92  }
93 
94  unsigned int id = slabs_clsid(engine, ntotal);
95  if (id == 0)
96  return 0;
97 
98  /* do a quick check if we have any expired items in the tail.. */
99  int tries = search_items;
100  hash_item *search;
101 
102  rel_time_t current_time = engine->server.core->get_current_time();
103 
104  for (search = engine->items.tails[id];
105  tries > 0 && search != NULL;
106  tries--, search=search->prev) {
107  if (search->refcount == 0 &&
108  (search->exptime != 0 && search->exptime < current_time)) {
109  it = search;
110  /* I don't want to actually free the object, just steal
111  * the item to avoid to grab the slab mutex twice ;-)
112  */
113  pthread_mutex_lock(&engine->stats.lock);
114  engine->stats.reclaimed++;
115  pthread_mutex_unlock(&engine->stats.lock);
116  engine->items.itemstats[id].reclaimed++;
117  it->refcount = 1;
118  slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
119  do_item_unlink(engine, it);
120  /* Initialize the item block: */
121  it->slabs_clsid = 0;
122  it->refcount = 0;
123  break;
124  }
125  }
126 
127  if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
128  /*
129  ** Could not find an expired item at the tail, and memory allocation
130  ** failed. Try to evict some items!
131  */
132  tries = search_items;
133 
134  /* If requested to not push old items out of cache when memory runs out,
135  * we're out of luck at this point...
136  */
137 
138  if (engine->config.evict_to_free == 0) {
139  engine->items.itemstats[id].outofmemory++;
140  return NULL;
141  }
142 
143  /*
144  * try to get one off the right LRU
145  * don't necessariuly unlink the tail because it may be locked: refcount>0
146  * search up from tail an item with refcount==0 and unlink it; give up after search_items
147  * tries
148  */
149 
150  if (engine->items.tails[id] == 0) {
151  engine->items.itemstats[id].outofmemory++;
152  return NULL;
153  }
154 
155  for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
156  if (search->refcount == 0) {
157  if (search->exptime == 0 || search->exptime > current_time) {
158  engine->items.itemstats[id].evicted++;
159  engine->items.itemstats[id].evicted_time = current_time - search->time;
160  if (search->exptime != 0) {
161  engine->items.itemstats[id].evicted_nonzero++;
162  }
163  pthread_mutex_lock(&engine->stats.lock);
164  engine->stats.evictions++;
165  pthread_mutex_unlock(&engine->stats.lock);
166  engine->server.stat->evicting(cookie,
167  item_get_key(search),
168  search->nkey);
169  } else {
170  engine->items.itemstats[id].reclaimed++;
171  pthread_mutex_lock(&engine->stats.lock);
172  engine->stats.reclaimed++;
173  pthread_mutex_unlock(&engine->stats.lock);
174  }
175  do_item_unlink(engine, search);
176  break;
177  }
178  }
179  it = slabs_alloc(engine, ntotal, id);
180  if (it == 0) {
181  engine->items.itemstats[id].outofmemory++;
182  /* Last ditch effort. There is a very rare bug which causes
183  * refcount leaks. We've fixed most of them, but it still happens,
184  * and it may happen in the future.
185  * We can reasonably assume no item can stay locked for more than
186  * three hours, so if we find one in the tail which is that old,
187  * free it anyway.
188  */
189  tries = search_items;
190  for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
191  if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
192  engine->items.itemstats[id].tailrepairs++;
193  search->refcount = 0;
194  do_item_unlink(engine, search);
195  break;
196  }
197  }
198  it = slabs_alloc(engine, ntotal, id);
199  if (it == 0) {
200  return NULL;
201  }
202  }
203  }
204 
205  assert(it->slabs_clsid == 0);
206 
207  it->slabs_clsid = id;
208 
209  assert(it != engine->items.heads[it->slabs_clsid]);
210 
211  it->next = it->prev = it->h_next = 0;
212  it->refcount = 1; /* the caller will have a reference */
213  DEBUG_REFCNT(it, '*');
214  it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
215  it->nkey = nkey;
216  it->nbytes = nbytes;
217  it->flags = flags;
218  memcpy((void*)item_get_key(it), key, nkey);
219  it->exptime = exptime;
220  return it;
221 }
222 
223 static void item_free(struct default_engine *engine, hash_item *it) {
224  size_t ntotal = ITEM_ntotal(engine, it);
225  unsigned int clsid;
226  assert((it->iflag & ITEM_LINKED) == 0);
227  assert(it != engine->items.heads[it->slabs_clsid]);
228  assert(it != engine->items.tails[it->slabs_clsid]);
229  assert(it->refcount == 0);
230 
231  /* so slab size changer can tell later if item is already free or not */
232  clsid = it->slabs_clsid;
233  it->slabs_clsid = 0;
234  it->iflag |= ITEM_SLABBED;
235  DEBUG_REFCNT(it, 'F');
236  slabs_free(engine, it, ntotal, clsid);
237 }
238 
239 static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
240  hash_item **head, **tail;
241  assert(it->slabs_clsid < POWER_LARGEST);
242  assert((it->iflag & ITEM_SLABBED) == 0);
243 
244  head = &engine->items.heads[it->slabs_clsid];
245  tail = &engine->items.tails[it->slabs_clsid];
246  assert(it != *head);
247  assert((*head && *tail) || (*head == 0 && *tail == 0));
248  it->prev = 0;
249  it->next = *head;
250  if (it->next) it->next->prev = it;
251  *head = it;
252  if (*tail == 0) *tail = it;
253  engine->items.sizes[it->slabs_clsid]++;
254  return;
255 }
256 
257 static void item_unlink_q(struct default_engine *engine, hash_item *it) {
258  hash_item **head, **tail;
259  assert(it->slabs_clsid < POWER_LARGEST);
260  head = &engine->items.heads[it->slabs_clsid];
261  tail = &engine->items.tails[it->slabs_clsid];
262 
263  if (*head == it) {
264  assert(it->prev == 0);
265  *head = it->next;
266  }
267  if (*tail == it) {
268  assert(it->next == 0);
269  *tail = it->prev;
270  }
271  assert(it->next != it);
272  assert(it->prev != it);
273 
274  if (it->next) it->next->prev = it->prev;
275  if (it->prev) it->prev->next = it->next;
276  engine->items.sizes[it->slabs_clsid]--;
277  return;
278 }
279 
280 int do_item_link(struct default_engine *engine, hash_item *it) {
281  MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
282  assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
283  assert(it->nbytes < (1024 * 1024)); /* 1MB max size */
284  it->iflag |= ITEM_LINKED;
285  it->time = engine->server.core->get_current_time();
286  assoc_insert(engine, engine->server.core->hash(item_get_key(it),
287  it->nkey, 0),
288  it);
289 
290  pthread_mutex_lock(&engine->stats.lock);
291  engine->stats.curr_bytes += ITEM_ntotal(engine, it);
292  engine->stats.curr_items += 1;
293  engine->stats.total_items += 1;
294  pthread_mutex_unlock(&engine->stats.lock);
295 
296  /* Allocate a new CAS ID on link. */
297  item_set_cas(NULL, NULL, it, get_cas_id());
298 
299  item_link_q(engine, it);
300 
301  return 1;
302 }
303 
304 void do_item_unlink(struct default_engine *engine, hash_item *it) {
305  MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
306  if ((it->iflag & ITEM_LINKED) != 0) {
307  it->iflag &= ~ITEM_LINKED;
308  pthread_mutex_lock(&engine->stats.lock);
309  engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
310  engine->stats.curr_items -= 1;
311  pthread_mutex_unlock(&engine->stats.lock);
312  assoc_delete(engine, engine->server.core->hash(item_get_key(it),
313  it->nkey, 0),
314  item_get_key(it), it->nkey);
315  item_unlink_q(engine, it);
316  if (it->refcount == 0) {
317  item_free(engine, it);
318  }
319  }
320 }
321 
322 void do_item_release(struct default_engine *engine, hash_item *it) {
323  MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
324  if (it->refcount != 0) {
325  it->refcount--;
326  DEBUG_REFCNT(it, '-');
327  }
328  if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
329  item_free(engine, it);
330  }
331 }
332 
333 void do_item_update(struct default_engine *engine, hash_item *it) {
334  rel_time_t current_time = engine->server.core->get_current_time();
335  MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
336  if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
337  assert((it->iflag & ITEM_SLABBED) == 0);
338 
339  if ((it->iflag & ITEM_LINKED) != 0) {
340  item_unlink_q(engine, it);
341  it->time = current_time;
342  item_link_q(engine, it);
343  }
344  }
345 }
346 
347 int do_item_replace(struct default_engine *engine,
348  hash_item *it, hash_item *new_it) {
349  MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
350  item_get_key(new_it), new_it->nkey, new_it->nbytes);
351  assert((it->iflag & ITEM_SLABBED) == 0);
352 
353  do_item_unlink(engine, it);
354  return do_item_link(engine, new_it);
355 }
356 
/*@null@*/
/*
 * Per-class key dump ("stats cachedump") is not supported by this engine.
 * Always returns NULL and, when `bytes` is non-NULL, reports a length of 0.
 *
 * The previous never-enabled implementation (under `#ifdef FUTURE`) was
 * removed. It referenced an `engine` variable that is not in scope here and
 * an undefined `process_started`, and it used strncpy, so it could not have
 * compiled if enabled. A real implementation would need the engine passed
 * in.
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
    (void)slabs_clsid;
    (void)limit;
    if (bytes != NULL) {
        *bytes = 0;
    }
    return NULL;
}
406 
407 static void do_item_stats(struct default_engine *engine,
408  ADD_STAT add_stats, const void *c) {
409  int i;
410  rel_time_t current_time = engine->server.core->get_current_time();
411  for (i = 0; i < POWER_LARGEST; i++) {
412  if (engine->items.tails[i] != NULL) {
413  int search = search_items;
414  while (search > 0 &&
415  engine->items.tails[i] != NULL &&
416  ((engine->config.oldest_live != 0 && /* Item flushd */
417  engine->config.oldest_live <= current_time &&
418  engine->items.tails[i]->time <= engine->config.oldest_live) ||
419  (engine->items.tails[i]->exptime != 0 && /* and not expired */
420  engine->items.tails[i]->exptime < current_time))) {
421  --search;
422  if (engine->items.tails[i]->refcount == 0) {
423  do_item_unlink(engine, engine->items.tails[i]);
424  } else {
425  break;
426  }
427  }
428  if (engine->items.tails[i] == NULL) {
429  /* We removed all of the items in this slab class */
430  continue;
431  }
432 
433  const char *prefix = "items";
434  add_statistics(c, add_stats, prefix, i, "number", "%u",
435  engine->items.sizes[i]);
436  add_statistics(c, add_stats, prefix, i, "age", "%u",
437  engine->items.tails[i]->time);
438  add_statistics(c, add_stats, prefix, i, "evicted",
439  "%u", engine->items.itemstats[i].evicted);
440  add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
441  "%u", engine->items.itemstats[i].evicted_nonzero);
442  add_statistics(c, add_stats, prefix, i, "evicted_time",
443  "%u", engine->items.itemstats[i].evicted_time);
444  add_statistics(c, add_stats, prefix, i, "outofmemory",
445  "%u", engine->items.itemstats[i].outofmemory);
446  add_statistics(c, add_stats, prefix, i, "tailrepairs",
447  "%u", engine->items.itemstats[i].tailrepairs);;
448  add_statistics(c, add_stats, prefix, i, "reclaimed",
449  "%u", engine->items.itemstats[i].reclaimed);;
450  }
451  }
452 }
453 
455 /*@null@*/
456 static void do_item_stats_sizes(struct default_engine *engine,
457  ADD_STAT add_stats, const void *c) {
458 
459  /* max 1MB object, divided into 32 bytes size buckets */
460  const int num_buckets = 32768;
461  unsigned int *histogram = calloc(num_buckets, sizeof(int));
462 
463  if (histogram != NULL) {
464  int i;
465 
466  /* build the histogram */
467  for (i = 0; i < POWER_LARGEST; i++) {
468  hash_item *iter = engine->items.heads[i];
469  while (iter) {
470  int ntotal = ITEM_ntotal(engine, iter);
471  int bucket = ntotal / 32;
472  if ((ntotal % 32) != 0) bucket++;
473  if (bucket < num_buckets) histogram[bucket]++;
474  iter = iter->next;
475  }
476  }
477 
478  /* write the buffer */
479  for (i = 0; i < num_buckets; i++) {
480  if (histogram[i] != 0) {
481  char key[8], val[32];
482  int klen, vlen;
483  klen = snprintf(key, sizeof(key), "%d", i * 32);
484  vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
485  assert(klen < sizeof(key));
486  assert(vlen < sizeof(val));
487  add_stats(key, klen, val, vlen, c);
488  }
489  }
490  free(histogram);
491  }
492 }
493 
495 hash_item *do_item_get(struct default_engine *engine,
496  const char *key, const size_t nkey) {
497  rel_time_t current_time = engine->server.core->get_current_time();
498  hash_item *it = assoc_find(engine, engine->server.core->hash(key,
499  nkey, 0),
500  key, nkey);
501  int was_found = 0;
502 
503  if (engine->config.verbose > 2) {
505  logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
506  if (it == NULL) {
507  logger->log(EXTENSION_LOG_DEBUG, NULL,
508  "> NOT FOUND %s", key);
509  } else {
510  logger->log(EXTENSION_LOG_DEBUG, NULL,
511  "> FOUND KEY %s",
512  (const char*)item_get_key(it));
513  was_found++;
514  }
515  }
516 
517  if (it != NULL && engine->config.oldest_live != 0 &&
518  engine->config.oldest_live <= current_time &&
519  it->time <= engine->config.oldest_live) {
520  do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
521  it = NULL;
522  }
523 
524  if (it == NULL && was_found) {
526  logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
527  logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
528  was_found--;
529  }
530 
531  if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
532  do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
533  it = NULL;
534  }
535 
536  if (it == NULL && was_found) {
538  logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539  logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
540  was_found--;
541  }
542 
543  if (it != NULL) {
544  it->refcount++;
545  DEBUG_REFCNT(it, '+');
546  do_item_update(engine, it);
547  }
548 
549  return it;
550 }
551 
552 /*
553  * Stores an item in the cache according to the semantics of one of the set
554  * commands. In threaded mode, this is protected by the cache lock.
555  *
556  * Returns the state of storage.
557  */
558 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
559  hash_item *it, uint64_t *cas,
560  ENGINE_STORE_OPERATION operation,
561  const void *cookie) {
562  const char *key = item_get_key(it);
563  hash_item *old_it = do_item_get(engine, key, it->nkey);
564  ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
565 
566  hash_item *new_it = NULL;
567 
568  if (old_it != NULL && operation == OPERATION_ADD) {
569  /* add only adds a nonexistent item, but promote to head of LRU */
570  do_item_update(engine, old_it);
571  } else if (!old_it && (operation == OPERATION_REPLACE
572  || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
573  {
574  /* replace only replaces an existing value; don't store */
575  } else if (operation == OPERATION_CAS) {
576  /* validate cas operation */
577  if(old_it == NULL) {
578  // LRU expired
579  stored = ENGINE_KEY_ENOENT;
580  }
581  else if (item_get_cas(it) == item_get_cas(old_it)) {
582  // cas validates
583  // it and old_it may belong to different classes.
584  // I'm updating the stats for the one that's getting pushed out
585  do_item_replace(engine, old_it, it);
586  stored = ENGINE_SUCCESS;
587  } else {
588  if (engine->config.verbose > 1) {
590  logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
591  logger->log(EXTENSION_LOG_INFO, NULL,
592  "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
593  item_get_cas(old_it),
594  item_get_cas(it));
595  }
596  stored = ENGINE_KEY_EEXISTS;
597  }
598  } else {
599  /*
600  * Append - combine new and old record into single one. Here it's
601  * atomic and thread-safe.
602  */
603  if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
604  /*
605  * Validate CAS
606  */
607  if (item_get_cas(it) != 0) {
608  // CAS much be equal
609  if (item_get_cas(it) != item_get_cas(old_it)) {
610  stored = ENGINE_KEY_EEXISTS;
611  }
612  }
613 
614  if (stored == ENGINE_NOT_STORED) {
615  /* we have it and old_it here - alloc memory to hold both */
616  new_it = do_item_alloc(engine, key, it->nkey,
617  old_it->flags,
618  old_it->exptime,
619  it->nbytes + old_it->nbytes,
620  cookie);
621 
622  if (new_it == NULL) {
623  /* SERVER_ERROR out of memory */
624  if (old_it != NULL) {
625  do_item_release(engine, old_it);
626  }
627 
628  return ENGINE_NOT_STORED;
629  }
630 
631  /* copy data from it and old_it to new_it */
632 
633  if (operation == OPERATION_APPEND) {
634  memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
635  memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
636  } else {
637  /* OPERATION_PREPEND */
638  memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
639  memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
640  }
641 
642  it = new_it;
643  }
644  }
645 
646  if (stored == ENGINE_NOT_STORED) {
647  if (old_it != NULL) {
648  do_item_replace(engine, old_it, it);
649  } else {
650  do_item_link(engine, it);
651  }
652 
653  *cas = item_get_cas(it);
654  stored = ENGINE_SUCCESS;
655  }
656  }
657 
658  if (old_it != NULL) {
659  do_item_release(engine, old_it); /* release our reference */
660  }
661 
662  if (new_it != NULL) {
663  do_item_release(engine, new_it);
664  }
665 
666  if (stored == ENGINE_SUCCESS) {
667  *cas = item_get_cas(it);
668  }
669 
670  return stored;
671 }
672 
673 
674 /*
675  * adds a delta value to a numeric item.
676  *
677  * c connection requesting the operation
678  * it item to adjust
679  * incr true to increment value, false to decrement
680  * delta amount to adjust value by
681  * buf buffer for response string
682  *
683  * returns a response string to send back to the client.
684  */
685 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
686  hash_item *it, const bool incr,
687  const int64_t delta, uint64_t *rcas,
688  uint64_t *result, const void *cookie) {
689  const char *ptr;
690  uint64_t value;
691  char buf[80];
692  int res;
693 
694  if (it->nbytes >= (sizeof(buf) - 1)) {
695  return ENGINE_EINVAL;
696  }
697 
698  ptr = item_get_data(it);
699  memcpy(buf, ptr, it->nbytes);
700  buf[it->nbytes] = '\0';
701 
702  if (!safe_strtoull(buf, &value)) {
703  return ENGINE_EINVAL;
704  }
705 
706  if (incr) {
707  value += delta;
708  } else {
709  if(delta > value) {
710  value = 0;
711  } else {
712  value -= delta;
713  }
714  }
715 
716  *result = value;
717  if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
718  return ENGINE_EINVAL;
719  }
720 
721  if (it->refcount == 1 && res <= it->nbytes) {
722  // we can do inline replacement
723  memcpy(item_get_data(it), buf, res);
724  memset(item_get_data(it) + res, ' ', it->nbytes - res);
725  item_set_cas(NULL, NULL, it, get_cas_id());
726  *rcas = item_get_cas(it);
727  } else {
728  hash_item *new_it = do_item_alloc(engine, item_get_key(it),
729  it->nkey, it->flags,
730  it->exptime, res,
731  cookie);
732  if (new_it == NULL) {
733  do_item_unlink(engine, it);
734  return ENGINE_ENOMEM;
735  }
736  memcpy(item_get_data(new_it), buf, res);
737  do_item_replace(engine, it, new_it);
738  *rcas = item_get_cas(new_it);
739  do_item_release(engine, new_it); /* release our reference */
740  }
741 
742  return ENGINE_SUCCESS;
743 }
744 
745 /********************************* ITEM ACCESS *******************************/
746 
747 /*
748  * Allocates a new item.
749  */
750 hash_item *item_alloc(struct default_engine *engine,
751  const void *key, size_t nkey, int flags,
752  rel_time_t exptime, int nbytes, const void *cookie) {
753  hash_item *it;
754  pthread_mutex_lock(&engine->cache_lock);
755  it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
756  pthread_mutex_unlock(&engine->cache_lock);
757  return it;
758 }
759 
760 /*
761  * Returns an item if it hasn't been marked as expired,
762  * lazy-expiring as needed.
763  */
764 hash_item *item_get(struct default_engine *engine,
765  const void *key, const size_t nkey) {
766  hash_item *it;
767  pthread_mutex_lock(&engine->cache_lock);
768  it = do_item_get(engine, key, nkey);
769  pthread_mutex_unlock(&engine->cache_lock);
770  return it;
771 }
772 
773 /*
774  * Decrements the reference count on an item and adds it to the freelist if
775  * needed.
776  */
777 void item_release(struct default_engine *engine, hash_item *item) {
778  pthread_mutex_lock(&engine->cache_lock);
779  do_item_release(engine, item);
780  pthread_mutex_unlock(&engine->cache_lock);
781 }
782 
783 /*
784  * Unlinks an item from the LRU and hashtable.
785  */
786 void item_unlink(struct default_engine *engine, hash_item *item) {
787  pthread_mutex_lock(&engine->cache_lock);
788  do_item_unlink(engine, item);
789  pthread_mutex_unlock(&engine->cache_lock);
790 }
791 
792 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
793  const void* cookie,
794  const void* key,
795  const int nkey,
796  const bool increment,
797  const bool create,
798  const uint64_t delta,
799  const uint64_t initial,
800  const rel_time_t exptime,
801  uint64_t *cas,
802  uint64_t *result)
803 {
804  hash_item *item = do_item_get(engine, key, nkey);
805  ENGINE_ERROR_CODE ret;
806 
807  if (item == NULL) {
808  if (!create) {
809  return ENGINE_KEY_ENOENT;
810  } else {
811  char buffer[128];
812  int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
813  (uint64_t)initial);
814 
815  item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
816  if (item == NULL) {
817  return ENGINE_ENOMEM;
818  }
819  memcpy((void*)item_get_data(item), buffer, len);
820  if ((ret = do_store_item(engine, item, cas,
821  OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
822  *result = initial;
823  *cas = item_get_cas(item);
824  }
825  do_item_release(engine, item);
826  }
827  } else {
828  ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
829  do_item_release(engine, item);
830  }
831 
832  return ret;
833 }
834 
835 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
836  const void* cookie,
837  const void* key,
838  const int nkey,
839  const bool increment,
840  const bool create,
841  const uint64_t delta,
842  const uint64_t initial,
843  const rel_time_t exptime,
844  uint64_t *cas,
845  uint64_t *result)
846 {
847  ENGINE_ERROR_CODE ret;
848 
849  pthread_mutex_lock(&engine->cache_lock);
850  ret = do_arithmetic(engine, cookie, key, nkey, increment,
851  create, delta, initial, exptime, cas,
852  result);
853  pthread_mutex_unlock(&engine->cache_lock);
854  return ret;
855 }
856 
857 /*
858  * Stores an item in the cache (high level, obeys set/add/replace semantics)
859  */
860 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
861  hash_item *item, uint64_t *cas,
862  ENGINE_STORE_OPERATION operation,
863  const void *cookie) {
864  ENGINE_ERROR_CODE ret;
865 
866  pthread_mutex_lock(&engine->cache_lock);
867  ret = do_store_item(engine, item, cas, operation, cookie);
868  pthread_mutex_unlock(&engine->cache_lock);
869  return ret;
870 }
871 
872 static hash_item *do_touch_item(struct default_engine *engine,
873  const void *key,
874  uint16_t nkey,
875  uint32_t exptime)
876 {
877  hash_item *item = do_item_get(engine, key, nkey);
878  if (item != NULL) {
879  item->exptime = exptime;
880  }
881  return item;
882 }
883 
884 hash_item *touch_item(struct default_engine *engine,
885  const void *key,
886  uint16_t nkey,
887  uint32_t exptime)
888 {
889  hash_item *ret;
890 
891  pthread_mutex_lock(&engine->cache_lock);
892  ret = do_touch_item(engine, key, nkey, exptime);
893  pthread_mutex_unlock(&engine->cache_lock);
894  return ret;
895 }
896 
897 /*
898  * Flushes expired items after a flush_all call
899  */
900 void item_flush_expired(struct default_engine *engine, time_t when) {
901  int i;
902  hash_item *iter, *next;
903 
904  pthread_mutex_lock(&engine->cache_lock);
905 
906  if (when == 0) {
907  engine->config.oldest_live = engine->server.core->get_current_time() - 1;
908  } else {
909  engine->config.oldest_live = engine->server.core->realtime(when) - 1;
910  }
911 
912  if (engine->config.oldest_live != 0) {
913  for (i = 0; i < POWER_LARGEST; i++) {
914  /*
915  * The LRU is sorted in decreasing time order, and an item's
916  * timestamp is never newer than its last access time, so we
917  * only need to walk back until we hit an item older than the
918  * oldest_live time.
919  * The oldest_live checking will auto-expire the remaining items.
920  */
921  for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
922  if (iter->time >= engine->config.oldest_live) {
923  next = iter->next;
924  if ((iter->iflag & ITEM_SLABBED) == 0) {
925  do_item_unlink(engine, iter);
926  }
927  } else {
928  /* We've hit the first old item. Continue to the next queue. */
929  break;
930  }
931  }
932  }
933  }
934  pthread_mutex_unlock(&engine->cache_lock);
935 }
936 
937 /*
938  * Dumps part of the cache
939  */
940 char *item_cachedump(struct default_engine *engine,
941  unsigned int slabs_clsid,
942  unsigned int limit,
943  unsigned int *bytes) {
944  char *ret;
945 
946  pthread_mutex_lock(&engine->cache_lock);
947  ret = do_item_cachedump(slabs_clsid, limit, bytes);
948  pthread_mutex_unlock(&engine->cache_lock);
949  return ret;
950 }
951 
952 void item_stats(struct default_engine *engine,
953  ADD_STAT add_stat, const void *cookie)
954 {
955  pthread_mutex_lock(&engine->cache_lock);
956  do_item_stats(engine, add_stat, cookie);
957  pthread_mutex_unlock(&engine->cache_lock);
958 }
959 
960 
961 void item_stats_sizes(struct default_engine *engine,
962  ADD_STAT add_stat, const void *cookie)
963 {
964  pthread_mutex_lock(&engine->cache_lock);
965  do_item_stats_sizes(engine, add_stat, cookie);
966  pthread_mutex_unlock(&engine->cache_lock);
967 }
968 
969 static void do_item_link_cursor(struct default_engine *engine,
970  hash_item *cursor, int ii)
971 {
972  cursor->slabs_clsid = (uint8_t)ii;
973  cursor->next = NULL;
974  cursor->prev = engine->items.tails[ii];
975  engine->items.tails[ii]->next = cursor;
976  engine->items.tails[ii] = cursor;
977  engine->items.sizes[ii]++;
978 }
979 
980 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
981  hash_item *item, void *cookie);
982 
983 static bool do_item_walk_cursor(struct default_engine *engine,
984  hash_item *cursor,
985  int steplength,
986  ITERFUNC itemfunc,
987  void* itemdata,
988  ENGINE_ERROR_CODE *error)
989 {
990  int ii = 0;
991  *error = ENGINE_SUCCESS;
992 
993  while (cursor->prev != NULL && ii < steplength) {
994  ++ii;
995  /* Move cursor */
996  hash_item *ptr = cursor->prev;
997  item_unlink_q(engine, cursor);
998 
999  bool done = false;
1000  if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1001  done = true;
1002  cursor->prev = NULL;
1003  } else {
1004  cursor->next = ptr;
1005  cursor->prev = ptr->prev;
1006  cursor->prev->next = cursor;
1007  ptr->prev = cursor;
1008  }
1009 
1010  /* Ignore cursors */
1011  if (ptr->nkey == 0 && ptr->nbytes == 0) {
1012  --ii;
1013  } else {
1014  *error = itemfunc(engine, ptr, itemdata);
1015  if (*error != ENGINE_SUCCESS) {
1016  return false;
1017  }
1018  }
1019 
1020  if (done) {
1021  return false;
1022  }
1023  }
1024 
1025  return (cursor->prev != NULL);
1026 }
1027 
1028 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1029  hash_item *item,
1030  void *cookie) {
1031  (void)cookie;
1032  engine->scrubber.visited++;
1033  rel_time_t current_time = engine->server.core->get_current_time();
1034  if (item->refcount == 0 &&
1035  (item->exptime != 0 && item->exptime < current_time)) {
1036  do_item_unlink(engine, item);
1037  engine->scrubber.cleaned++;
1038  }
1039  return ENGINE_SUCCESS;
1040 }
1041 
1042 static void item_scrub_class(struct default_engine *engine,
1043  hash_item *cursor) {
1044 
1045  ENGINE_ERROR_CODE ret;
1046  bool more;
1047  do {
1048  pthread_mutex_lock(&engine->cache_lock);
1049  more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1050  pthread_mutex_unlock(&engine->cache_lock);
1051  if (ret != ENGINE_SUCCESS) {
1052  break;
1053  }
1054  } while (more);
1055 }
1056 
1057 static void *item_scubber_main(void *arg)
1058 {
1059  struct default_engine *engine = arg;
1060  hash_item cursor = { .refcount = 1 };
1061 
1062  for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1063  pthread_mutex_lock(&engine->cache_lock);
1064  bool skip = false;
1065  if (engine->items.heads[ii] == NULL) {
1066  skip = true;
1067  } else {
1068  // add the item at the tail
1069  do_item_link_cursor(engine, &cursor, ii);
1070  }
1071  pthread_mutex_unlock(&engine->cache_lock);
1072 
1073  if (!skip) {
1074  item_scrub_class(engine, &cursor);
1075  }
1076  }
1077 
1078  pthread_mutex_lock(&engine->scrubber.lock);
1079  engine->scrubber.stopped = time(NULL);
1080  engine->scrubber.running = false;
1081  pthread_mutex_unlock(&engine->scrubber.lock);
1082 
1083  return NULL;
1084 }
1085 
1086 bool item_start_scrub(struct default_engine *engine)
1087 {
1088  bool ret = false;
1089  pthread_mutex_lock(&engine->scrubber.lock);
1090  if (!engine->scrubber.running) {
1091  engine->scrubber.started = time(NULL);
1092  engine->scrubber.stopped = 0;
1093  engine->scrubber.visited = 0;
1094  engine->scrubber.cleaned = 0;
1095  engine->scrubber.running = true;
1096 
1097  pthread_t t;
1098  pthread_attr_t attr;
1099 
1100  if (pthread_attr_init(&attr) != 0 ||
1101  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1102  pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1103  {
1104  engine->scrubber.running = false;
1105  } else {
1106  ret = true;
1107  }
1108  }
1109  pthread_mutex_unlock(&engine->scrubber.lock);
1110 
1111  return ret;
1112 }
1113 
1114 struct tap_client {
1115  hash_item cursor;
1116  hash_item *it;
1117 };
1118 
1119 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1120  hash_item *item,
1121  void *cookie) {
1122  struct tap_client *client = cookie;
1123  client->it = item;
1124  ++client->it->refcount;
1125  return ENGINE_SUCCESS;
1126 }
1127 
1128 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1129  const void *cookie, item **itm,
1130  void **es, uint16_t *nes, uint8_t *ttl,
1131  uint16_t *flags, uint32_t *seqno,
1132  uint16_t *vbucket)
1133 {
1134  struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1135  if (client == NULL) {
1136  return TAP_DISCONNECT;
1137  }
1138 
1139  *es = NULL;
1140  *nes = 0;
1141  *ttl = (uint8_t)-1;
1142  *seqno = 0;
1143  *flags = 0;
1144  *vbucket = 0;
1145  client->it = NULL;
1146 
1147  ENGINE_ERROR_CODE r;
1148  do {
1149  if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1150  // find next slab class to look at..
1151  bool linked = false;
1152  for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) {
1153  if (engine->items.heads[ii] != NULL) {
1154  // add the item at the tail
1155  do_item_link_cursor(engine, &client->cursor, ii);
1156  linked = true;
1157  }
1158  }
1159  if (!linked) {
1160  break;
1161  }
1162  }
1163  } while (client->it == NULL);
1164  *itm = client->it;
1165 
1166  return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1167 }
1168 
1169 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1170  const void *cookie, item **itm,
1171  void **es, uint16_t *nes, uint8_t *ttl,
1172  uint16_t *flags, uint32_t *seqno,
1173  uint16_t *vbucket)
1174 {
1175  tap_event_t ret;
1176  struct default_engine *engine = (struct default_engine*)handle;
1177  pthread_mutex_lock(&engine->cache_lock);
1178  ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1179  pthread_mutex_unlock(&engine->cache_lock);
1180 
1181  return ret;
1182 }
1183 
1184 bool initialize_item_tap_walker(struct default_engine *engine,
1185  const void* cookie)
1186 {
1187  struct tap_client *client = calloc(1, sizeof(*client));
1188  if (client == NULL) {
1189  return false;
1190  }
1191  client->cursor.refcount = 1;
1192 
1193  /* Link the cursor! */
1194  bool linked = false;
1195  for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1196  pthread_mutex_lock(&engine->cache_lock);
1197  if (engine->items.heads[ii] != NULL) {
1198  // add the item at the tail
1199  do_item_link_cursor(engine, &client->cursor, ii);
1200  linked = true;
1201  }
1202  pthread_mutex_unlock(&engine->cache_lock);
1203  }
1204 
1205  engine->server.cookie->store_engine_specific(cookie, client);
1206  return true;
1207 }