diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 83bea31eb..123bd4e35 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -233,11 +233,6 @@ void SnapshotPayloadParseState::trimState() { if (stackParse.empty()) { flushQueuedKeys(); - while (*insertsInFlight > 0) { - // TODO: ProcessEventsWhileBlocked - aeReleaseLock(); - aeAcquireLock(); - } } } diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h index 29ac28fb6..91e9bc956 100644 --- a/src/SnapshotPayloadParseState.h +++ b/src/SnapshotPayloadParseState.h @@ -63,4 +63,5 @@ class SnapshotPayloadParseState { void pushValue(const char *rgch, long long cch); void pushValue(long long value); bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); } + bool hasIOInFlight() const { return *insertsInFlight > 0; } }; \ No newline at end of file diff --git a/src/config.cpp b/src/config.cpp index 749fb5746..0ac230a9f 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -3201,7 +3201,7 @@ standardConfig configs[] = { createIntConfig("overload-protect-tenacity", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->overload_protect_tenacity, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL), createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL), - createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 0, NULL, NULL), + createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 1, NULL, NULL), #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */ diff --git a/src/replication.cpp b/src/replication.cpp index 91b995991..d0c3d1b91 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1059,7 +1059,7 @@ class replicationBuffer { while (checkClientOutputBufferLimits(replica) && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { ul.unlock(); - usleep(0); + usleep(1000); // give 1ms for the I/O before we poll again ul.lock(); } } @@ -2521,7 +2521,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { return cchNumeral + 3; } -bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { +bool readFastSyncBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; serverAssert(GlobalLocksAcquired()); serverAssert(mi->master == nullptr); @@ -2546,6 +2546,10 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi } } + if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO) { + goto LWaitIO; + } + serverAssert(mi->parseState != nullptr); for (int iter = 0; iter < 10; ++iter) { if (mi->parseState->shouldThrottle()) @@ -2663,7 +2667,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (!fFinished) return false; +LWaitIO: + if (mi->parseState->hasIOInFlight()) { + mi->repl_state = REPL_STATE_WAIT_STORAGE_IO; + return false; + } + serverLog(LL_NOTICE, "Fast sync complete"); + serverAssert(!mi->parseState->hasIOInFlight()); delete mi->parseState; mi->parseState = nullptr; return true; @@ -3040,7 +3051,7 @@ void readSyncBulkPayload(connection *conn) { } if (mi->isKeydbFastsync) { - if (!readSnapshotBulkPayload(conn, mi, rsi)) + if (!readFastSyncBulkPayload(conn, mi, rsi)) return; } else { if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark)) @@ -4807,6 +4818,10 @@ void replicationCron(void) { { redisMaster *mi = (redisMaster*)listNodeValue(lnMaster); + if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO && !mi->parseState->hasIOInFlight()) { + readSyncBulkPayload(mi->repl_transfer_s); + } + std::unique_lockmaster->lock)> ulock; if (mi->master != nullptr) ulock = decltype(ulock)(mi->master->lock); diff --git a/src/server.h b/src/server.h index 3826385a6..69966f9f3 100644 --- a/src/server.h +++ b/src/server.h @@ -612,6 +612,7 @@ typedef enum { REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ + REPL_STATE_WAIT_STORAGE_IO, REPL_STATE_CONNECTED, /* Connected to master */ } repl_state;