Redis is often referred to as a data structures server. What this means is that Redis provides access to mutable data structures via a set of commands, which are sent using a server-client model with TCP sockets and a simple protocol. So different processes can query and modify the same data structures in a shared way.
Redis server 与客户端建立连接后,会在事件驱动框架中为该连接注册可读事件,用来接收客户端发来的命令请求。一条命令的处理分为 4 个阶段:
命令读取:对应 readQueryFromClient 函数
命令解析:对应 processInputBuffer 函数
命令执行:对应 processCommand 函数(先进行命令查找、参数个数、ACL 权限、内存上限等一系列检查,通过后才真正执行命令)
结果返回:对应 addReply 函数(它只是把结果追加到客户端的输出缓冲区,之后才会被写回给客户端)
在 redis 源码的 server.c 中定义了若干 dictType 实例,
而 dictType 结构体本身定义在 dict.h 中:
我的理解是:dictType 把字典的行为抽象成一组回调函数(哈希、键/值复制、键比较、键/值析构等)和若干标志位;server.c 中为不同用途的字典分别定义了不同的 dictType 实例,例如所有命令共用的命令表字典就使用其中之一。
typedefstructdictType { uint64_t (*hashFunction)(constvoid *key); void *(*keyDup)(dict *d, constvoid *key); void *(*valDup)(dict *d, constvoid *obj); int (*keyCompare)(dict *d, constvoid *key1, constvoid *key2); void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); /* Invoked at the start of dict initialization/rehashing (old and new ht are already created) */ void (*rehashingStarted)(dict *d); /* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists * and are cleaned up after this callback. */ void (*rehashingCompleted)(dict *d); /* Flags */ /* The 'no_value' flag, if set, indicates that values are not used, i.e. the * dict is a set. When this flag is set, it's not possible to access the * value of a dictEntry and it's also impossible to use dictSetKey(). Entry * metadata can also not be used. */ unsignedint no_value:1; /* If no_value = 1 and all keys are odd (LSB=1), setting keys_are_odd = 1 * enables one more optimization: to store a key without an allocated * dictEntry. */ unsignedint keys_are_odd:1; /* TODO: Add a 'keys_are_even' flag and use a similar optimization if that * flag is set. */ } dictType;
/* passing NULL as conn it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (conn) { connEnableTcpNoDelay(conn); if (server.tcpkeepalive) connKeepAlive(conn,server.tcpkeepalive); connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); } ... }
/* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ if (postponeClientRead(c)) return;
/* Update total number of reads on server */ atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1;
/* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0) readlen = remaining;
/* Master client needs expand the readlen when meet BIG_ARG(see #9100), * but doesn't need align to the next arg, we can read more data. */ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; }
qblen = sdslen(c->querybuf); if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg * into the query buffer, so we don't need to pre-allocate more than we * need, so using the non-greedy growing. For an initial allocation of * the query buffer, we also don't wanna use the greedy growth, in order * to avoid collision with the RESIZE_THRESHOLD mechanism. */ c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen); /* We later set the peak to the used portion of the buffer, but here we over * allocated because we know what we need, make sure it'll not be shrunk before used. */ if (c->querybuf_peak < qblen + readlen) c->querybuf_peak = qblen + readlen; } else { c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
/* Read as much as possible from the socket to save read(2) system calls. */ readlen = sdsavail(c->querybuf); } nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); goto done; } } elseif (nread == 0) { if (server.verbosity <= LL_VERBOSE) { sds info = catClientInfoString(sdsempty(), c); serverLog(LL_VERBOSE, "Client closed connection %s", info); sdsfree(info); } freeClientAsync(c); goto done; }
/* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ if (processInputBuffer(c) == C_ERR) c = NULL;
/* Get the associated private data pointer.
 *
 * Returns whatever is currently stored in conn->private_data. Client creation
 * code attaches the client to its connection with connSetPrivateData(conn, c),
 * so this is presumably how a connection handler finds its client — confirm
 * against connSetPrivateData().
 *
 * 'conn' is dereferenced without a check, so it must not be NULL. */
static inline void *connGetPrivateData(connection *conn) {
    return conn->private_data;
}
命令解析:processInputBuffer
intprocessInputBuffer(client *c){ /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break;
/* Don't process more buffers from clients that have already pending * commands to execute in c->argv. */ if (c->flags & CLIENT_PENDING_COMMAND) break;
/* Don't process input from the master while there is a busy script * condition on the slave. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and * later resume the processing. */ if (isInsideYieldingLongCommand() && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). * * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown. */ if (!c->reqtype) { if (c->querybuf[c->qb_pos] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; } }
if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } elseif (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); }
/* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ if (io_threads_op != IO_THREADS_OP_IDLE) { serverAssert(io_threads_op == IO_THREADS_OP_READ); c->flags |= CLIENT_PENDING_COMMAND; break; }
/* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this * loop and trimming the client buffer later. So we return * ASAP in that case. */ return C_ERR; } } }
if (c->flags & CLIENT_MASTER) { /* If the client is a master, trim the querybuf to repl_applied, * since master client is very special, its querybuf not only * used to parse command, but also proxy to sub-replicas. * * Here are some scenarios we cannot trim to qb_pos: * 1. we don't receive complete command from master * 2. master client blocked cause of client pause * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND * * In these scenarios, qb_pos points to the part of the current command * or the beginning of next command, and the current command is not applied yet, * so the repl_applied is not equal to qb_pos. */ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } } elseif (c->qb_pos) { /* Trim to pos */ sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; }
/* Update client memory usage after processing the query buffer, this is * important in case the query buffer is big and wasn't drained during * the above loop (because of partially sent big commands). */ if (io_threads_op == IO_THREADS_OP_IDLE) updateClientMemUsageAndBucket(c);
/* Run processCommand() for client 'c' with server.current_client pointing at
 * it, then restore whatever client was current before the call.
 *
 * When processCommand() returns C_OK, commandProcessed() is called and the
 * client's memory accounting is refreshed.
 *
 * Returns C_ERR when the client was freed while its command was being
 * processed (detected by server.current_client having become NULL). In that
 * case the caller must not touch 'c' again. Returns C_OK otherwise. */
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
        /* Update the client's memory to include output buffer growth following the
         * processed command. */
        updateClientMemUsageAndBucket(c);
    }

    /* performEvictions may flush slave output buffers. This may
     * result in a slave, that may be the active client, to be
     * freed, which is signalled by server.current_client being NULL. */
    if (server.current_client == NULL) deadclient = 1;
    /*
     * Restore the old client, this is needed because when a script
     * times out, we will get into this code from processEventsWhileBlocked.
     * Which will cause to set the server.current_client. If not restored
     * we would return C_ERR to our caller which will falsely indicate the client
     * is dead and will stop reading from its buffer.
     */
    server.current_client = old_client;
    return deadclient ? C_ERR : C_OK;
}
intprocessCommand(client *c){ if (!scriptIsTimedout()) { /* Both EXEC and scripts call call() directly so there should be * no way in_exec or scriptIsRunning() is 1. * That is unless lua_timedout, in which case client may run * some commands. */ serverAssert(!server.in_exec); serverAssert(!scriptIsRunning()); }
/* in case we are starting to ProcessCommand and we already have a command we assume * this is a reprocessing of this command, so we do not want to perform some of the actions again. */ int client_reprocessing_command = c->cmd ? 1 : 0;
/* only run command filter if not reprocessing command */ if (!client_reprocessing_command) { moduleCallCommandFilters(c); reqresAppendRequest(c); }
/* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { securityWarningCommand(c); return C_ERR; }
/* If we're inside a module blocked context yielding that wants to avoid * processing clients, postpone the command. */ if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && !(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS)) { blockPostponeClient(c); return C_OK; }
/* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. * In case we are reprocessing a command after it was blocked, * we do not have to repeat the same checks */ if (!client_reprocessing_command) { c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); sds err; if (!commandCheckExistence(c, &err)) { rejectCommandSds(c, err); return C_OK; } if (!commandCheckArity(c, &err)) { rejectCommandSds(c, err); return C_OK; }
/* Check if the command is marked as protected and the relevant configuration allows it */ if (c->cmd->flags & CMD_PROTECTED) { if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) || (c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) { rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", " "you can run it from a local connection, otherwise you need to set this option " "in the configuration file, and then restart the server.", c->cmd->proc == debugCommand ? "DEBUG" : "MODULE", c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command"); return C_OK;
if (authRequired(c)) { /* AUTH and HELLO and no auth commands are valid even in * non-authenticated state. */ if (!(c->cmd->flags & CMD_NO_AUTH)) { rejectCommand(c,shared.noautherr); return C_OK; } }
if (c->flags & CLIENT_MULTI && c->cmd->flags & CMD_NO_MULTI) { rejectCommandFormat(c,"Command not allowed inside a transaction"); return C_OK; }
/* Check if the user can run this command according to the current * ACLs. */ int acl_errpos; int acl_retval = ACLCheckAllPerm(c,&acl_errpos); if (acl_retval != ACL_OK) { addACLLogEntry(c,acl_retval,(c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL,acl_errpos,NULL,NULL); sds msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0); rejectCommandFormat(c, "-NOPERM %s", msg); sdsfree(msg); return C_OK; }
/* If cluster is enabled perform the cluster redirection here. * However we don't perform the redirection if: * 1) The sender of this command is our master. * 2) The command has no key arguments. */ if (server.cluster_enabled && !mustObeyClient(c) && !(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 && c->cmd->proc != execCommand)) { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, &c->slot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } clusterRedirectClient(c,n,c->slot,error_code); c->cmd->rejected_calls++; return C_OK; } }
/* Disconnect some clients if total clients memory is too high. We do this * before key eviction, after the last command was executed and consumed * some client output buffer memory. */ evictClients(); if (server.current_client == NULL) { /* If we evicted ourself then abort processing the command */ return C_ERR; }
/* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout * condition, to avoid mixing the propagation of scripts with the * propagation of DELs due to eviction. */ if (server.maxmemory && !isInsideYieldingLongCommand()) { int out_of_memory = (performEvictions() == EVICT_FAIL);
/* performEvictions may evict keys, so we need flush pending tracking * invalidation keys. If we don't do this, we may get an invalidation * message after we perform operation on the key, where in fact this * message belongs to the old value of the key before it gets evicted.*/ trackingHandlePendingKeyInvalidations();
/* performEvictions may flush slave output buffers. This may result * in a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR;
int reject_cmd_on_oom = is_denyoom_command; /* If client is in MULTI/EXEC context, queuing may consume an unlimited * amount of memory, so we want to stop that. * However, we never want to reject DISCARD, or even EXEC (unless it * contains denied commands, in which case is_denyoom_command is already * set. */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand) { reject_cmd_on_oom = 1; }
if (out_of_memory && reject_cmd_on_oom) { rejectCommand(c, shared.oomerr); return C_OK; }
/* Save out_of_memory result at command start, otherwise if we check OOM * in the first write within script, memory used by lua stack and * arguments might interfere. We need to save it for EXEC and module * calls too, since these can call EVAL, but avoid saving it during an * interrupted / yielding busy script / module. */ server.pre_command_oom_state = out_of_memory; }
/* Make sure to use a reasonable amount of memory for client side * caching metadata. */ if (server.tracking_clients) trackingLimitUsedSlots();
/* Don't accept write commands if there are problems persisting on disk * unless coming from our master, in which case check the replica ignore * disk write error config to either log or crash. */ int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && (is_write_command || c->cmd->proc == pingCommand)) { if (obey_client) { if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) { serverPanic("Replica was unable to write command to disk."); } else { staticmstime_t last_log_time_ms = 0; constmstime_t log_interval_ms = 10000; if (server.mstime > last_log_time_ms + log_interval_ms) { last_log_time_ms = server.mstime; serverLog(LL_WARNING, "Replica is applying a command even though " "it is unable to write to disk."); } } } else { sds err = writeCommandsGetDiskErrorMessage(deny_write_type); /* remove the newline since rejectCommandSds adds it. */ sdssubstr(err, 0, sdslen(err)-2); rejectCommandSds(c, err); return C_OK; } }
/* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ if (is_write_command && !checkGoodReplicasStatus()) { rejectCommand(c, shared.noreplicaserr); return C_OK; }
/* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !obey_client && is_write_command) { rejectCommand(c, shared.roslaveerr); return C_OK; }
/* Only allow a subset of commands in the context of Pub/Sub if the * connection is in RESP2 mode. With RESP3 there are no limits. */ if ((c->flags & CLIENT_PUBSUB && c->resp == 2) && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != ssubscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != sunsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand) { rejectCommandFormat(c, "Can't execute '%s': only (P|S)SUBSCRIBE / " "(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context", c->cmd->fullname); return C_OK; }
/* Only allow commands with flag "t", such as INFO, REPLICAOF and so on, * when replica-serve-stale-data is no and we are a replica with a broken * link with master. */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && is_denystale_command) { rejectCommand(c, shared.masterdownerr); return C_OK; }
/* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (server.loading && !server.async_loading && is_denyloading_command) { rejectCommand(c, shared.loadingerr); return C_OK; }
/* During async-loading, block certain commands. */ if (server.async_loading && is_deny_async_loading_command) { rejectCommand(c,shared.loadingerr); return C_OK; }
/* when a busy job is being done (script / module) * Only allow a limited number of commands. * Note that we need to allow the transactions commands, otherwise clients * sending a transaction with pipelining without error checking, may have * the MULTI plus a few initial commands refused, then the timeout * condition resolves, and the bottom-half of the transaction gets * executed, see Github PR #7022. */ if (isInsideYieldingLongCommand() && !(c->cmd->flags & CMD_ALLOW_BUSY)) { if (server.busy_module_yield_flags && server.busy_module_yield_reply) { rejectCommandFormat(c, "-BUSY %s", server.busy_module_yield_reply); } elseif (server.busy_module_yield_flags) { rejectCommand(c, shared.slowmoduleerr); } elseif (scriptIsEval()) { rejectCommand(c, shared.slowevalerr); } else { rejectCommand(c, shared.slowscripterr); } return C_OK; }
/* Prevent a replica from sending commands that access the keyspace. * The main objective here is to prevent abuse of client pause check * from which replicas are exempt. */ if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) { rejectCommandFormat(c, "Replica can't interact with the keyspace"); return C_OK; }
/* If the server is paused, block the client until * the pause has ended. Replicas are never paused. */ if (!(c->flags & CLIENT_SLAVE) && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { blockPostponeClient(c); return C_OK; }
/* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -------------------------------------------------------------------------- */
/* Add the object 'obj' string representation to the client output buffer.
 *
 * Nothing is queued when prepareClientToWrite() does not return C_OK.
 * 'obj' must be either SDS-encoded (sdsEncodedObject()) or integer-encoded
 * (OBJ_ENCODING_INT). Any other encoding triggers serverPanic(). */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. The integer value lives in the ptr field
         * itself, hence the cast; 32 bytes comfortably hold any 64-bit
         * value in decimal. */
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        _addReplyToBufferOrList(c, buf, len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}