Skip to content

Commit

Permalink
[admin] Admin service to handle /admin/* requests and distribute them…
Browse files Browse the repository at this point in the history
… across all worker threads
  • Loading branch information
pajama-coder committed Aug 16, 2023
1 parent 79d5fdf commit 48bbfa9
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 4 deletions.
24 changes: 23 additions & 1 deletion src/admin-service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void AdminService::write_log(const std::string &name, const Data &data) {
}
}

auto AdminService::handle(Context *ctx, Message *req) -> Message* {
auto AdminService::handle(Context *ctx, Message *req) -> pjs::Object* {
static const std::string prefix_dump("/dump/");
static const std::string prefix_log("/log/");
static const std::string prefix_repo("/repo/");
Expand All @@ -189,6 +189,7 @@ auto AdminService::handle(Context *ctx, Message *req) -> Message* {
static const std::string prefix_api_v1_files("/api/v1/files/");
static const std::string prefix_api_v1_metrics("/api/v1/metrics/");
static const std::string prefix_api_v1_log("/api/v1/log/");
static const std::string prefix_admin("/admin/");
static const std::string text_html("text/html");

thread_local static pjs::ConstStr s_accept("accept");
Expand Down Expand Up @@ -448,6 +449,27 @@ auto AdminService::handle(Context *ctx, Message *req) -> Message* {
}
}

// Custom administration functionality
if (utils::starts_with(path, prefix_admin)) {
auto promise = pjs::Promise::make();
auto settler = pjs::Promise::Settler::make(promise);
settler->retain();
pjs::Ref<pjs::Str> path_str = pjs::Str::make(path);
WorkerManager::get().admin(
path_str, *body,
[=](const Data *res) {
InputContext ic;
if (res) {
settler->resolve(response(*res));
} else {
settler->resolve(m_response_not_found.get());
}
settler->release();
}
);
return promise;
}

// Static GUI content
if (method == "GET") {
http::File *f = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion src/admin-service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class AdminService : public pjs::RefCount<AdminService> {

pjs::Ref<Module> m_module;

auto handle(Context *ctx, Message *req) -> Message*;
auto handle(Context *ctx, Message *req) -> pjs::Object*;

Message* dump_GET();
Message* dump_GET(const std::string &path);
Expand Down
15 changes: 15 additions & 0 deletions src/api/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,12 @@ void Configuration::apply(JSModule *mod) {
worker->add_exit(p);
}
}

for (auto &i : m_admins) {
std::string name("Admin ");
auto p = make_pipeline(i.index, "", name + i.path, i);
worker->add_admin(i.path, p);
}
}

void Configuration::draw(Graph &g) {
Expand Down Expand Up @@ -863,6 +869,15 @@ void Configuration::draw(Graph &g) {
g.add_pipeline(std::move(p));
}
}

for (const auto &i : m_admins) {
Graph::Pipeline p;
p.index = i.index;
p.label = "Admin ";
p.label += i.path;
add_filters(p, i.filters);
g.add_pipeline(std::move(p));
}
}

auto Configuration::new_indexed_pipeline(const std::string &name, int &index) -> FilterConfigurator* {
Expand Down
93 changes: 92 additions & 1 deletion src/worker-thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ void WorkerThread::reload_done(bool ok) {
}
}

void WorkerThread::admin(pjs::Str *path, SharedData *request, const std::function<void(SharedData*)> &respond) {
auto name = path->data()->retain();
request->retain();
m_net->post(
[=]() {
Data buf;
request->to_data(buf);
auto head = http::RequestHead::make();
head->path = pjs::Str::make(name);
pjs::Ref<Message> req = Message::make(head, Data::make(std::move(buf)));
if (!Worker::current()->admin(
req.get(),
[=](Message *response) {
auto body = response->body();
auto data = body ? SharedData::make(*body) : SharedData::make(Data());
data->retain();
Net::main().post([=]() { respond(data); data->release(); });
}
)) {
Net::main().post([=]() { respond(nullptr); });
}
request->release();
name->release();
}
);
}

bool WorkerThread::stop(bool force) {
if (force) {
m_net->post(
Expand Down Expand Up @@ -580,13 +607,21 @@ void WorkerManager::recycle() {
}

void WorkerManager::reload() {
if (m_reloading || m_querying_status || m_querying_stats) {
if (m_reloading || m_querying_status || m_querying_stats || !m_admin_requests.empty()) {
m_reloading_requested = true;
} else {
start_reloading();
}
}

bool WorkerManager::admin(pjs::Str *path, const Data &request, const std::function<void(const Data *)> &respond) {
if (m_reloading) return false;
if (m_worker_threads.empty()) return false;
new AdminRequest(this, path, request, respond);
next_admin_request();
return true;
}

void WorkerManager::check_reloading() {
if (m_reloading_requested) {
m_reloading_requested = false;
Expand Down Expand Up @@ -651,6 +686,14 @@ bool WorkerManager::stop(bool force) {
return true;
}

void WorkerManager::next_admin_request() {
if (auto r = m_admin_requests.head()) {
r->start();
} else {
check_reloading();
}
}

void WorkerManager::on_thread_done(int index) {
if (m_on_done) {
Net::main().post(
Expand All @@ -664,4 +707,52 @@ void WorkerManager::on_thread_done(int index) {
}
}

//
// WorkerManager::AdminRequest
//

WorkerManager::AdminRequest::AdminRequest(WorkerManager *manager, pjs::Str *path, const Data &request, const std::function<void(const Data *)> &respond)
: m_manager(manager)
, m_path(path)
, m_request(request)
, m_responses(manager->m_worker_threads.size())
, m_respond(respond)
{
manager->m_admin_requests.push(this);
}

WorkerManager::AdminRequest::~AdminRequest() {
m_manager->m_admin_requests.remove(this);
m_manager->next_admin_request();
}

void WorkerManager::AdminRequest::start() {
pjs::Ref<SharedData> req = SharedData::make(m_request);
for (size_t i = 0; i < m_manager->m_worker_threads.size(); i++) {
m_manager->m_worker_threads[i]->admin(
m_path, req, [=](SharedData *res) {
auto &r = m_responses[i];
if (res) {
res->to_data(r.data);
r.successful = true;
} else {
r.successful = false;
}
if (++m_response_count == m_responses.size()) {
Data response;
bool successful = false;
for (const auto &r : m_responses) {
if (r.successful) {
successful = true;
response.push(r.data);
}
}
m_respond(successful ? &response : nullptr);
delete this;
}
}
);
}
}

} // namespace pipy
23 changes: 23 additions & 0 deletions src/worker-thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#define WORKER_THREAD_HPP

#include "net.hpp"
#include "list.hpp"
#include "status.hpp"
#include "api/stats.hpp"

Expand Down Expand Up @@ -66,6 +67,7 @@ class WorkerThread {
void recycle();
void reload(const std::function<void(bool)> &cb);
void reload_done(bool ok);
void admin(pjs::Str *path, SharedData *request, const std::function<void(SharedData*)> &respond);
void exit(const std::function<void()> &cb);
bool stop(bool force = false);

Expand Down Expand Up @@ -118,11 +120,30 @@ class WorkerManager {
bool stats(const std::function<void(stats::MetricDataSum&)> &cb);
void recycle();
void reload();
bool admin(pjs::Str *path, const Data &request, const std::function<void(const Data *)> &respond);
auto concurrency() const -> int { return m_concurrency; }
auto active_pipeline_count() -> size_t;
bool stop(bool force = false);

private:
class AdminRequest : public List<AdminRequest>::Item {
public:
AdminRequest(WorkerManager *manager, pjs::Str *path, const Data &request, const std::function<void(const Data *)> &respond);
~AdminRequest();
void start();
private:
struct Response {
Data data;
bool successful = false;
};
WorkerManager* m_manager;
pjs::Ref<pjs::Str> m_path;
Data m_request;
std::vector<Response> m_responses;
size_t m_response_count = 0;
const std::function<void(const Data *)> m_respond;
};

std::vector<WorkerThread*> m_worker_threads;
Status m_status;
int m_status_counter = -1;
Expand All @@ -134,10 +155,12 @@ class WorkerManager {
bool m_reloading = false;
bool m_querying_status = false;
bool m_querying_stats = false;
List<AdminRequest> m_admin_requests;
std::function<void()> m_on_done;

void check_reloading();
void start_reloading();
void next_admin_request();
void on_thread_done(int index);

friend class WorkerThread;
Expand Down
11 changes: 10 additions & 1 deletion src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,15 @@ void Worker::stop(bool force) {
}
}

bool Worker::admin(Message *request, const std::function<void(Message*)> &respond) {
for (auto *admin : m_admins) {
if (admin->handle(request, respond)) {
return true;
}
}
return false;
}

auto Worker::new_module_index() -> int {
int index = 0;
while (index < m_modules.size() && m_modules[index]) index++;
Expand Down Expand Up @@ -544,7 +553,7 @@ Worker::Admin::~Admin() {
}

bool Worker::Admin::handle(Message *request, const std::function<void(Message*)> &respond) {
pjs::Ref<http::RequestHead> head = pjs::coerce<http::RequestHead>(request);
pjs::Ref<http::RequestHead> head = pjs::coerce<http::RequestHead>(request->head());
if (!utils::starts_with(head->path->str(), m_path)) return false;
new Handler(this, request, respond);
return true;
Expand Down

0 comments on commit 48bbfa9

Please sign in to comment.