Skip to content

Commit

Permalink
[api] Allow changing all listeners at the same time in one call to Li…
Browse files Browse the repository at this point in the history
…stenerArray.set()
  • Loading branch information
pajama-coder committed Aug 17, 2023
1 parent 48bbfa9 commit 56957b3
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/admin-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ void AdminProxy::open(const std::string &ip, int port, const Options &options) {
ppl_forward->append(new http::Mux(nullptr, nullptr))->add_sub_pipeline(ppl_connect);

Listener::Options opts;
opts.reserved = true;
auto listener = Listener::get(Listener::Protocol::TCP, ip, port);
listener->set_reserved(true);
listener->set_options(opts);
listener->pipeline_layout(ppl);
m_ip = ip;
Expand Down
2 changes: 1 addition & 1 deletion src/admin-service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ void AdminService::open(const std::string &ip, int port, const Options &options)
ppl_ws->append(new websocket::Encoder());

Listener::Options opts;
opts.reserved = true;
auto listener = Listener::get(Listener::Protocol::TCP, ip, port);
listener->set_reserved(true);
listener->set_options(opts);
listener->pipeline_layout(ppl);
m_ip = ip;
Expand Down
91 changes: 80 additions & 11 deletions src/listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,6 @@ auto ListenerArray::add_listener(pjs::Str *port, pjs::Object *options) -> Listen
get_ip_port(port->str(), ip, port_num);

auto listener = Listener::get(opts.protocol, ip, port_num);
if (listener->reserved()) {
std::string msg("Port reserved: ");
throw std::runtime_error(msg + std::to_string(port_num));
}
#ifndef __linux__
if (opts.transparent) {
Log::error("Trying to listen on %d in transparent mode, which is not supported on this platform", port_num);
}
#endif

m_listeners[listener] = opts;

if (auto *w = m_worker) {
Expand Down Expand Up @@ -447,6 +437,63 @@ auto ListenerArray::remove_listener(pjs::Str *port, pjs::Object *options) -> Lis
return listener;
}

void ListenerArray::set_listeners(pjs::Array *array) {
std::map<Listener*, Listener::Options> listeners;
if (array) {
array->iterate_all(
[&](pjs::Value &v, int i) {
std::string ip;
int port_num;
if (v.is_number()) {
auto l = Listener::get(Listener::Protocol::TCP, "0.0.0.0", v.to_int32());
listeners[l] = m_default_options.get();
} else if (v.is_string()) {
get_ip_port(v.s()->str(), ip, port_num);
auto l = Listener::get(Listener::Protocol::TCP, ip, port_num);
listeners[l] = m_default_options.get();
} else if (v.is_object() && v.o()) {
pjs::Value port;
v.o()->get("port", port);
if (port.is_number()) {
ip = "0.0.0.0";
port_num = port.to_int32();
} else if (port.is_string()) {
get_ip_port(port.s()->str(), ip, port_num);
} else {
std::string err("invalid value type for the port property in element ");
err += std::to_string(i);
throw std::runtime_error(err);
}
Listener::Options opt(v.o());
auto l = Listener::get(opt.protocol, ip, port_num);
listeners[l] = opt;
} else {
std::string err("invalid value type for a listening port in element ");
err += std::to_string(i);
throw std::runtime_error(err);
}
}
);
}

if (m_worker) {
for (const auto &i : m_listeners) {
auto l = i.first;
if (listeners.find(l) == listeners.end()) {
m_worker->remove_listener(l);
}
}

for (const auto &i : listeners) {
m_worker->add_listener(i.first, m_pipeline_layout, i.second);
}

m_worker->update_listeners(true);
}

m_listeners.swap(listeners);
}

void ListenerArray::set_default_options(pjs::Object *options) {
m_default_options = options;
}
Expand Down Expand Up @@ -482,7 +529,29 @@ namespace pjs {
using namespace pipy;

template<> void ClassDef<ListenerArray>::init() {
ctor();
ctor([](Context &ctx) -> Object* {
Array *listeners = nullptr;
if (!ctx.arguments(0, &listeners)) return nullptr;
auto la = ListenerArray::make();
if (listeners) {
try {
la->set_listeners(listeners);
} catch (std::runtime_error &err) {
ctx.error(err);
}
}
return la;
});

method("set", [](Context &ctx, Object *obj, Value &ret) {
Array *listeners = nullptr;
if (!ctx.arguments(0, &listeners)) return;
try {
obj->as<ListenerArray>()->set_listeners(listeners);
} catch (std::runtime_error &err) {
ctx.error(err);
}
});

method("add", [](Context &ctx, Object *obj, Value &ret) {
try {
Expand Down
6 changes: 4 additions & 2 deletions src/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class Listener {
Protocol protocol = Protocol::TCP;
size_t max_packet_size = 16 * 1024;
int max_connections = -1;
bool reserved = false;

Options() {}
Options(pjs::Object *options);
Expand Down Expand Up @@ -88,12 +87,13 @@ class Listener {
auto ip() const -> const std::string& { return m_ip; }
auto port() const -> int { return m_port; }
bool is_open() const { return m_pipeline_layout; }
bool reserved() const { return m_options.reserved; }
bool reserved() const { return m_reserved; }
auto pipeline_layout() const -> PipelineLayout* { return m_pipeline_layout; }
bool pipeline_layout(PipelineLayout *layout);
auto current_connections() const -> int { return m_inbounds.size(); }
auto peak_connections() const -> int { return m_peak_connections; }

void set_reserved(bool b) { m_reserved = b; }
void set_options(const Options &options);
void for_each_inbound(const std::function<void(Inbound*)> &cb);

Expand Down Expand Up @@ -175,6 +175,7 @@ class Listener {
std::string m_ip;
int m_port;
int m_peak_connections = 0;
bool m_reserved = false;
bool m_paused = false;
asio::ip::address m_address;
pjs::Ref<Acceptor> m_acceptor;
Expand All @@ -200,6 +201,7 @@ class ListenerArray : public pjs::ObjectTemplate<ListenerArray> {
auto add_listener(pjs::Str *port, pjs::Object *options = nullptr) -> Listener*;
auto remove_listener(int port, pjs::Object *options = nullptr) -> Listener*;
auto remove_listener(pjs::Str *port, pjs::Object *options = nullptr) -> Listener*;
void set_listeners(pjs::Array *array);
void set_default_options(pjs::Object *options);
void apply(Worker *worker, PipelineLayout *layout);

Expand Down
5 changes: 5 additions & 0 deletions src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ bool Worker::update_listeners(bool force) {
for (const auto &i : m_listeners) {
auto l = i.first;
if (!l->is_open()) {
#ifndef __linux__
if (l->options().transparent) {
Log::error("Trying to listen on %d in transparent mode, which is not supported on this platform", l->port());
}
#endif
new_open.insert(l);
l->set_options(i.second.options);
if (!l->pipeline_layout(i.second.pipeline_layout)) {
Expand Down

0 comments on commit 56957b3

Please sign in to comment.