Redis源码阅读0

主要记录我对于redis7.2.3源码的阅读

What is Redis?

Redis is often referred to as a data structures server. What this means is that Redis provides access to mutable data structures via a set of commands, which are sent using a server-client model with TCP sockets and a simple protocol. So different processes can query and modify the same data structures in a shared way.

Redis通常被称为数据结构服务器。这意味着Redis通过一组命令提供对可变数据结构的访问,这些命令使用带有TCP套接字和简单协议的服务器-客户端模型发送。因此,不同的进程可以以共享的方式查询和修改相同的数据结构。

阅读方法采用提出问题 –> 再去探索的思路。

问题一:Redis解析执行命令行命令的流程?

Redis server 和一个客户端建立连接后,会在事件驱动框架中注册可读事件——客户端的命令请求。命令处理对应 4 个阶段:

  • 命令读取:对应 readQueryFromClient 函数
  • 命令解析:对应 processInputBuffer 函数
  • 命令执行:对应 processCommand 函数
  • 结果返回:对应 addReply 函数

在redis源码的 server.c中定义了一些 dictType

(位于dict.h)

我的理解是,定义一个所有的命令公共的dictType结构体。

typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(dict *d, const void *key);
void *(*valDup)(dict *d, const void *obj);
int (*keyCompare)(dict *d, const void *key1, const void *key2);
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
/* Invoked at the start of dict initialization/rehashing (old and new ht are already created) */
void (*rehashingStarted)(dict *d);
/* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists
* and are cleaned up after this callback. */
void (*rehashingCompleted)(dict *d);
/* Flags */
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
* dict is a set. When this flag is set, it's not possible to access the
* value of a dictEntry and it's also impossible to use dictSetKey(). Entry
* metadata can also not be used. */
unsigned int no_value:1;
/* If no_value = 1 and all keys are odd (LSB=1), setting keys_are_odd = 1
* enables one more optimization: to store a key without an allocated
* dictEntry. */
unsigned int keys_are_odd:1;
/* TODO: Add a 'keys_are_even' flag and use a similar optimization if that
* flag is set. */
} dictType;

dictType是Redis中的一个结构体类型,用于定义字典(dict)的哈希表实现。它包含了一些回调函数指针,这些函数在字典初始化、重新哈希等操作时被调用。以下是各个字段的作用:

  1. hashFunction:哈希函数,用于计算键的哈希值。
  2. keyDup:键复制函数,用于复制字典中的键。
  3. valDup:值复制函数,用于复制字典中的值。
  4. keyCompare:键比较函数,用于比较两个键是否相等。
  5. keyDestructor:键析构函数,用于释放键所占用的内存。
  6. valDestructor:值析构函数,用于释放值所占用的内存。
  7. expandAllowed:扩展允许函数,用于判断是否可以扩展字典的大小。
  8. rehashingStarted:重新哈希开始函数,在字典初始化或重新哈希开始时调用。
  9. rehashingCompleted:重新哈希完成函数,在字典初始化或重新哈希完成后调用。
  10. no_value:无值标志,如果设置为1,表示字典的值不使用,即字典是一个集合。当此标志为1时,无法访问字典条目的值,也无法使用dictSetKey()函数。条目元数据也不能使用。
  11. keys_are_odd:键奇偶标志,如果设置为1且所有键都是奇数(最低有效位为1),则启用一个优化:在存储没有分配的字典条目时存储键。

在serve.c中运用dictType定义了一些基础命令格式如下:

/* Set dictionary type. Keys are SDS strings, values are not used. */
dictType setDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
.no_value = 1, /* no values in this dict */
.keys_are_odd = 1 /* an SDS string is always an odd pointer */
};

在networking.c中有createClient

这里给conn设置了ReadHandler

connSetReadHandler(conn, readQueryFromClient);

client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));

/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
...
}

那么我们知道了我们的读取命令的Handler在创建client的时候设置的,接下来我们来看看readQueryFormClient的实现

命令的读取

void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;

/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);

readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;

/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0) readlen = remaining;

/* Master client needs expand the readlen when meet BIG_ARG(see #9100),
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
* need, so using the non-greedy growing. For an initial allocation of
* the query buffer, we also don't wanna use the greedy growth, in order
* to avoid collision with the RESIZE_THRESHOLD mechanism. */
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
/* We later set the peak to the used portion of the buffer, but here we over
* allocated because we know what we need, make sure it'll not be shrunk before used. */
if (c->querybuf_peak < qblen + readlen) c->querybuf_peak = qblen + readlen;
} else {
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

/* Read as much as possible from the socket to save read(2) system calls. */
readlen = sdsavail(c->querybuf);
}
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
sdsfree(info);
}
freeClientAsync(c);
goto done;
}

sdsIncrLen(c->querybuf,nread);
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;
atomicIncr(server.stat_net_repl_input_bytes, nread);
} else {
atomicIncr(server.stat_net_input_bytes, nread);
}

if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
atomicIncr(server.stat_client_qbuf_limit_disconnections, 1);
goto done;
}

/* There is more data in the client input buffer, continue parsing it
* and check if there is a full command to execute. */
if (processInputBuffer(c) == C_ERR)
c = NULL;

done:
beforeNextClient(c);
}

这段代码是Redis服务器的一部分,用于处理客户端的请求。它首先获取客户端连接的私有数据,然后根据客户端请求的类型和长度,确定是否需要读取更多的数据。如果需要,它会尝试从客户端读取更多的数据,并将其存储在查询缓冲区中。然后,它会检查客户端是否发送了完整的命令,如果是,则处理该命令。最后,它会更新服务器的统计信息,并在必要时关闭客户端连接。

获取数据:client *c = connGetPrivateData(conn);

/* Get the associated private data pointer */
static inline void *connGetPrivateData(connection *conn) {
return conn->private_data;
}

命令解析processInputBuffer

int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
if (c->flags & CLIENT_PENDING_COMMAND) break;

/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (isInsideYieldingLongCommand() && c->flags & CLIENT_MASTER) break;

/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

/* Determine request type when unknown. */
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}

if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}

/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return C_ERR;
}
}
}

if (c->flags & CLIENT_MASTER) {
/* If the client is a master, trim the querybuf to repl_applied,
* since master client is very special, its querybuf not only
* used to parse command, but also proxy to sub-replicas.
*
* Here are some scenarios we cannot trim to qb_pos:
* 1. we don't receive complete command from master
* 2. master client blocked cause of client pause
* 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
*
* In these scenarios, qb_pos points to the part of the current command
* or the beginning of next command, and the current command is not applied yet,
* so the repl_applied is not equal to qb_pos. */
if (c->repl_applied) {
sdsrange(c->querybuf,c->repl_applied,-1);
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}

/* Update client memory usage after processing the query buffer, this is
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsageAndBucket(c);

return C_OK;
}

这段代码是一个名为processInputBuffer的函数,它接收一个指向客户端结构体的指针c作为参数。这个函数的主要目的是处理客户端发送到服务器的命令缓冲区。

函数首先检查客户端是否处于阻塞状态、是否有待处理的命令或者是否在执行主服务器上的脚本。如果满足这些条件之一,函数将立即终止循环并返回。

接下来,函数根据客户端请求的类型(单行或多行)来处理输入缓冲区。如果请求类型未知,函数将尝试确定请求类型。然后,根据请求类型调用相应的处理函数(processInlineBufferprocessMultibulkBuffer)。

在处理完输入缓冲区后,函数会检查客户端是否需要执行命令。如果需要,它将调用processCommandAndResetClient函数来执行命令并重置客户端。如果客户端不再有效,函数将提前返回错误。

最后,函数会根据客户端类型更新查询缓冲区的大小。对于主服务器客户端,它会将查询缓冲区裁剪到已应用的位置;对于其他类型的客户端,它会将查询缓冲区裁剪到当前位置。

在整个过程中,函数还会更新客户端的内存使用情况,这在某些情况下是很重要的,例如当查询缓冲区很大且在上述循环中没有被完全消耗时。

总之,这个函数的作用是处理客户端发送到服务器的命令缓冲区,并根据请求类型执行相应的操作。

执行命令processCommand

在上述函数中我们回调用processCommandAndResetClient函数,在这里面掉用了执行命令

processCommandAndResetClient函数

int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
updateClientMemUsageAndBucket(c);
}

if (server.current_client == NULL) deadclient = 1;
/*
* Restore the old client, this is needed because when a script
* times out, we will get into this code from processEventsWhileBlocked.
* Which will cause to set the server.current_client. If not restored
* we will return 1 to our caller which will falsely indicate the client
* is dead and will stop reading from its buffer.
*/
server.current_client = old_client;
/* performEvictions may flush slave output buffers. This may
* result in a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}

函数首先将当前服务器的客户端设置为传入的客户端c。然后调用processCommand(c)来处理命令,如果处理成功,则调用commandProcessed(c)来标记命令已处理。接下来,调用updateClientMemUsageAndBucket(c)来更新客户端的内存使用情况和存储桶。

在处理完命令后,函数检查当前服务器的客户端是否为空,如果为空,则将deadclient标志设置为1,表示客户端已死亡。

为了确保在脚本超时时能够正确恢复旧的客户端状态,函数将服务器的当前客户端还原为之前保存的旧客户端old_client

最后,函数根据deadclient标志的值返回相应的结果。如果客户端已死亡,则返回C_ERR;否则返回C_OK

重点来了,执行命令

processCommand

int processCommand(client *c) {
if (!scriptIsTimedout()) {
/* Both EXEC and scripts call call() directly so there should be
* no way in_exec or scriptIsRunning() is 1.
* That is unless lua_timedout, in which case client may run
* some commands. */
serverAssert(!server.in_exec);
serverAssert(!scriptIsRunning());
}

/* in case we are starting to ProcessCommand and we already have a command we assume
* this is a reprocessing of this command, so we do not want to perform some of the actions again. */
int client_reprocessing_command = c->cmd ? 1 : 0;

/* only run command filter if not reprocessing command */
if (!client_reprocessing_command) {
moduleCallCommandFilters(c);
reqresAppendRequest(c);
}

/* Handle possible security attacks. */
if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) {
securityWarningCommand(c);
return C_ERR;
}

/* If we're inside a module blocked context yielding that wants to avoid
* processing clients, postpone the command. */
if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE &&
!(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS))
{
blockPostponeClient(c);
return C_OK;
}

/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth.
* In case we are reprocessing a command after it was blocked,
* we do not have to repeat the same checks */
if (!client_reprocessing_command) {
c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
sds err;
if (!commandCheckExistence(c, &err)) {
rejectCommandSds(c, err);
return C_OK;
}
if (!commandCheckArity(c, &err)) {
rejectCommandSds(c, err);
return C_OK;
}


/* Check if the command is marked as protected and the relevant configuration allows it */
if (c->cmd->flags & CMD_PROTECTED) {
if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) ||
(c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c)))
{
rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", "
"you can run it from a local connection, otherwise you need to set this option "
"in the configuration file, and then restart the server.",
c->cmd->proc == debugCommand ? "DEBUG" : "MODULE",
c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command");
return C_OK;

}
}
}

uint64_t cmd_flags = getCommandFlags(c);

int is_read_command = (cmd_flags & CMD_READONLY) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (cmd_flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(cmd_flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(cmd_flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
int is_may_replicate_command = (cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
int is_deny_async_loading_command = (cmd_flags & CMD_NO_ASYNC_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING));
int obey_client = mustObeyClient(c);

if (authRequired(c)) {
/* AUTH and HELLO and no auth commands are valid even in
* non-authenticated state. */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}

if (c->flags & CLIENT_MULTI && c->cmd->flags & CMD_NO_MULTI) {
rejectCommandFormat(c,"Command not allowed inside a transaction");
return C_OK;
}

/* Check if the user can run this command according to the current
* ACLs. */
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,(c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL,acl_errpos,NULL,NULL);
sds msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
rejectCommandFormat(c, "-NOPERM %s", msg);
sdsfree(msg);
return C_OK;
}

/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!mustObeyClient(c) &&
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
c->cmd->proc != execCommand))
{
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&c->slot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,c->slot,error_code);
c->cmd->rejected_calls++;
return C_OK;
}
}

/* Disconnect some clients if total clients memory is too high. We do this
* before key eviction, after the last command was executed and consumed
* some client output buffer memory. */
evictClients();
if (server.current_client == NULL) {
/* If we evicted ourself then abort processing the command */
return C_ERR;
}

/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
if (server.maxmemory && !isInsideYieldingLongCommand()) {
int out_of_memory = (performEvictions() == EVICT_FAIL);

/* performEvictions may evict keys, so we need flush pending tracking
* invalidation keys. If we don't do this, we may get an invalidation
* message after we perform operation on the key, where in fact this
* message belongs to the old value of the key before it gets evicted.*/
trackingHandlePendingKeyInvalidations();

/* performEvictions may flush slave output buffers. This may result
* in a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;

int reject_cmd_on_oom = is_denyoom_command;
/* If client is in MULTI/EXEC context, queuing may consume an unlimited
* amount of memory, so we want to stop that.
* However, we never want to reject DISCARD, or even EXEC (unless it
* contains denied commands, in which case is_denyoom_command is already
* set. */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
reject_cmd_on_oom = 1;
}

if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}

/* Save out_of_memory result at command start, otherwise if we check OOM
* in the first write within script, memory used by lua stack and
* arguments might interfere. We need to save it for EXEC and module
* calls too, since these can call EVAL, but avoid saving it during an
* interrupted / yielding busy script / module. */
server.pre_command_oom_state = out_of_memory;
}

/* Make sure to use a reasonable amount of memory for client side
* caching metadata. */
if (server.tracking_clients) trackingLimitUsedSlots();

/* Don't accept write commands if there are problems persisting on disk
* unless coming from our master, in which case check the replica ignore
* disk write error config to either log or crash. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
(is_write_command || c->cmd->proc == pingCommand))
{
if (obey_client) {
if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) {
serverPanic("Replica was unable to write command to disk.");
} else {
static mstime_t last_log_time_ms = 0;
const mstime_t log_interval_ms = 10000;
if (server.mstime > last_log_time_ms + log_interval_ms) {
last_log_time_ms = server.mstime;
serverLog(LL_WARNING, "Replica is applying a command even though "
"it is unable to write to disk.");
}
}
} else {
sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
/* remove the newline since rejectCommandSds adds it. */
sdssubstr(err, 0, sdslen(err)-2);
rejectCommandSds(c, err);
return C_OK;
}
}

/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
if (is_write_command && !checkGoodReplicasStatus()) {
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}

/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!obey_client &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}

/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != ssubscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != sunsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P|S)SUBSCRIBE / "
"(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->fullname);
return C_OK;
}

/* Only allow commands with flag "t", such as INFO, REPLICAOF and so on,
* when replica-serve-stale-data is no and we are a replica with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}

/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && !server.async_loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}

/* During async-loading, block certain commands. */
if (server.async_loading && is_deny_async_loading_command) {
rejectCommand(c,shared.loadingerr);
return C_OK;
}

/* when a busy job is being done (script / module)
* Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients
* sending a transaction with pipelining without error checking, may have
* the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */
if (isInsideYieldingLongCommand() && !(c->cmd->flags & CMD_ALLOW_BUSY)) {
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
rejectCommandFormat(c, "-BUSY %s", server.busy_module_yield_reply);
} else if (server.busy_module_yield_flags) {
rejectCommand(c, shared.slowmoduleerr);
} else if (scriptIsEval()) {
rejectCommand(c, shared.slowevalerr);
} else {
rejectCommand(c, shared.slowscripterr);
}
return C_OK;
}

/* Prevent a replica from sending commands that access the keyspace.
* The main objective here is to prevent abuse of client pause check
* from which replicas are exempt. */
if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {
rejectCommandFormat(c, "Replica can't interact with the keyspace");
return C_OK;
}

/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command)))
{
blockPostponeClient(c);
return C_OK;
}

/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand)
{
queueMultiCommand(c, cmd_flags);
addReply(c,shared.queued);
} else {
int flags = CMD_CALL_FULL;
if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
call(c,flags);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand())
handleClientsBlockedOnKeys();
}
return C_OK;
}

这段代码是一个名为processCommand的函数,它用于处理客户端发送的命令。函数首先检查命令是否超时,然后根据命令的类型和权限进行相应的处理。接下来,函数会检查命令是否具有可重入性、写权限、读权限等属性,并根据这些属性判断命令是否可以执行。最后,函数会根据命令的类型和执行结果进行相应的操作,如拒绝命令、执行命令等。

返回响应addReply(c,shared.queued);

在上满执行命令后,我们会返回结果

/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
* -------------------------------------------------------------------------- */

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
_addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
_addReplyToBufferOrList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}

这段代码是一个名为addReply的函数,它接受两个参数:一个client指针和一个robj指针。该函数的主要目的是将给定的对象添加到客户端的回复缓冲区或列表中。

首先,函数调用prepareClientToWrite(c)来准备客户端以进行写操作。如果返回值不是C_OK,则函数直接返回,不执行任何操作。

接下来,函数检查对象是否为编码过的对象(通过sdsEncodedObject(obj))。如果是编码过的对象,则使用_addReplyToBufferOrList函数将其添加到客户端的回复缓冲区或列表中。这里使用了obj->ptr作为要添加的对象,并使用sdslen(obj->ptr)获取其长度。

如果对象不是编码过的对象,但对象的编码类型是整数编码(通过obj->encoding == OBJ_ENCODING_INT),则函数会将整数转换为字符串,并将其添加到客户端的回复缓冲区或列表中。这里使用了优化过的函数ll2string来完成转换,并将结果存储在字符数组buf中。然后,使用_addReplyToBufferOrList函数将转换后的字符串添加到客户端的回复缓冲区或列表中。

如果对象既不是编码过的对象,也不是整数编码的对象,则函数会触发服务器异常,提示”Wrong obj->encoding in addReply()”。