summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: dormando <dormando@rydia.net> 2011-07-11 18:12:59 -0700
committer: dormando <dormando@rydia.net> 2011-07-11 18:12:59 -0700
commit: ea2d42a50c07b862a465ba7926e9715a945304ef (patch)
tree: 7ceac67bb8cf3e9d138f76768f57d92835ba9af7
parent: cbcd3872cbfa0dfc086b82ed08c370dc62fb9235 (diff)
download: memcached-ea2d42a50c07b862a465ba7926e9715a945304ef.tar.gz
fix incr/decr race conditions for binary protocol
There were two race conditions in the incr/decr binary protocol handler. One was the original "fetches item outside of add_delta" race, and the second was in the initializer. I went for the quick fix of changing the semantics of the store request to be an ADD instead of a SET, so if another client wins that very narrow race, the request simply bounces with NOT_STORED. Not perfect, but this is an improvement and good enough for now.
-rw-r--r-- memcached.c 127
-rw-r--r-- memcached.h 8
-rw-r--r-- thread.c 5
3 files changed, 73 insertions, 67 deletions
diff --git a/memcached.c b/memcached.c
index d5c5904..0d484dc 100644
--- a/memcached.c
+++ b/memcached.c
@@ -998,6 +998,9 @@ static void complete_incr_bin(conn *c) {
item *it;
char *key;
size_t nkey;
+ /* Weird magic in add_delta forces me to pad here */
+ char tmpbuf[INCR_MAX_STORAGE_LEN];
+ uint64_t cas = 0;
protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
protocol_binary_request_incr* req = binary_get_request(c);
@@ -1025,72 +1028,62 @@ static void complete_incr_bin(conn *c) {
req->message.body.expiration);
}
- it = item_get(key, nkey);
- if (it && (c->binary_header.request.cas == 0 ||
- c->binary_header.request.cas == ITEM_get_cas(it))) {
- /* Weird magic in add_delta forces me to pad here */
- char tmpbuf[INCR_MAX_STORAGE_LEN];
- protocol_binary_response_status st = PROTOCOL_BINARY_RESPONSE_SUCCESS;
-
- switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
- req->message.body.delta, tmpbuf)) {
- case OK:
- break;
- case NON_NUMERIC:
- st = PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL;
- break;
- case EOM:
- st = PROTOCOL_BINARY_RESPONSE_ENOMEM;
- break;
- case DELTA_ITEM_NOT_FOUND:
- break;
- }
-
- if (st != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
- write_bin_error(c, st, 0);
- } else {
- rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
- c->cas = ITEM_get_cas(it);
- write_bin_response(c, &rsp->message.body, 0, 0,
- sizeof(rsp->message.body.value));
+ if (c->binary_header.request.cas != 0) {
+ cas = c->binary_header.request.cas;
+ }
+ switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
+ req->message.body.delta, tmpbuf,
+ &cas)) {
+ case OK:
+ rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
+ if (cas) {
+ c->cas = cas;
}
-
- item_remove(it); /* release our reference */
- } else if (!it && req->message.body.expiration != 0xffffffff) {
- /* Save some room for the response */
- rsp->message.body.value = htonll(req->message.body.initial);
- it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
- INCR_MAX_STORAGE_LEN);
-
- if (it != NULL) {
- snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
- (unsigned long long)req->message.body.initial);
-
- if (store_item(it, NREAD_SET, c)) {
- c->cas = ITEM_get_cas(it);
- write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
+ write_bin_response(c, &rsp->message.body, 0, 0,
+ sizeof(rsp->message.body.value));
+ break;
+ case NON_NUMERIC:
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
+ break;
+ case EOM:
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
+ break;
+ case DELTA_ITEM_NOT_FOUND:
+ if (req->message.body.expiration != 0xffffffff) {
+ /* Save some room for the response */
+ rsp->message.body.value = htonll(req->message.body.initial);
+ it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
+ INCR_MAX_STORAGE_LEN);
+
+ if (it != NULL) {
+ snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
+ (unsigned long long)req->message.body.initial);
+
+ if (store_item(it, NREAD_ADD, c)) {
+ c->cas = ITEM_get_cas(it);
+ write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
+ } else {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
+ }
+ item_remove(it); /* release our reference */
} else {
- write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
}
- item_remove(it); /* release our reference */
} else {
- write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
- }
- } else if (it) {
- /* incorrect CAS */
- item_remove(it); /* release our reference */
- write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
- } else {
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
+ c->thread->stats.incr_misses++;
+ } else {
+ c->thread->stats.decr_misses++;
+ }
+ pthread_mutex_unlock(&c->thread->stats.mutex);
- pthread_mutex_lock(&c->thread->stats.mutex);
- if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
- c->thread->stats.incr_misses++;
- } else {
- c->thread->stats.decr_misses++;
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
}
- pthread_mutex_unlock(&c->thread->stats.mutex);
-
- write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
+ break;
+ case DELTA_ITEM_CAS_MISMATCH:
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
+ break;
}
}
@@ -2791,7 +2784,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
return;
}
- switch(add_delta(c, key, nkey, incr, delta, temp)) {
+ switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
case OK:
out_string(c, temp);
break;
@@ -2812,6 +2805,8 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
out_string(c, "NOT_FOUND");
break;
+ case DELTA_ITEM_CAS_MISMATCH:
+ break; /* Should never get here */
}
}
@@ -2828,7 +2823,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
*/
enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
const bool incr, const int64_t delta,
- char *buf) {
+ char *buf, uint64_t *cas) {
char *ptr;
uint64_t value;
int res;
@@ -2839,6 +2834,11 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
return DELTA_ITEM_NOT_FOUND;
}
+ if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
+ do_item_remove(it);
+ return DELTA_ITEM_CAS_MISMATCH;
+ }
+
ptr = ITEM_data(it);
if (!safe_strtoull(ptr, &value)) {
@@ -2888,6 +2888,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
}
+ if (cas) {
+ *cas = ITEM_get_cas(it); /* swap the incoming CAS value */
+ }
do_item_remove(it); /* release our reference */
return OK;
}
diff --git a/memcached.h b/memcached.h
index 27aef52..1146ecc 100644
--- a/memcached.h
+++ b/memcached.h
@@ -191,7 +191,7 @@ enum store_item_type {
};
enum delta_result_type {
- OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND
+ OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH
};
/** Time relative to server start. Smaller than time_t on 64-bit systems. */
@@ -437,7 +437,8 @@ extern volatile rel_time_t current_time;
void do_accept_new_conns(const bool do_accept);
enum delta_result_type do_add_delta(conn *c, const char *key,
const size_t nkey, const bool incr,
- const int64_t delta, char *buf);
+ const int64_t delta, char *buf,
+ uint64_t *cas);
enum store_item_type do_store_item(item *item, int comm, conn* c);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base);
extern int daemonize(int nochdir, int noclose);
@@ -465,7 +466,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in
/* Lock wrappers for cache functions that are called from main loop. */
enum delta_result_type add_delta(conn *c, const char *key,
const size_t nkey, const int incr,
- const int64_t delta, char *buf);
+ const int64_t delta, char *buf,
+ uint64_t *cas);
void accept_new_conns(const bool do_accept);
conn *conn_from_freelist(void);
bool conn_add_to_freelist(conn *c);
diff --git a/thread.c b/thread.c
index d3ae92a..2166821 100644
--- a/thread.c
+++ b/thread.c
@@ -400,11 +400,12 @@ void item_update(item *item) {
*/
enum delta_result_type add_delta(conn *c, const char *key,
const size_t nkey, int incr,
- const int64_t delta, char *buf) {
+ const int64_t delta, char *buf,
+ uint64_t *cas) {
enum delta_result_type ret;
pthread_mutex_lock(&cache_lock);
- ret = do_add_delta(c, key, nkey, incr, delta, buf);
+ ret = do_add_delta(c, key, nkey, incr, delta, buf, cas);
pthread_mutex_unlock(&cache_lock);
return ret;
}