MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
items.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11 
12 #include "default_engine.h"
13 
14 /* Forward Declarations */
15 static void item_link_q(struct default_engine *engine, hash_item *it);
16 static void item_unlink_q(struct default_engine *engine, hash_item *it);
17 static hash_item *do_item_alloc(struct default_engine *engine,
18  const void *key, const size_t nkey,
19  const int flags, const rel_time_t exptime,
20  const int nbytes,
21  const void *cookie);
22 static hash_item *do_item_get(struct default_engine *engine,
23  const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29  hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31 
32 /*
33  * We only reposition items in the LRU queue if they haven't been repositioned
34  * in this many seconds. That saves us from churning on frequently-accessed
35  * items.
36  */
37 #define ITEM_UPDATE_INTERVAL 60
38 
39 void cache_set_initial_cas_id(uint64_t cas);
40 static uint64_t cas_id;
41 
42 void item_stats_reset(struct default_engine *engine) {
43  pthread_mutex_lock(&engine->cache_lock);
44  memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
45  pthread_mutex_unlock(&engine->cache_lock);
46 }
47 
48 
49 /* warning: don't use these macros with a function, as it evals its arg twice */
50 static inline size_t ITEM_ntotal(struct default_engine *engine,
51  const hash_item *item) {
52  size_t ret = sizeof(*item) + item->nkey + item->nbytes;
53  if (engine->config.use_cas) {
54  ret += sizeof(uint64_t);
55  }
56 
57  return ret;
58 }
59 
60 /* Set the initial CAS id for the hash table */
61 void cache_set_initial_cas_id(uint64_t cas) {
62  cas_id = cas;
63 }
64 
65 /* Get the next CAS id for a new item. */
66 static uint64_t get_cas_id(void) {
67  return ++cas_id;
68 }
69 
/*
 * Reference-count tracing. Change "#if 0" to "#if 1" to log every refcount
 * transition to stderr; otherwise DEBUG_REFCNT() expands to a no-op
 * expression that evaluates none of its arguments.
 *
 * The trace line shows the item address, the operation character, the
 * current refcount, and 'L' / 'S' when ITEM_LINKED / ITEM_SLABBED are set
 * in the item's iflag field.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
    fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
            (void *)(it), (op), (int)(it)->refcount, \
            ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
            ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) ((void)0)
#endif
80 
81 
82 /*@null@*/
83 hash_item *do_item_alloc(struct default_engine *engine,
84  const void *key,
85  const size_t nkey,
86  const int flags,
87  const rel_time_t exptime,
88  const int nbytes,
89  const void *cookie) {
90  hash_item *it = NULL;
91  size_t ntotal = sizeof(hash_item) + nkey + nbytes;
92  if (engine->config.use_cas) {
93  ntotal += sizeof(uint64_t);
94  }
95 
96  unsigned int id = slabs_clsid(engine, ntotal);
97  if (id == 0)
98  return 0;
99 
100  /* do a quick check if we have any expired items in the tail.. */
101  int tries = 50;
102  hash_item *search;
103 
104  rel_time_t current_time = engine->server.core->get_current_time();
105 
106  for (search = engine->items.tails[id];
107  tries > 0 && search != NULL;
108  tries--, search=search->prev) {
109  if (search->refcount == 0 &&
110  (search->exptime != 0 && search->exptime < current_time)) {
111  it = search;
112  /* I don't want to actually free the object, just steal
113  * the item to avoid to grab the slab mutex twice ;-)
114  */
115  pthread_mutex_lock(&engine->stats.lock);
116  engine->stats.reclaimed++;
117  pthread_mutex_unlock(&engine->stats.lock);
118  engine->items.itemstats[id].reclaimed++;
119  it->refcount = 1;
120  do_item_unlink(engine, it);
121  /* Initialize the item block: */
122  it->slabs_clsid = 0;
123  it->refcount = 0;
124  break;
125  }
126  }
127 
128  if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
129  /*
130  ** Could not find an expired item at the tail, and memory allocation
131  ** failed. Try to evict some items!
132  */
133  tries = 50;
134 
135  /* If requested to not push old items out of cache when memory runs out,
136  * we're out of luck at this point...
137  */
138 
139  if (engine->config.evict_to_free == 0) {
140  engine->items.itemstats[id].outofmemory++;
141  return NULL;
142  }
143 
144  /*
145  * try to get one off the right LRU
146  * don't necessariuly unlink the tail because it may be locked: refcount>0
147  * search up from tail an item with refcount==0 and unlink it; give up after 50
148  * tries
149  */
150 
151  if (engine->items.tails[id] == 0) {
152  engine->items.itemstats[id].outofmemory++;
153  return NULL;
154  }
155 
156  for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
157  if (search->refcount == 0) {
158  if (search->exptime == 0 || search->exptime > current_time) {
159  engine->items.itemstats[id].evicted++;
160  engine->items.itemstats[id].evicted_time = current_time - search->time;
161  if (search->exptime != 0) {
162  engine->items.itemstats[id].evicted_nonzero++;
163  }
164  pthread_mutex_lock(&engine->stats.lock);
165  engine->stats.evictions++;
166  pthread_mutex_unlock(&engine->stats.lock);
167  engine->server.stat->evicting(cookie,
168  item_get_key(search),
169  search->nkey);
170  } else {
171  engine->items.itemstats[id].reclaimed++;
172  pthread_mutex_lock(&engine->stats.lock);
173  engine->stats.reclaimed++;
174  pthread_mutex_unlock(&engine->stats.lock);
175  }
176  do_item_unlink(engine, search);
177  break;
178  }
179  }
180  it = slabs_alloc(engine, ntotal, id);
181  if (it == 0) {
182  engine->items.itemstats[id].outofmemory++;
183  /* Last ditch effort. There is a very rare bug which causes
184  * refcount leaks. We've fixed most of them, but it still happens,
185  * and it may happen in the future.
186  * We can reasonably assume no item can stay locked for more than
187  * three hours, so if we find one in the tail which is that old,
188  * free it anyway.
189  */
190  tries = 50;
191  for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
192  if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
193  engine->items.itemstats[id].tailrepairs++;
194  search->refcount = 0;
195  do_item_unlink(engine, search);
196  break;
197  }
198  }
199  it = slabs_alloc(engine, ntotal, id);
200  if (it == 0) {
201  return NULL;
202  }
203  }
204  }
205 
206  assert(it->slabs_clsid == 0);
207 
208  it->slabs_clsid = id;
209 
210  assert(it != engine->items.heads[it->slabs_clsid]);
211 
212  it->next = it->prev = it->h_next = 0;
213  it->refcount = 1; /* the caller will have a reference */
214  DEBUG_REFCNT(it, '*');
215  it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
216  it->nkey = nkey;
217  it->nbytes = nbytes;
218  it->flags = flags;
219  memcpy((void*)item_get_key(it), key, nkey);
220  it->exptime = exptime;
221  return it;
222 }
223 
224 static void item_free(struct default_engine *engine, hash_item *it) {
225  size_t ntotal = ITEM_ntotal(engine, it);
226  unsigned int clsid;
227  assert((it->iflag & ITEM_LINKED) == 0);
228  assert(it != engine->items.heads[it->slabs_clsid]);
229  assert(it != engine->items.tails[it->slabs_clsid]);
230  assert(it->refcount == 0);
231 
232  /* so slab size changer can tell later if item is already free or not */
233  clsid = it->slabs_clsid;
234  it->slabs_clsid = 0;
235  it->iflag |= ITEM_SLABBED;
236  DEBUG_REFCNT(it, 'F');
237  slabs_free(engine, it, ntotal, clsid);
238 }
239 
240 static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
241  hash_item **head, **tail;
242  assert(it->slabs_clsid < POWER_LARGEST);
243  assert((it->iflag & ITEM_SLABBED) == 0);
244 
245  head = &engine->items.heads[it->slabs_clsid];
246  tail = &engine->items.tails[it->slabs_clsid];
247  assert(it != *head);
248  assert((*head && *tail) || (*head == 0 && *tail == 0));
249  it->prev = 0;
250  it->next = *head;
251  if (it->next) it->next->prev = it;
252  *head = it;
253  if (*tail == 0) *tail = it;
254  engine->items.sizes[it->slabs_clsid]++;
255  return;
256 }
257 
258 static void item_unlink_q(struct default_engine *engine, hash_item *it) {
259  hash_item **head, **tail;
260  assert(it->slabs_clsid < POWER_LARGEST);
261  head = &engine->items.heads[it->slabs_clsid];
262  tail = &engine->items.tails[it->slabs_clsid];
263 
264  if (*head == it) {
265  assert(it->prev == 0);
266  *head = it->next;
267  }
268  if (*tail == it) {
269  assert(it->next == 0);
270  *tail = it->prev;
271  }
272  assert(it->next != it);
273  assert(it->prev != it);
274 
275  if (it->next) it->next->prev = it->prev;
276  if (it->prev) it->prev->next = it->next;
277  engine->items.sizes[it->slabs_clsid]--;
278  return;
279 }
280 
281 int do_item_link(struct default_engine *engine, hash_item *it) {
282  MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
283  assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
284  assert(it->nbytes < (1024 * 1024)); /* 1MB max size */
285  it->iflag |= ITEM_LINKED;
286  it->time = engine->server.core->get_current_time();
287  assoc_insert(engine, engine->server.core->hash(item_get_key(it),
288  it->nkey, 0),
289  it);
290 
291  pthread_mutex_lock(&engine->stats.lock);
292  engine->stats.curr_bytes += ITEM_ntotal(engine, it);
293  engine->stats.curr_items += 1;
294  engine->stats.total_items += 1;
295  pthread_mutex_unlock(&engine->stats.lock);
296 
297  /* Allocate a new CAS ID on link. */
298  item_set_cas(NULL, NULL, it, get_cas_id());
299 
300  item_link_q(engine, it);
301 
302  return 1;
303 }
304 
305 void do_item_unlink(struct default_engine *engine, hash_item *it) {
306  MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
307  if ((it->iflag & ITEM_LINKED) != 0) {
308  it->iflag &= ~ITEM_LINKED;
309  pthread_mutex_lock(&engine->stats.lock);
310  engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
311  engine->stats.curr_items -= 1;
312  pthread_mutex_unlock(&engine->stats.lock);
313  assoc_delete(engine, engine->server.core->hash(item_get_key(it),
314  it->nkey, 0),
315  item_get_key(it), it->nkey);
316  item_unlink_q(engine, it);
317  if (it->refcount == 0) {
318  item_free(engine, it);
319  }
320  }
321 }
322 
323 void do_item_release(struct default_engine *engine, hash_item *it) {
324  MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
325  if (it->refcount != 0) {
326  it->refcount--;
327  DEBUG_REFCNT(it, '-');
328  }
329  if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
330  item_free(engine, it);
331  }
332 }
333 
334 void do_item_update(struct default_engine *engine, hash_item *it) {
335  rel_time_t current_time = engine->server.core->get_current_time();
336  MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
337  if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
338  assert((it->iflag & ITEM_SLABBED) == 0);
339 
340  if ((it->iflag & ITEM_LINKED) != 0) {
341  item_unlink_q(engine, it);
342  it->time = current_time;
343  item_link_q(engine, it);
344  }
345  }
346 }
347 
348 int do_item_replace(struct default_engine *engine,
349  hash_item *it, hash_item *new_it) {
350  MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
351  item_get_key(new_it), new_it->nkey, new_it->nbytes);
352  assert((it->iflag & ITEM_SLABBED) == 0);
353 
354  do_item_unlink(engine, it);
355  return do_item_link(engine, new_it);
356 }
357 
/*@null@*/
/*
 * Produces a textual dump of the items in one slab class.
 *
 * Unless FUTURE is defined at build time this is a stub that ignores its
 * arguments and returns NULL.
 *
 * The FUTURE implementation builds a malloc'ed buffer (at most 2MB) of
 * "ITEM ..." lines terminated by "END\r\n", stores the buffer length in
 * *bytes and returns the buffer.
 * NOTE(review): the FUTURE branch refers to `engine`, which is not a
 * parameter of this function, so it cannot be enabled as written.
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    unsigned int bufcurr = 0;
    unsigned int shown = 0;
    unsigned int len;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];
    hash_item *it = engine->items.heads[slabs_clsid];
    char *buffer = malloc((size_t)memlimit);

    if (buffer == NULL) {
        return NULL;
    }

    while (it != NULL && (limit == 0 || shown < limit)) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = '\0';
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes - 2,
                       (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit) {  /* 6 is END\r\n\0 */
            break;
        }
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }

    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#else
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
#endif
}
407 
408 static void do_item_stats(struct default_engine *engine,
409  ADD_STAT add_stats, const void *c) {
410  int i;
411  for (i = 0; i < POWER_LARGEST; i++) {
412  if (engine->items.tails[i] != NULL) {
413  const char *prefix = "items";
414  add_statistics(c, add_stats, prefix, i, "number", "%u",
415  engine->items.sizes[i]);
416  add_statistics(c, add_stats, prefix, i, "age", "%u",
417  engine->items.tails[i]->time);
418  add_statistics(c, add_stats, prefix, i, "evicted",
419  "%u", engine->items.itemstats[i].evicted);
420  add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
421  "%u", engine->items.itemstats[i].evicted_nonzero);
422  add_statistics(c, add_stats, prefix, i, "evicted_time",
423  "%u", engine->items.itemstats[i].evicted_time);
424  add_statistics(c, add_stats, prefix, i, "outofmemory",
425  "%u", engine->items.itemstats[i].outofmemory);
426  add_statistics(c, add_stats, prefix, i, "tailrepairs",
427  "%u", engine->items.itemstats[i].tailrepairs);;
428  add_statistics(c, add_stats, prefix, i, "reclaimed",
429  "%u", engine->items.itemstats[i].reclaimed);;
430  }
431  }
432 }
433 
435 /*@null@*/
436 static void do_item_stats_sizes(struct default_engine *engine,
437  ADD_STAT add_stats, const void *c) {
438 
439  /* max 1MB object, divided into 32 bytes size buckets */
440  const int num_buckets = 32768;
441  unsigned int *histogram = calloc(num_buckets, sizeof(int));
442 
443  if (histogram != NULL) {
444  int i;
445 
446  /* build the histogram */
447  for (i = 0; i < POWER_LARGEST; i++) {
448  hash_item *iter = engine->items.heads[i];
449  while (iter) {
450  int ntotal = ITEM_ntotal(engine, iter);
451  int bucket = ntotal / 32;
452  if ((ntotal % 32) != 0) bucket++;
453  if (bucket < num_buckets) histogram[bucket]++;
454  iter = iter->next;
455  }
456  }
457 
458  /* write the buffer */
459  for (i = 0; i < num_buckets; i++) {
460  if (histogram[i] != 0) {
461  char key[8], val[32];
462  int klen, vlen;
463  klen = snprintf(key, sizeof(key), "%d", i * 32);
464  vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
465  assert(klen < sizeof(key));
466  assert(vlen < sizeof(val));
467  add_stats(key, klen, val, vlen, c);
468  }
469  }
470  free(histogram);
471  }
472 }
473 
475 hash_item *do_item_get(struct default_engine *engine,
476  const char *key, const size_t nkey) {
477  rel_time_t current_time = engine->server.core->get_current_time();
478  hash_item *it = assoc_find(engine, engine->server.core->hash(key,
479  nkey, 0),
480  key, nkey);
481  int was_found = 0;
482 
483  if (engine->config.verbose > 2) {
484  if (it == NULL) {
485  fprintf(stderr, "> NOT FOUND %s", key);
486  } else {
487  fprintf(stderr, "> FOUND KEY %s", (const char*)item_get_key(it));
488  was_found++;
489  }
490  }
491 
492  if (it != NULL && engine->config.oldest_live != 0 &&
493  engine->config.oldest_live <= current_time &&
494  it->time <= engine->config.oldest_live) {
495  do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
496  it = NULL;
497  }
498 
499  if (it == NULL && was_found) {
500  fprintf(stderr, " -nuked by flush");
501  was_found--;
502  }
503 
504  if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
505  do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
506  it = NULL;
507  }
508 
509  if (it == NULL && was_found) {
510  fprintf(stderr, " -nuked by expire");
511  was_found--;
512  }
513 
514  if (it != NULL) {
515  it->refcount++;
516  DEBUG_REFCNT(it, '+');
517  do_item_update(engine, it);
518  }
519 
520  if (engine->config.verbose > 2)
521  fprintf(stderr, "\n");
522 
523  return it;
524 }
525 
526 /*
527  * Stores an item in the cache according to the semantics of one of the set
528  * commands. In threaded mode, this is protected by the cache lock.
529  *
530  * Returns the state of storage.
531  */
532 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
533  hash_item *it, uint64_t *cas,
534  ENGINE_STORE_OPERATION operation,
535  const void *cookie) {
536  const char *key = item_get_key(it);
537  hash_item *old_it = do_item_get(engine, key, it->nkey);
538  ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
539 
540  hash_item *new_it = NULL;
541 
542  if (old_it != NULL && operation == OPERATION_ADD) {
543  /* add only adds a nonexistent item, but promote to head of LRU */
544  do_item_update(engine, old_it);
545  } else if (!old_it && (operation == OPERATION_REPLACE
546  || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
547  {
548  /* replace only replaces an existing value; don't store */
549  } else if (operation == OPERATION_CAS) {
550  /* validate cas operation */
551  if(old_it == NULL) {
552  // LRU expired
553  stored = ENGINE_KEY_ENOENT;
554  }
555  else if (item_get_cas(it) == item_get_cas(old_it)) {
556  // cas validates
557  // it and old_it may belong to different classes.
558  // I'm updating the stats for the one that's getting pushed out
559  do_item_replace(engine, old_it, it);
560  stored = ENGINE_SUCCESS;
561  } else {
562  if (engine->config.verbose > 1) {
563  fprintf(stderr,
564  "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
565  item_get_cas(old_it),
566  item_get_cas(it));
567  }
568  stored = ENGINE_KEY_EEXISTS;
569  }
570  } else {
571  /*
572  * Append - combine new and old record into single one. Here it's
573  * atomic and thread-safe.
574  */
575  if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
576  /*
577  * Validate CAS
578  */
579  if (item_get_cas(it) != 0) {
580  // CAS much be equal
581  if (item_get_cas(it) != item_get_cas(old_it)) {
582  stored = ENGINE_KEY_EEXISTS;
583  }
584  }
585 
586  if (stored == ENGINE_NOT_STORED) {
587  /* we have it and old_it here - alloc memory to hold both */
588  new_it = do_item_alloc(engine, key, it->nkey,
589  old_it->flags,
590  old_it->exptime,
591  it->nbytes + old_it->nbytes - 2 /* CRLF */,
592  cookie);
593 
594  if (new_it == NULL) {
595  /* SERVER_ERROR out of memory */
596  if (old_it != NULL) {
597  do_item_release(engine, old_it);
598  }
599 
600  return ENGINE_NOT_STORED;
601  }
602 
603  /* copy data from it and old_it to new_it */
604 
605  if (operation == OPERATION_APPEND) {
606  memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
607  memcpy(item_get_data(new_it) + old_it->nbytes - 2 /* CRLF */, item_get_data(it), it->nbytes);
608  } else {
609  /* OPERATION_PREPEND */
610  memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
611  memcpy(item_get_data(new_it) + it->nbytes - 2 /* CRLF */, item_get_data(old_it), old_it->nbytes);
612  }
613 
614  it = new_it;
615  }
616  }
617 
618  if (stored == ENGINE_NOT_STORED) {
619  if (old_it != NULL) {
620  do_item_replace(engine, old_it, it);
621  } else {
622  do_item_link(engine, it);
623  }
624 
625  *cas = item_get_cas(it);
626  stored = ENGINE_SUCCESS;
627  }
628  }
629 
630  if (old_it != NULL) {
631  do_item_release(engine, old_it); /* release our reference */
632  }
633 
634  if (new_it != NULL) {
635  do_item_release(engine, new_it);
636  }
637 
638  if (stored == ENGINE_SUCCESS) {
639  *cas = item_get_cas(it);
640  }
641 
642  return stored;
643 }
644 
645 
646 /*
647  * adds a delta value to a numeric item.
648  *
649  * c connection requesting the operation
650  * it item to adjust
651  * incr true to increment value, false to decrement
652  * delta amount to adjust value by
653  * buf buffer for response string
654  *
655  * returns a response string to send back to the client.
656  */
657 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
658  hash_item *it, const bool incr,
659  const int64_t delta, uint64_t *rcas,
660  uint64_t *result, const void *cookie) {
661  const char *ptr;
662  uint64_t value;
663  int res;
664 
665  ptr = item_get_data(it);
666 
667  if (!safe_strtoull(ptr, &value)) {
668  return ENGINE_EINVAL;
669  }
670 
671  if (incr) {
672  value += delta;
673  } else {
674  if(delta > value) {
675  value = 0;
676  } else {
677  value -= delta;
678  }
679  }
680 
681  *result = value;
682  char buf[80];
683  if ((res = snprintf(buf, sizeof(buf), "%" PRIu64 "\r\n", value)) == -1) {
684  return ENGINE_EINVAL;
685  }
686  hash_item *new_it = do_item_alloc(engine, item_get_key(it),
687  it->nkey, it->flags,
688  it->exptime, res,
689  cookie );
690  if (new_it == 0) {
691  do_item_unlink(engine, it);
692  return ENGINE_ENOMEM;
693  }
694  memcpy(item_get_data(new_it), buf, res);
695  do_item_replace(engine, it, new_it);
696  *rcas = item_get_cas(new_it);
697  do_item_release(engine, new_it); /* release our reference */
698 
699  return ENGINE_SUCCESS;
700 }
701 
702 /********************************* ITEM ACCESS *******************************/
703 
704 /*
705  * Allocates a new item.
706  */
707 hash_item *item_alloc(struct default_engine *engine,
708  const void *key, size_t nkey, int flags,
709  rel_time_t exptime, int nbytes, const void *cookie) {
710  hash_item *it;
711  pthread_mutex_lock(&engine->cache_lock);
712  it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
713  pthread_mutex_unlock(&engine->cache_lock);
714  return it;
715 }
716 
717 /*
718  * Returns an item if it hasn't been marked as expired,
719  * lazy-expiring as needed.
720  */
721 hash_item *item_get(struct default_engine *engine,
722  const void *key, const size_t nkey) {
723  hash_item *it;
724  pthread_mutex_lock(&engine->cache_lock);
725  it = do_item_get(engine, key, nkey);
726  pthread_mutex_unlock(&engine->cache_lock);
727  return it;
728 }
729 
730 /*
731  * Decrements the reference count on an item and adds it to the freelist if
732  * needed.
733  */
734 void item_release(struct default_engine *engine, hash_item *item) {
735  pthread_mutex_lock(&engine->cache_lock);
736  do_item_release(engine, item);
737  pthread_mutex_unlock(&engine->cache_lock);
738 }
739 
740 /*
741  * Unlinks an item from the LRU and hashtable.
742  */
743 void item_unlink(struct default_engine *engine, hash_item *item) {
744  pthread_mutex_lock(&engine->cache_lock);
745  do_item_unlink(engine, item);
746  pthread_mutex_unlock(&engine->cache_lock);
747 }
748 
749 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
750  const void* cookie,
751  const void* key,
752  const int nkey,
753  const bool increment,
754  const bool create,
755  const uint64_t delta,
756  const uint64_t initial,
757  const rel_time_t exptime,
758  uint64_t *cas,
759  uint64_t *result)
760 {
761  hash_item *item = do_item_get(engine, key, nkey);
762  ENGINE_ERROR_CODE ret;
763 
764  if (item == NULL) {
765  if (!create) {
766  return ENGINE_KEY_ENOENT;
767  } else {
768  char buffer[128];
769  int len = snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n",
770  (uint64_t)initial);
771 
772  item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
773  if (item == NULL) {
774  return ENGINE_ENOMEM;
775  }
776  memcpy((void*)item_get_data(item), buffer, len);
777  if ((ret = do_store_item(engine, item, cas,
778  OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
779  *result = initial;
780  *cas = item_get_cas(item);
781  }
782  do_item_release(engine, item);
783  }
784  } else {
785  ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
786  do_item_release(engine, item);
787  }
788 
789  return ret;
790 }
791 
792 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
793  const void* cookie,
794  const void* key,
795  const int nkey,
796  const bool increment,
797  const bool create,
798  const uint64_t delta,
799  const uint64_t initial,
800  const rel_time_t exptime,
801  uint64_t *cas,
802  uint64_t *result)
803 {
804  ENGINE_ERROR_CODE ret;
805 
806  pthread_mutex_lock(&engine->cache_lock);
807  ret = do_arithmetic(engine, cookie, key, nkey, increment,
808  create, delta, initial, exptime, cas,
809  result);
810  pthread_mutex_unlock(&engine->cache_lock);
811  return ret;
812 }
813 
814 /*
815  * Stores an item in the cache (high level, obeys set/add/replace semantics)
816  */
817 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
818  hash_item *item, uint64_t *cas,
819  ENGINE_STORE_OPERATION operation,
820  const void *cookie) {
821  ENGINE_ERROR_CODE ret;
822 
823  pthread_mutex_lock(&engine->cache_lock);
824  ret = do_store_item(engine, item, cas, operation, cookie);
825  pthread_mutex_unlock(&engine->cache_lock);
826  return ret;
827 }
828 
829 /*
830  * Flushes expired items after a flush_all call
831  */
832 void item_flush_expired(struct default_engine *engine, time_t when) {
833  int i;
834  hash_item *iter, *next;
835 
836  pthread_mutex_lock(&engine->cache_lock);
837 
838  if (when == 0) {
839  engine->config.oldest_live = engine->server.core->get_current_time() - 1;
840  } else {
841  engine->config.oldest_live = engine->server.core->realtime(when) - 1;
842  }
843 
844  if (engine->config.oldest_live != 0) {
845  for (i = 0; i < POWER_LARGEST; i++) {
846  /*
847  * The LRU is sorted in decreasing time order, and an item's
848  * timestamp is never newer than its last access time, so we
849  * only need to walk back until we hit an item older than the
850  * oldest_live time.
851  * The oldest_live checking will auto-expire the remaining items.
852  */
853  for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
854  if (iter->time >= engine->config.oldest_live) {
855  next = iter->next;
856  if ((iter->iflag & ITEM_SLABBED) == 0) {
857  do_item_unlink(engine, iter);
858  }
859  } else {
860  /* We've hit the first old item. Continue to the next queue. */
861  break;
862  }
863  }
864  }
865  }
866  pthread_mutex_unlock(&engine->cache_lock);
867 }
868 
869 /*
870  * Dumps part of the cache
871  */
872 char *item_cachedump(struct default_engine *engine,
873  unsigned int slabs_clsid,
874  unsigned int limit,
875  unsigned int *bytes) {
876  char *ret;
877 
878  pthread_mutex_lock(&engine->cache_lock);
879  ret = do_item_cachedump(slabs_clsid, limit, bytes);
880  pthread_mutex_unlock(&engine->cache_lock);
881  return ret;
882 }
883 
884 void item_stats(struct default_engine *engine,
885  ADD_STAT add_stat, const void *cookie)
886 {
887  pthread_mutex_lock(&engine->cache_lock);
888  do_item_stats(engine, add_stat, cookie);
889  pthread_mutex_unlock(&engine->cache_lock);
890 }
891 
892 
893 void item_stats_sizes(struct default_engine *engine,
894  ADD_STAT add_stat, const void *cookie)
895 {
896  pthread_mutex_lock(&engine->cache_lock);
897  do_item_stats_sizes(engine, add_stat, cookie);
898  pthread_mutex_unlock(&engine->cache_lock);
899 }
900 
901 static void do_item_link_cursor(struct default_engine *engine,
902  hash_item *cursor, int ii)
903 {
904  cursor->slabs_clsid = (uint8_t)ii;
905  cursor->next = NULL;
906  cursor->prev = engine->items.tails[ii];
907  engine->items.tails[ii]->next = cursor;
908  engine->items.tails[ii] = cursor;
909  engine->items.sizes[ii]++;
910 }
911 
912 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
913  hash_item *item, void *cookie);
914 
915 static bool do_item_walk_cursor(struct default_engine *engine,
916  hash_item *cursor,
917  int steplength,
918  ITERFUNC itemfunc,
919  void* itemdata,
920  ENGINE_ERROR_CODE *error)
921 {
922  int ii = 0;
923  *error = ENGINE_SUCCESS;
924 
925  while (cursor->prev != NULL && ii < steplength) {
926  ++ii;
927  /* Move cursor */
928  hash_item *ptr = cursor->prev;
929  item_unlink_q(engine, cursor);
930 
931  bool done = false;
932 
933  if (ptr == engine->items.heads[cursor->slabs_clsid]) {
934  done = true;
935  } else {
936  cursor->next = ptr;
937  cursor->prev = ptr->prev;
938  cursor->prev->next = cursor;
939  ptr->prev = cursor;
940  }
941 
942  /* Ignore cursors */
943  if (!(ptr->nkey == 0 && ptr->nbytes == 0)) {
944  *error = itemfunc(engine, ptr, itemdata);
945  if (*error != ENGINE_SUCCESS) {
946  return false;
947  }
948  }
949 
950  if (done) {
951  return false;
952  }
953  }
954 
955  return true;
956 }
957 
958 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
959  hash_item *item,
960  void *cookie) {
961  (void)cookie;
962  engine->scrubber.visited++;
963  rel_time_t current_time = engine->server.core->get_current_time();
964  if (item->refcount == 0 &&
965  (item->exptime != 0 && item->exptime < current_time)) {
966  do_item_unlink(engine, item);
967  engine->scrubber.cleaned++;
968  }
969  return ENGINE_SUCCESS;
970 }
971 
972 static void item_scrub_class(struct default_engine *engine,
973  hash_item *cursor) {
974 
975  ENGINE_ERROR_CODE ret;
976  bool more;
977  do {
978  pthread_mutex_lock(&engine->cache_lock);
979  more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
980  pthread_mutex_unlock(&engine->cache_lock);
981  if (ret != ENGINE_SUCCESS) {
982  break;
983  }
984  } while (more);
985 }
986 
987 static void *item_scubber_main(void *arg)
988 {
989  struct default_engine *engine = arg;
990  hash_item cursor = { .refcount = 1 };
991 
992  for (int ii = 0; ii < POWER_LARGEST; ++ii) {
993  pthread_mutex_lock(&engine->cache_lock);
994  bool skip = false;
995  if (engine->items.heads[ii] == NULL) {
996  skip = true;
997  } else {
998  // add the item at the tail
999  do_item_link_cursor(engine, &cursor, ii);
1000  }
1001  pthread_mutex_unlock(&engine->cache_lock);
1002 
1003  if (!skip) {
1004  item_scrub_class(engine, &cursor);
1005  }
1006  }
1007 
1008  pthread_mutex_lock(&engine->scrubber.lock);
1009  engine->scrubber.stopped = time(NULL);
1010  engine->scrubber.running = false;
1011  pthread_mutex_unlock(&engine->scrubber.lock);
1012 
1013  return NULL;
1014 }
1015 
1016 bool item_start_scrub(struct default_engine *engine)
1017 {
1018  bool ret = false;
1019  pthread_mutex_lock(&engine->scrubber.lock);
1020  if (!engine->scrubber.running) {
1021  engine->scrubber.started = time(NULL);
1022  engine->scrubber.stopped = 0;
1023  engine->scrubber.visited = 0;
1024  engine->scrubber.cleaned = 0;
1025  engine->scrubber.running = true;
1026 
1027  pthread_t t;
1028  pthread_attr_t attr;
1029 
1030  if (pthread_attr_init(&attr) != 0 ||
1031  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1032  pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1033  {
1034  engine->scrubber.running = false;
1035  } else {
1036  ret = true;
1037  }
1038  }
1039  pthread_mutex_unlock(&engine->scrubber.lock);
1040 
1041  return ret;
1042 }