diff --git a/envoy/http/filter.h b/envoy/http/filter.h index 749c5b0a4078..210181e014f3 100644 --- a/envoy/http/filter.h +++ b/envoy/http/filter.h @@ -644,6 +644,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { const absl::optional grpc_status, absl::string_view details) PURE; + /** + * Attempt to send GOAWAY and close the connection, and no filter chain will move forward. + */ + virtual void sendGoAwayAndClose() PURE; + /** * Adds decoded metadata. This function can only be called in * StreamDecoderFilter::decodeHeaders/Data/Trailers(). Do not call in diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index fcd70532e873..9e119615b53c 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -235,6 +235,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream, watermark_callbacks_->get().removeDownstreamWatermarkCallbacks(callbacks); } } + void sendGoAwayAndClose() override {} void setDecoderBufferLimit(uint32_t) override { IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams."); diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index a580f5aedab5..6d2cd1bc8c79 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -757,6 +757,18 @@ void ConnectionManagerImpl::onDrainTimeout() { checkForDeferredClose(false); } +void ConnectionManagerImpl::sendGoAwayAndClose() { + ENVOY_CONN_LOG(trace, "connection manager sendGoAwayAndClose was triggerred from filters.", + read_callbacks_->connection()); + if (go_away_sent_) { + return; + } + codec_->goAway(); + go_away_sent_ = true; + doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, absl::nullopt, + "forced_goaway"); +} + void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_reason, ConnectionManagerTracingStats& tracing_stats) { switch (tracing_reason) { diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 224fd5303ae7..e1b03433a75f 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -189,6 +189,9 @@ class ConnectionManagerImpl : Logger::Loggable, absl::string_view details) override { return filter_manager_.sendLocalReply(code, body, modify_headers, grpc_status, details); } + + void sendGoAwayAndClose() override { return connection_manager_.sendGoAwayAndClose(); } + AccessLog::InstanceSharedPtrVector accessLogHandlers() override { const AccessLog::InstanceSharedPtrVector& config_log_handlers = connection_manager_.config_->accessLogs(); @@ -597,6 +600,8 @@ class ConnectionManagerImpl : Logger::Loggable, void doConnectionClose(absl::optional close_type, absl::optional response_flag, absl::string_view details); + void sendGoAwayAndClose(); + // Returns true if a RST_STREAM for the given stream is premature. Premature // means the RST_STREAM arrived before response headers were sent and than // the stream was alive for short period of time. This period is specified @@ -646,6 +651,7 @@ class ConnectionManagerImpl : Logger::Loggable, const Server::OverloadActionState& overload_stop_accepting_requests_ref_; const Server::OverloadActionState& overload_disable_keepalive_ref_; TimeSource& time_source_; + bool go_away_sent_{false}; bool remote_close_{}; // Hop by hop headers should always be cleared for Envoy-as-a-proxy but will // not be for Envoy-mobile. diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index f18e42bd44a8..c2ba7ad87b88 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -453,6 +453,8 @@ void ActiveStreamDecoderFilter::sendLocalReply( ActiveStreamFilterBase::sendLocalReply(code, body, modify_headers, grpc_status, details); } +void ActiveStreamDecoderFilter::sendGoAwayAndClose() { parent_.sendGoAwayAndClose(); } + void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers) { // If Envoy is not configured to proxy 100-Continue responses, swallow the 100 Continue // here. This avoids the potential situation where Envoy strips Expect: 100-Continue and sends a @@ -868,7 +870,7 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa metadata_map); if (state_.decoder_filter_chain_aborted_) { // If the decoder filter chain has been aborted, then either: - // 1. This filter has sent a local reply from decode metadata. + // 1. This filter has sent a local reply or GoAway from decode metadata. // 2. This filter is the terminal http filter, and an upstream HTTP filter has sent a local // reply. ASSERT((status == FilterMetadataStatus::StopIterationForLocalReply) || diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 36e240084d36..ae6b002ce095 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -262,6 +262,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override; absl::optional upstreamOverrideHost() const override; bool shouldLoadShed() const override; + void sendGoAwayAndClose() override; // Each decoder filter instance checks if the request passed to the filter is gRPC // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders() @@ -449,6 +450,11 @@ class FilterManagerCallbacks { */ virtual void endStream() PURE; + /** + * Attempt to send GOAWAY and close the connection. + */ + virtual void sendGoAwayAndClose() PURE; + /** * Called when the stream write buffer is no longer above the low watermark. */ @@ -857,6 +863,16 @@ class FilterManager : public ScopeTrackedObject, virtual bool shouldLoadShed() { return false; }; + void sendGoAwayAndClose() { + // Stop filter chain iteration by checking encoder or decoder chain. + if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) { + state_.decoder_filter_chain_aborted_ = true; + } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) { + state_.encoder_filter_chain_aborted_ = true; + } + filter_manager_callbacks_.sendGoAwayAndClose(); + } + protected: struct State { State() = default; diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index f69e1f2678fe..d97504b0a03d 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -338,6 +338,7 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback void disarmRequestTimeout() override {} void resetIdleTimer() override {} void onLocalReply(Http::Code) override {} + void sendGoAwayAndClose() override {} // Upgrade filter chains not supported. const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 5d618e11aef6..75b6af7fe3e7 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -520,6 +520,7 @@ class Filter : public Network::ReadFilter, std::function, const absl::optional, absl::string_view) override {} + void sendGoAwayAndClose() override {} void encode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {} Http::ResponseHeaderMapOptRef informationalHeaders() override { return {}; } void encodeHeaders(Http::ResponseHeaderMapPtr&&, bool, absl::string_view) override {} diff --git a/test/integration/BUILD b/test/integration/BUILD index 8a7cee09de92..a9ff41909e43 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -557,6 +557,7 @@ envoy_cc_test( "//test/integration/filters:on_local_reply_filter_config_lib", "//test/integration/filters:request_metadata_filter_config_lib", "//test/integration/filters:response_metadata_filter_config_lib", + "//test/integration/filters:send_goaway_filter_lib", "//test/integration/filters:set_response_code_filter_config_proto_cc_proto", "//test/integration/filters:set_response_code_filter_lib", "//test/integration/filters:stop_in_headers_continue_in_data_filter_lib", diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 6dd174614bdd..6c634c656691 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -121,6 +121,21 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "send_goaway_filter_lib", + srcs = [ + "send_goaway_filter.cc", + ], + deps = [ + ":common_lib", + "//envoy/http:filter_interface", + "//envoy/registry", + "//envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "tee_filter_lib", srcs = [ diff --git a/test/integration/filters/send_goaway_filter.cc b/test/integration/filters/send_goaway_filter.cc new file mode 100644 index 000000000000..d9fe43333eee --- /dev/null +++ b/test/integration/filters/send_goaway_filter.cc @@ -0,0 +1,79 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "source/common/common/enum_to_int.h" +#include "source/common/http/utility.h" +#include "source/extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" +#include "test/integration/filters/common.h" + +namespace Envoy { + +class GoAwayDuringDecoding : public Http::PassThroughFilter { +public: + constexpr static char name[] = "send-goaway-during-decode-filter"; + + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { + auto local_reply = request_headers.get(Http::LowerCaseString("send-local-reply")); + if (!local_reply.empty()) { + decoder_callbacks_->sendLocalReply(Http::Code::Gone, "", nullptr, std::nullopt, + "local_reply_test"); + return Http::FilterHeadersStatus::StopIteration; + } + + auto result = request_headers.get(Http::LowerCaseString("skip-goaway")); + if (!result.empty() && result[0]->value() == "true") { + go_away_skiped_ = true; + return Http::FilterHeadersStatus::Continue; + } + decoder_callbacks_->sendGoAwayAndClose(); + result = request_headers.get(Http::LowerCaseString("continue-filter-chain")); + if (!result.empty() && result[0]->value() == "true") { + return Http::FilterHeadersStatus::Continue; + } + return Http::FilterHeadersStatus::StopIteration; + } + + Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { + return Http::FilterDataStatus::Continue; + } + + // Due to the above local reply, this method should never be invoked in tests. + Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap&) override { + ASSERT(go_away_skiped_); + return Http::FilterMetadataStatus::Continue; + } + + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, bool) override { + if (auto status = Http::Utility::getResponseStatus(headers); + status == enumToInt(Http::Code::Gone)) { + decoder_callbacks_->sendGoAwayAndClose(); + return Http::FilterHeadersStatus::Continue; + } + + auto result = headers.get(Http::LowerCaseString("send-encoder-goaway")); + if (result.empty()) { + return Http::FilterHeadersStatus::Continue; + } + decoder_callbacks_->sendGoAwayAndClose(); + result = headers.get(Http::LowerCaseString("continue-encoder-filter-chain")); + if (!result.empty() && result[0]->value() == "true") { + return Http::FilterHeadersStatus::Continue; + } + return Http::FilterHeadersStatus::StopIteration; + } + +private: + bool go_away_skiped_ = false; +}; + +constexpr char GoAwayDuringDecoding::name[]; +static Registry::RegisterFactory, + Server::Configuration::NamedHttpFilterConfigFactory> + register_; + +} // namespace Envoy diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc index 4cf5f719f476..46706a0804a6 100644 --- a/test/integration/multiplexed_integration_test.cc +++ b/test/integration/multiplexed_integration_test.cc @@ -1097,6 +1097,120 @@ TEST_P(MultiplexedIntegrationTest, BadFrame) { } } +// Test GoAway from L7 decoder filters with StopIteration. +TEST_P(MultiplexedIntegrationTest, SendGoAwayTriggerredByFilter) { + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"skip-goaway", "false"}}); + + ASSERT_TRUE(response->waitForReset()); + ASSERT_TRUE(codec_client_->waitForDisconnect()); +} + +// Test GoAway from L7 decoder filters with Continue. +TEST_P(MultiplexedIntegrationTest, SendGoAwayTriggerredByFilterContinue) { + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"continue-filter-chain", "true"}, + {"skip-goaway", "false"}}); + + ASSERT_TRUE(response->waitForReset()); + ASSERT_TRUE(codec_client_->waitForDisconnect()); +} + +// Test GoAway from L7 encoder filters with StopIteration. +TEST_P(MultiplexedIntegrationTest, SendGoAwayTriggerredByEncoderFilterStopIteration) { + FakeHttpConnectionPtr fake_upstream_connection; + Http::RequestEncoder* encoder; + FakeStreamPtr upstream_request; + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = + codec_client_->startRequest(Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"skip-goaway", "true"}}); + + encoder = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_request)); + + codec_client_->sendData(*encoder, 128, true); + ASSERT_TRUE(upstream_request->waitForEndStream(*dispatcher_)); + + upstream_request->encodeHeaders( + Http::TestResponseHeaderMapImpl{{":status", "200"}, + {"send-encoder-goaway", "true"}, + {"continue-encoder-filter-chain", "false"}}, + true); + ASSERT_TRUE(response->waitForReset()); + ASSERT_TRUE(codec_client_->waitForDisconnect()); +} + +// Test GoAway from L7 encoder filters with Continue. +TEST_P(MultiplexedIntegrationTest, SendGoAwayTriggerredByEncoderFilterContinue) { + FakeHttpConnectionPtr fake_upstream_connection; + Http::RequestEncoder* encoder; + FakeStreamPtr upstream_request; + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = + codec_client_->startRequest(Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"skip-goaway", "true"}}); + + encoder = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_request)); + + codec_client_->sendData(*encoder, 128, true); + ASSERT_TRUE(upstream_request->waitForEndStream(*dispatcher_)); + + upstream_request->encodeHeaders( + Http::TestResponseHeaderMapImpl{{":status", "200"}, + {"send-encoder-goaway", "true"}, + {"continue-encoder-filter-chain", "true"}}, + true); + ASSERT_TRUE(response->waitForReset()); + ASSERT_TRUE(codec_client_->waitForDisconnect()); +} + +// Test sending GoAway during SendLocalReply from L7 encoder filters. +TEST_P(MultiplexedIntegrationTest, SendGoAwayTriggerredInLocalReplyEncoderFilter) { + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto response = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"send-local-reply", "true"}, + }); + ASSERT_TRUE(response->waitForReset()); + ASSERT_TRUE(codec_client_->waitForDisconnect()); +} + // Send client headers, a GoAway and then a body and ensure the full request and // response are received. TEST_P(MultiplexedIntegrationTest, GoAway) { @@ -2736,6 +2850,44 @@ void Http2FrameIntegrationTest::sendRequestsAndResponses(uint32_t num_requests) tcp_client_->close(); } +// Validate that GOAWAY is triggered by a L7 filter. +TEST_P(Http2FrameIntegrationTest, SendGoAwayTriggerredByDecodingFilter) { + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + beginSession(); + uint32_t num_requests = 10; + std::string buffer; + for (uint32_t i = 0; i < num_requests; ++i) { + auto request = Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"request_no", absl::StrCat(i)}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (uint32_t i = 0; i < num_requests; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a"); + absl::StrAppend(&buffer, std::string(data)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + tcp_client_->waitForDisconnect(); +} + +// GOAWAY is not triggered by a L7 filter. +TEST_P(Http2FrameIntegrationTest, SendGoAwayNotTriggerredByDecodingFilter) { + config_helper_.addFilter("name: send-goaway-during-decode-filter"); + beginSession(); + std::string buffer; + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(1), "a", "/", + {{"skip-goaway", "true"}}); + absl::StrAppend(&buffer, std::string(request)); + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + waitForNextUpstreamConnection({0}, std::chrono::milliseconds(500), fake_upstream_connection_); + FakeStreamPtr upstream_request; + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request)); + ASSERT_TRUE(upstream_request->waitForEndStream(*dispatcher_)); + tcp_client_->close(); +} + // Validate that processing of deferred requests with body and trailers is handled correctly // when there is a filter that pauses and resumes iteration. TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailersWithFilterChainPause) { diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 63b2163fbb89..2125533148db 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -84,6 +84,7 @@ class MockFilterManagerCallbacks : public FilterManagerCallbacks { MOCK_METHOD(ResponseHeaderMapOptRef, responseHeaders, ()); MOCK_METHOD(ResponseTrailerMapOptRef, responseTrailers, ()); MOCK_METHOD(void, endStream, ()); + MOCK_METHOD(void, sendGoAwayAndClose, ()); MOCK_METHOD(void, onDecoderFilterBelowWriteBufferLowWatermark, ()); MOCK_METHOD(void, onDecoderFilterAboveWriteBufferHighWatermark, ()); MOCK_METHOD(void, upgradeFilterChainCreated, ()); @@ -271,6 +272,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD(void, setDecoderBufferLimit, (uint32_t)); MOCK_METHOD(uint32_t, decoderBufferLimit, ()); MOCK_METHOD(bool, recreateStream, (const ResponseHeaderMap* headers)); + MOCK_METHOD(void, sendGoAwayAndClose, ()); MOCK_METHOD(void, addUpstreamSocketOptions, (const Network::Socket::OptionsSharedPtr& options)); MOCK_METHOD(Network::Socket::OptionsSharedPtr, getUpstreamSocketOptions, (), (const)); MOCK_METHOD(const Router::RouteSpecificFilterConfig*, mostSpecificPerFilterConfig, (), (const));