Skip to content

Commit

Permalink
Merge master registries with cluster workers (siimon#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsnigel committed Jun 10, 2021
1 parent c7b9a9d commit d5310ac
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ label. (See `example/server.js` for an example using
Metrics are aggregated from the global registry by default. To use a different
registry, call
`client.AggregatorRegistry.setRegistries(registryOrArrayOfRegistries)` from the
worker processes.
master or worker processes.

## API

Expand Down
76 changes: 54 additions & 22 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
* Extends the Registry class with a `clusterMetrics` method that returns
* aggregated metrics for all workers.
* aggregated metrics for master and all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
Expand Down Expand Up @@ -34,8 +34,8 @@ class AggregatorRegistry extends Registry {
}

/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* Gets aggregated metrics for master and all workers. The optional callback
* and returned Promise resolve with the same value; either may be used.
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
Expand All @@ -62,6 +62,27 @@ class AggregatorRegistry extends Registry {
};
requests.set(requestId, request);

// Get metrics from master
if (registries && registries.length > 0) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
processMetricsResponse({
type: GET_METRICS_RES,
requestId,
metrics,
});
})
.catch(error => {
processMetricsResponse({
type: GET_METRICS_RES,
requestId,
error: error.message,
});
});
request.pending++;
}

// Get metrics from workers
const message = {
type: GET_METRICS_REQ,
requestId,
Expand All @@ -78,6 +99,7 @@ class AggregatorRegistry extends Registry {

if (request.pending === 0) {
// No workers were up
requests.delete(requestId);
clearTimeout(request.errorTimeout);
process.nextTick(() => done(null, ''));
}
Expand Down Expand Up @@ -145,6 +167,34 @@ class AggregatorRegistry extends Registry {
}
}

/**
* Adds metrics from master and worker to request and finalizes request when
* all metrics are collected.
* @param {object} message - GET_METRICS_RES message object containing metrics
* @return {void}
*/
function processMetricsResponse(message) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics().then(metrics => metrics.trim());
request.done(null, promString);
}
}

/**
* Adds event listeners for cluster aggregation. Idempotent (safe to call more
* than once).
Expand All @@ -158,25 +208,7 @@ function addListeners() {
// Listen for worker responses to requests for local metrics
cluster().on('message', (worker, message) => {
if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
processMetricsResponse(message);
}
});
}
Expand Down

0 comments on commit d5310ac

Please sign in to comment.