Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cluster metrics when AggregatorReg isn't created on workers #464

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ project adheres to [Semantic Versioning](http://semver.org/).

### Breaking

- When using the cluster module, you must now call
`promclient.setupClusterWorker()` from each cluster worker.

Long explanation: v13.2.0 introduced a change from v13.1.0 that broke cluster
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like we should revert that change, release a patch, then a new major with that and this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, I can release a v13.2.1 with it reverted.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like v13.2.1 was never released so would it be possible to get this MR merged? The cleaner API is nice.

metrics if an `AggregatorRegistry` was not instantiated on each cluster
worker. The example in `examples/cluster.js` shows instantiation of an
`AggregatorRegistry` in the workers, so users following that example were not
affected by this change. However, the example was not written as intended:
`new AggregatorRegistry()` should only be called on the cluster master.
`examples/cluster.js` has been updated to show the full, correct usage.

### Changed

### Added
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ register
});
```

Note that you must call `client.setupClusterWorker()` in each worker.

### Pushgateway

It is possible to push metrics via a
Expand Down
12 changes: 9 additions & 3 deletions example/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const AggregatorRegistry = require('../').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();
const prometheus = require('../');

if (cluster.isMaster) {
// Instantiate an AggregatorRegistry in the cluster master.
const aggregatorRegistry = new prometheus.AggregatorRegistry();

for (let i = 0; i < 4; i++) {
cluster.fork();
}

const metricsServer = express();
metricsServer.get('/cluster_metrics', async (req, res) => {
try {
// Aggregate metrics across all workers.
const metrics = await aggregatorRegistry.clusterMetrics();
res.set('Content-Type', aggregatorRegistry.contentType);
res.send(metrics);
Expand All @@ -27,5 +30,8 @@ if (cluster.isMaster) {
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics',
);
} else {
// Set up the cluster worker.
prometheus.setupClusterWorker();
// Register metrics as usual.
require('./server.js');
}
6 changes: 6 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ export class AggregatorRegistry extends Registry {
static setRegistries(regs: Array<Registry> | Registry): void;
}

/**
* Sets up the cluster worker for cluster aggregation. Idempotent (safe to call
* more than once).
*/
export function setupClusterWorker(): void;

/**
* General metric type
*/
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBucket
exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.aggregators = require('./lib/metricAggregators').aggregators;
exports.AggregatorRegistry = require('./lib/cluster');
exports.AggregatorRegistry = require('./lib/cluster').AggregatorRegistry;
exports.setupClusterWorker = require('./lib/cluster').setupClusterWorker;
114 changes: 62 additions & 52 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ const GET_METRICS_RES = 'prom-client:getMetricsRes';

let registries = [Registry.globalRegistry];
let requestCtr = 0; // Concurrency control
let listenersAdded = false;
let masterListenersAdded = false;
let workerListenersAdded = false;
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
constructor() {
super();
addListeners();
setupMasterListeners();
}

/**
Expand Down Expand Up @@ -146,63 +147,72 @@ class AggregatorRegistry extends Registry {
}

/**
* Adds event listeners for cluster aggregation. Idempotent (safe to call more
* than once).
* Adds the cluster master's event listeners for cluster aggregation. Idempotent
* (safe to call more than once).
* @return {void}
*/
function addListeners() {
if (listenersAdded) return;
listenersAdded = true;

if (cluster().isMaster) {
// Listen for worker responses to requests for local metrics
cluster().on('message', (worker, message) => {
if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}
function setupMasterListeners() {
if (masterListenersAdded) return;
masterListenersAdded = true;

// Listen for worker responses to requests for local metrics
cluster().on('message', (worker, message) => {
if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;
message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);
if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
});
}
}
});
}

if (cluster().isWorker) {
// Respond to master's requests for worker's local metrics.
process.on('message', message => {
if (message.type === GET_METRICS_REQ) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
});
})
.catch(error => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
error: error.message,
});
/**
* Adds the cluster worker's event listeners for cluster aggregation. Idempotent
* (safe to call more than once).
* @return {void}
*/
function setupClusterWorker() {
if (workerListenersAdded) return;
workerListenersAdded = true;

// Respond to master's requests for worker's local metrics.
process.on('message', message => {
if (message.type === GET_METRICS_REQ) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
});
}
});
}
})
.catch(error => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
error: error.message,
});
});
}
});
}

module.exports = AggregatorRegistry;
module.exports = {
AggregatorRegistry,
setupClusterWorker,
};
6 changes: 3 additions & 3 deletions test/clusterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ describe('AggregatorRegistry', () => {

describe('aggregatorRegistry.clusterMetrics()', () => {
it('works properly if there are no cluster workers', async () => {
const AggregatorRegistry = require('../lib/cluster');
const { AggregatorRegistry } = require('../lib/cluster');
const ar = new AggregatorRegistry();
const metrics = await ar.clusterMetrics();
expect(metrics).toEqual('');
});
});

describe('AggregatorRegistry.aggregate()', () => {
const Registry = require('../lib/cluster');
const { AggregatorRegistry } = require('../lib/cluster');
// These mimic the output of `getMetricsAsJSON`.
const metricsArr1 = [
{
Expand Down Expand Up @@ -159,7 +159,7 @@ describe('AggregatorRegistry', () => {
},
];

const aggregated = Registry.aggregate([metricsArr1, metricsArr2]);
const aggregated = AggregatorRegistry.aggregate([metricsArr1, metricsArr2]);

it('defaults to summation, preserves histogram bins', async () => {
const histogram = aggregated.getSingleMetric('test_histogram').get();
Expand Down