Skip to content

Commit

Permalink
move source ip from req context to req common
Browse files Browse the repository at this point in the history
Summary:
- move source ip to req common
- use folly::IPAddress instead of std::string
- change callsites to get ip from req rather than ctx

Reviewed By: disylh

Differential Revision: D64721214

fbshipit-source-id: d5d7bc35def6555e5de78db50c704b9bed0e77ee
  • Loading branch information
Xiaofei Hu authored and facebook-github-bot committed Nov 8, 2024
1 parent ebb0407 commit 4450ce9
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 75 deletions.
21 changes: 6 additions & 15 deletions mcrouter/CarbonRouterClient-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,9 @@ bool srHostInfoPtrFuncCarbonRouterClient(

template <class RouterInfo>
template <class Request, class F>
bool CarbonRouterClient<RouterInfo>::send(
const Request& req,
F&& callback,
folly::StringPiece ipAddr) {
auto makePreq = [this, ipAddr, &req, &callback](bool inBatch) mutable {
return makeProxyRequestContext(
req, std::forward<F>(callback), ipAddr, inBatch);
bool CarbonRouterClient<RouterInfo>::send(const Request& req, F&& callback) {
auto makePreq = [this, &req, &callback](bool inBatch) mutable {
return makeProxyRequestContext(req, std::forward<F>(callback), inBatch);
};

auto cancelRemaining = [&req, &callback]() {
Expand Down Expand Up @@ -259,15 +255,14 @@ template <class InputIt, class F>
bool CarbonRouterClient<RouterInfo>::send(
InputIt begin,
InputIt end,
F&& callback,
folly::StringPiece ipAddr) {
F&& callback) {
using IterReference = typename std::iterator_traits<InputIt>::reference;
using Request = typename std::decay<decltype(detail::unwrapRequest(
std::declval<IterReference>()))>::type;

auto makeNextPreq = [this, ipAddr, &callback, &begin](bool inBatch) {
auto makeNextPreq = [this, &callback, &begin](bool inBatch) {
auto proxyRequestContext = makeProxyRequestContext(
detail::unwrapRequest(*begin), callback, ipAddr, inBatch);
detail::unwrapRequest(*begin), callback, inBatch);
++begin;
return proxyRequestContext;
};
Expand Down Expand Up @@ -371,7 +366,6 @@ std::unique_ptr<ProxyRequestContextWithInfo<RouterInfo>>
CarbonRouterClient<RouterInfo>::makeProxyRequestContext(
const Request& req,
CallbackFunc&& callback,
folly::StringPiece ipAddr,
bool inBatch) {
Proxy<RouterInfo>* proxy = proxies_[proxyIdx_];
if (mode_ == ThreadMode::AffinitizedRemoteThread) {
Expand All @@ -398,9 +392,6 @@ CarbonRouterClient<RouterInfo>::makeProxyRequestContext(
});

proxyRequestContext->setRequester(self_);
if (!ipAddr.empty()) {
proxyRequestContext->setUserIpAddress(ipAddr);
}
return proxyRequestContext;
}

Expand Down
13 changes: 2 additions & 11 deletions mcrouter/CarbonRouterClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ class CarbonRouterClient : public CarbonRouterClientBase {
* callback is called.
*/
template <class Request, class F>
bool send(
const Request& req,
F&& callback,
folly::StringPiece ipAddr = folly::StringPiece());
bool send(const Request& req, F&& callback);

/**
* Multi requests version of send.
Expand All @@ -122,11 +119,7 @@ class CarbonRouterClient : public CarbonRouterClientBase {
* each request.
*/
template <class InputIt, class F>
bool send(
InputIt begin,
InputIt end,
F&& callback,
folly::StringPiece ipAddr = folly::StringPiece());
bool send(InputIt begin, InputIt end, F&& callback);

CacheClientCounters getStatCounters() noexcept {
return stats_.getCounters();
Expand Down Expand Up @@ -222,7 +215,6 @@ class CarbonRouterClient : public CarbonRouterClientBase {
* @param req The request to be routed.
* @param callback The callback function to be called once the reply
* is received.
* @param ipAddr The ip address of the caller (can be empty).
* @param inBatch Whether or not the given request is part of a batch
* of requests.
*
Expand All @@ -233,7 +225,6 @@ class CarbonRouterClient : public CarbonRouterClientBase {
makeProxyRequestContext(
const Request& req,
CallbackFunc&& callback,
folly::StringPiece ipAddr,
bool inBatch);

bool shouldDelayNotification(size_t batchSize) const;
Expand Down
10 changes: 0 additions & 10 deletions mcrouter/ProxyRequestContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,6 @@ class ProxyRequestContext {
"of ProxyRequestContext");
}

const std::string& userIpAddress() const noexcept {
return userIpAddr_;
}

void setUserIpAddress(folly::StringPiece newAddr) noexcept {
userIpAddr_ = newAddr.str();
}

bool isProcessing() const {
return processing_;
}
Expand Down Expand Up @@ -219,8 +211,6 @@ class ProxyRequestContext {

uint64_t senderIdForTest_{0};

std::string userIpAddr_;

ProxyRequestPriority priority_{ProxyRequestPriority::kCritical};

bool failoverDisabled_{false};
Expand Down
9 changes: 4 additions & 5 deletions mcrouter/ServerOnRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,11 @@ class ServerOnRequest {
}
};

folly::Optional<std::string> peerIp;
if (retainSourceIp_ && (peerIp = ctxRef.getPeerSocketAddressStr())) {
client_.send(reqRef, std::move(cb), *peerIp);
} else {
client_.send(reqRef, std::move(cb));
std::optional<folly::IPAddress> peerIp;
if (retainSourceIp_ && (peerIp = ctxRef.getPeerSocketIPAddress())) {
reqRef.setSourceIpAddr(*peerIp);
}
client_.send(reqRef, std::move(cb));
}

private:
Expand Down
8 changes: 4 additions & 4 deletions mcrouter/lib/HashSelector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "mcrouter/McrouterFiberContext.h"
#include "mcrouter/lib/HashUtil.h"
#include "mcrouter/lib/fbi/cpp/util.h"

namespace facebook {
namespace memcache {
Expand Down Expand Up @@ -88,17 +89,16 @@ class BucketHashSelector : public HashSelectorBase<HashFunc> {
clientFanout_(clientFanout) {}

template <class Request>
size_t select(const Request& /*req*/, size_t size) const {
size_t select(const Request& req, size_t size) const {
auto bucketId = mcrouter::fiber_local<RouterInfo>::getBucketId();
checkRuntime(
bucketId.has_value(),
"The context doesn't contain bucket id. You must use McBucketRoute in front of bucketized PoolRoute");
// Hash functions can be stack-intensive, so jump back to the main context
auto key = folly::to<std::string>(*bucketId);
if (this->clientFanout_) {
auto& ctx = mcrouter::fiber_local<RouterInfo>::getSharedCtx();
if (ctx) {
key += ctx->userIpAddress();
if (auto sourceIpAddr = req.getSourceIpAddr()) {
key += sourceIpAddr->str();
}
}
return folly::fibers::runInMainContext(
Expand Down
11 changes: 11 additions & 0 deletions mcrouter/lib/carbon/RequestCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <optional>
#include <utility>

#include <folly/IPAddress.h>
#include <folly/io/IOBuf.h>

#include "mcrouter/lib/carbon/MessageCommon.h"
Expand Down Expand Up @@ -93,6 +94,14 @@ class RequestCommon : public MessageCommon {
clientIdentifier_ = clientIdentifier.str();
}

const std::optional<folly::IPAddress>& getSourceIpAddr() const noexcept {
return sourceIpAddr_;
}

void setSourceIpAddr(const folly::IPAddress& sourceIpAddr) noexcept {
sourceIpAddr_ = sourceIpAddr;
}

protected:
void markBufferAsDirty() {
serializedBuffer_ = nullptr;
Expand All @@ -104,6 +113,8 @@ class RequestCommon : public MessageCommon {
std::optional<std::string> cryptoAuthToken_;
// Hash string of primary (non-host) tls client identities
std::optional<std::string> clientIdentifier_;
// Source ip address.
std::optional<folly::IPAddress> sourceIpAddr_;
};

} // namespace carbon
9 changes: 5 additions & 4 deletions mcrouter/lib/network/McServerRequestContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ ServerLoad McServerRequestContext::getServerLoad() const noexcept {
return ServerLoad::zero();
}

folly::Optional<std::string> McServerRequestContext::getPeerSocketAddressStr() {
folly::Optional<std::string> peerAddressStr;
std::optional<folly::IPAddress>
McServerRequestContext::getPeerSocketIPAddress() {
std::optional<folly::IPAddress> peerAddress;
if (session_) {
peerAddressStr = session_->getSocketAddress().getAddressStr();
peerAddress = session_->getSocketAddress().getIPAddress();
}
return peerAddressStr;
return peerAddress;
}

folly::Optional<struct sockaddr_storage>
Expand Down
2 changes: 1 addition & 1 deletion mcrouter/lib/network/McServerRequestContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class McServerRequestContext {
ServerLoad getServerLoad() const noexcept;

folly::Optional<struct sockaddr_storage> getPeerSocketAddress();
folly::Optional<std::string> getPeerSocketAddressStr();
std::optional<folly::IPAddress> getPeerSocketIPAddress();

folly::EventBase& getSessionEventBase() const noexcept;
const apache::thrift::Cpp2RequestContext* getThriftRequestContext()
Expand Down
8 changes: 4 additions & 4 deletions mcrouter/lib/network/McThriftContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class McThriftContext {
ctx.underlying_->exception(std::move(ex));
}

folly::Optional<std::string> getPeerSocketAddressStr() {
folly::Optional<std::string> peerAddressStr;
std::optional<folly::IPAddress> getPeerSocketIPAddress() {
std::optional<folly::IPAddress> peerIPAddr;
auto connectionCxt = underlying_->getConnectionContext();
if (connectionCxt) {
peerAddressStr = connectionCxt->getPeerAddress()->getAddressStr();
peerIPAddr = connectionCxt->getPeerAddress()->getIPAddress();
}
return peerAddressStr;
return peerIPAddr;
}

folly::Optional<struct sockaddr_storage> getPeerSocketAddress() {
Expand Down
41 changes: 41 additions & 0 deletions mcrouter/lib/test/RouteHandleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,47 @@ TEST(routeHandleTest, hashNoSalt) {
});
}

TEST(routeHandleTest, bucketHashSelector) {
vector<std::shared_ptr<TestHandle>> test_handles{
make_shared<TestHandle>(GetRouteTestData(carbon::Result::FOUND, "a")),
make_shared<TestHandle>(GetRouteTestData(carbon::Result::FOUND, "b")),
make_shared<TestHandle>(GetRouteTestData(carbon::Result::FOUND, "c")),
};
auto outOfRangeRh = createNullRoute<typename TestRouterInfo::RouteHandleIf>();

TestFiberManager<TestRouterInfo> fm;

TestRouteHandle<SelectionRoute<
TestRouterInfo,
BucketHashSelector<HashFunc, TestRouterInfo>>>
rh(get_route_handles(test_handles),
BucketHashSelector<HashFunc, TestRouterInfo>(
/* salt= */ "1",
HashFunc(test_handles.size()),
/*clientFanout=*/true),
std::move(outOfRangeRh));
McGetRequest getReq("0");
fm.run([&]() {
fiber_local<TestRouterInfo>::setBucketId(1);
auto reply = rh.route(getReq);
EXPECT_EQ("c", carbon::valueRangeSlow(reply).str());
});

fm.run([&]() {
fiber_local<TestRouterInfo>::setBucketId(1);
getReq.setSourceIpAddr(folly::IPAddress("0.0.0.0"));
auto reply = rh.route(getReq);
EXPECT_EQ("b", carbon::valueRangeSlow(reply).str());
});

fm.run([&]() {
fiber_local<TestRouterInfo>::setBucketId(1);
getReq.setSourceIpAddr(folly::IPAddress("1.0.0.0"));
auto reply = rh.route(getReq);
EXPECT_EQ("c", carbon::valueRangeSlow(reply).str());
});
}

TEST(routeHandleTest, hashSalt) {
vector<std::shared_ptr<TestHandle>> test_handles{
make_shared<TestHandle>(GetRouteTestData(carbon::Result::FOUND, "a")),
Expand Down
11 changes: 5 additions & 6 deletions mcrouter/routes/LoggingRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ class LoggingRoute {

// Pull the IP (if available) out of the saved request
auto& ctx = mcrouter::fiber_local<RouterInfo>::getSharedCtx();
auto& ip = ctx->userIpAddress();
folly::StringPiece userIp;
if (!ip.empty()) {
userIp = ip;
std::string userIpStr;
if (auto sourceIpAddr = req.getSourceIpAddr()) {
userIpStr = sourceIpAddr->str();
} else {
userIp = "N/A";
userIpStr = "N/A";
}

auto& callback = ctx->proxy().router().postprocessCallback();
Expand All @@ -83,7 +82,7 @@ class LoggingRoute {
carbon::convertToFollyDynamic(req),
carbon::convertToFollyDynamic(reply),
Request::name,
userIp);
userIpStr);
}
}
return reply;
Expand Down
15 changes: 6 additions & 9 deletions mcrouter/routes/OriginalClientHashRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ class OriginalClientHashRoute {
return false;
}

auto& ctx = mcrouter::fiber_local<RouterInfo>::getSharedCtx();
auto& ip = ctx->userIpAddress();
if (!ip.empty()) {
if (auto ip = req.getSourceIpAddr()) {
return t(
*children_[(folly::Hash()(ip) + offset_) % children_.size()], req);
*children_[(folly::Hash()(ip->str()) + offset_) % children_.size()],
req);
}
return false;
}
Expand All @@ -69,11 +68,9 @@ class OriginalClientHashRoute {
ReplyT<Request> reply;
assert(children_.size() > 1);

auto& ctx = mcrouter::fiber_local<RouterInfo>::getSharedCtx();
auto& ip = ctx->userIpAddress();
if (!ip.empty()) {
return children_[(folly::Hash()(ip) + offset_) % children_.size()]->route(
req);
if (auto ip = req.getSourceIpAddr()) {
return children_[(folly::Hash()(ip->str()) + offset_) % children_.size()]
->route(req);
} else {
return createReply<Request>(
ErrorReply, carbon::Result::LOCAL_ERROR, "Missing IP");
Expand Down
42 changes: 42 additions & 0 deletions mcrouter/routes/test/LoggingRouteTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <memory>
#include <string>

#include <gtest/gtest.h>

#include "mcrouter/lib/test/TestRouteHandle.h"
#include "mcrouter/routes/LoggingRoute.h"
#include "mcrouter/routes/test/RouteHandleTestUtil.h"

using TestRouterInfo = facebook::memcache::TestRouterInfo;
using TestHandle =
facebook::memcache::TestHandleImpl<facebook::memcache::TestRouteHandleIf>;
using LoggingRoute = facebook::memcache::mcrouter::LoggingRoute<TestRouterInfo>;

TEST(LoggingRouteTest, basic) {
std::shared_ptr<TestHandle> test_handle = std::make_shared<TestHandle>(
facebook::memcache::GetRouteTestData(carbon::Result::FOUND, "a"));

auto rh = facebook::memcache::mcrouter::createLoggingRoute<TestRouterInfo>(
test_handle->rh);
facebook::memcache::McGetRequest req("key");
req.setSourceIpAddr(folly::IPAddress("1.2.3.4"));

facebook::memcache::TestFiberManager<facebook::memcache::TestRouterInfo> fm;
fm.run([&]() {
facebook::memcache::mcrouter::mockFiberContext();
facebook::memcache::mcrouter::getTestRouter()->setPostprocessCallback(
[](const folly::dynamic&,
const folly::dynamic&,
const char* const,
const folly::StringPiece ip) { EXPECT_EQ("1.2.3.4", ip); });
auto reply = rh->route(req);
EXPECT_EQ("a", carbon::valueRangeSlow(reply).str());
});
}
Loading

0 comments on commit 4450ce9

Please sign in to comment.