Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/notify.c
${CMAKE_SOURCE_DIR}/src/setproctitle.c
${CMAKE_SOURCE_DIR}/src/blocked.c
${CMAKE_SOURCE_DIR}/src/blocked_inuse.c
${CMAKE_SOURCE_DIR}/src/hyperloglog.c
${CMAKE_SOURCE_DIR}/src/latency.c
${CMAKE_SOURCE_DIR}/src/sparkline.c
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ ENGINE_SERVER_OBJ = \
bio.o \
bitops.o \
blocked.o \
blocked_inuse.o \
call_reply.o \
childinfo.o \
cluster.o \
Expand Down
14 changes: 13 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "monotonic.h"
#include "cluster_slot_stats.h"
#include "module.h"
#include "blocked_inuse.h"

/* forward declarations */
static void unblockClientWaitingData(client *c);
Expand Down Expand Up @@ -160,10 +161,13 @@ void processUnblockedClients(void) {
client *c;

while (listLength(server.unblocked_clients)) {
// If one of the unblocked clients executed pause command, then we stop processing further.
if (isPausedActionsWithUpdate(PAUSE_ACTIONS_CLIENT_ALL_SET)) return;
ln = listFirst(server.unblocked_clients);
serverAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients, ln);
serverAssert(!blockInuse_clientBlocked(c));
c->flag.unblocked = 0;

if (c->flag.module) {
Expand All @@ -173,6 +177,14 @@ void processUnblockedClients(void) {
continue;
}

/* Reinstall read handler if it was removed (e.g. by blockInuse) */
Comment thread
harrylin98 marked this conversation as resolved.
if (c->conn && !connHasReadHandler(c->conn)) {
// If it fails because epoll_ctl failed then freeClient.
Comment thread
harrylin98 marked this conversation as resolved.
if (connSetReadHandler(c->conn, readQueryFromClient) == C_ERR) {
freeClient(c);
return;
}
}
/* Process remaining data in the input buffer, unless the client
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
Expand All @@ -183,7 +195,7 @@ void processUnblockedClients(void) {
continue;
}
}
beforeNextClient(c);
if (!c->flag.close_asap) beforeNextClient(c);
}
}

Expand Down
Loading
Loading