Skip to content

Commit

Permalink
[core] Changed the default value of maxQueue option in muxers from un…
Browse files Browse the repository at this point in the history
…limited to 1 for better compatibility with servers that don't support HTTP pipelining
  • Loading branch information
pajama-coder committed Nov 14, 2024
1 parent 1fc4951 commit 4c08d2b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/filters/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,8 @@ auto Mux::verify_http_version(pjs::Str *name) -> int {
//

Mux::Session::Session(const Mux::Options &options, std::shared_ptr<BufferStats> buffer_stats)
: Encoder(false, buffer_stats)
: MuxQueue(this)
, Encoder(false, buffer_stats)
, Decoder(true)
, http2::Client(options)
, m_options(options)
Expand Down
36 changes: 24 additions & 12 deletions src/filters/mux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ MuxSession::Options::Options(pjs::Object *options) {
// - MuxSessionPools
//

void MuxSession::increase_share_count() {
m_share_count++;
m_pool->sort(this);
}

void MuxSession::decrease_share_count() {
if (m_pool) {
m_pool->free(this);
} else {
close();
}
}

void MuxSession::set_pending(bool pending) {
if (pending != m_is_pending) {
if (!pending) {
Expand Down Expand Up @@ -157,14 +170,6 @@ void MuxSession::close() {
}
}

void MuxSession::free() {
if (m_pool) {
m_pool->free(this);
} else {
close();
}
}

//
// MuxSessionPool
//
Expand All @@ -185,9 +190,7 @@ auto MuxSessionPool::alloc() -> MuxSession* {
if ((max_share_count <= 0 || s->m_share_count < max_share_count) &&
(max_message_count <= 0 || s->m_message_count < max_message_count)
) {
s->m_share_count++;
s->m_message_count++;
sort(s);
return s;
}
s = s->next();
Expand Down Expand Up @@ -376,7 +379,6 @@ void MuxSource::reset() {
if (m_session) {
stop_waiting();
close_stream();
m_session->free();
m_session = nullptr;
}
m_waiting_events.clear();
Expand Down Expand Up @@ -424,7 +426,6 @@ void MuxSource::alloc_stream() {
if (m_session && !m_session->is_open()) {
stop_waiting();
close_stream();
m_session->free();
m_session = nullptr;
}

Expand Down Expand Up @@ -570,6 +571,17 @@ void MuxQueue::on_reply(Event *evt) {
// - MuxQueue::Receiver
//

MuxQueue::Stream::Stream(MuxQueue *queue, MuxSource *source)
: m_queue(queue)
, m_source(source)
{
queue->m_session->increase_share_count();
}

MuxQueue::Stream::~Stream() {
m_queue->m_session->decrease_share_count();
}

void MuxQueue::Stream::on_event(Event *evt) {
auto q = m_queue;

Expand Down
25 changes: 16 additions & 9 deletions src/filters/mux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class MuxSession :

struct Options : public pipy::Options {
double max_idle = 60;
int max_queue = 0;
int max_queue = 1;
int max_messages = 0;
Options() {}
Options(pjs::Object *options);
Expand All @@ -72,6 +72,9 @@ class MuxSession :
virtual void mux_session_close_stream(EventFunction *stream) = 0;
virtual void mux_session_close() = 0;

void increase_share_count();
void decrease_share_count();

protected:
auto pool() const -> MuxSessionPool* { return m_pool; }
auto input() const -> EventTarget::Input* { return m_pipeline->input(); }
Expand All @@ -85,13 +88,12 @@ class MuxSession :
private:
void open(MuxSource *source, Pipeline *pipeline);
void close();
void free();

MuxSessionPool* m_pool = nullptr;
pjs::Ref<Pipeline> m_pipeline;
pjs::Ref<StreamEnd> m_eos;
List<MuxSource> m_waiting_sources;
int m_share_count = 1;
int m_share_count = 0;
int m_message_count = 0;
double m_free_time = 0;
bool m_is_pending = false;
Expand Down Expand Up @@ -217,6 +219,8 @@ class MuxSource : public List<MuxSource>::Item {

class MuxQueue : public EventSource {
protected:
MuxQueue(MuxSession *session) : m_session(session) {}

void reset();
auto stream(MuxSource *source) -> EventFunction*;
void close(EventFunction *stream);
Expand All @@ -238,11 +242,11 @@ class MuxQueue : public EventSource {
public pjs::RefCount<Stream>,
public EventFunction
{
protected:
Stream(MuxQueue *queue, MuxSource *source)
: m_queue(queue)
, m_source(source) {}
public:
Stream(MuxQueue *queue, MuxSource *source);
~Stream();

protected:
virtual void on_event(Event *evt) override;

private:
Expand Down Expand Up @@ -279,12 +283,13 @@ class MuxQueue : public EventSource {
pjs::Ref<Stream> m_stream;
int m_output_count;
bool m_has_message_started = false;

friend class MuxQueue;
};

MuxSession* m_session;
List<Receiver> m_receivers;
pjs::Ref<Stream> m_dedicated_stream;

friend class Stream;
};

//
Expand Down Expand Up @@ -346,6 +351,8 @@ class Mux : public MuxBase {
public pjs::Pooled<Session, MuxSession>,
public MuxQueue
{
Session() : MuxQueue(this) {}

virtual void mux_session_open(MuxSource *source) override;
virtual auto mux_session_open_stream(MuxSource *source) -> EventFunction* override;
virtual void mux_session_close_stream(EventFunction *stream) override;
Expand Down

0 comments on commit 4c08d2b

Please sign in to comment.