Skip to content

Commit

Permalink
setup prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mschneider committed Nov 30, 2024
1 parent 8035330 commit 24b9478
Show file tree
Hide file tree
Showing 4 changed files with 727 additions and 26 deletions.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,12 @@
"bs58": "^5.0.0",
"cross-fetch": "^3.1.5",
"dotenv": "^16.0.3",
"express": "^4.18.2",
"express-prom-bundle": "^8.0.0",
"fast-copy": "^3.0.1",
"lodash": "^4.17.21",
"node-kraken-api": "^2.2.2",
"prom-client": "^15.1.3",
"switchboard-anchor": "npm:@coral-xyz/[email protected]"
},
"resolutions": {
Expand Down
96 changes: 74 additions & 22 deletions ts/client/scripts/sb-on-demand-crank.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ import {
getOraclesForMangoGroup,
OraclesFromMangoGroupInterface,
} from './sb-on-demand-crank-utils';
import {
cycleDurationHistogram,
exposePromMetrics,
fetchIxDurationHistogram,
fetchIxFailureCounter,
fetchIxSuccessCounter,
refreshFailureCounter,
refreshSuccessCounter,
relativeSlotsSinceLastUpdateHistogram,
relativeVarianceSinceLastUpdateHistogram,
sendTxCounter,
sendTxErrorCounter,
updateBlockhashSlotDurationHistogram,
updateOracleAiDurationHistogram,
} from './sb-on-demand-metrics';

const CLUSTER: Cluster =
(process.env.CLUSTER_OVERRIDE as Cluster) || 'mainnet-beta';
Expand Down Expand Up @@ -107,14 +122,17 @@ async function setupBackgroundRefresh(
const refreshGroup = async function (): Promise<void> {
try {
await group.reloadAll(client);
refreshSuccessCounter.labels({ label: 'group.reloadAll' }).inc(1);
result.oracles = await prepareCandidateOracles(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
} catch (e) {
console.error('[group]', e);
refreshSuccessCounter.labels({ label: 'prepareCandidateOracles' }).inc(1);
} catch (err) {
console.error('[refresh]', err);
refreshFailureCounter.label({ err }).inc(1);
}
setTimeout(refreshGroup, GROUP_REFRESH_INTERVAL);
};
Expand Down Expand Up @@ -150,19 +168,28 @@ async function setupBackgroundRefresh(
client.connection.getSlot('processed'),
]);

// refresh oracle accounts to know when each oracle was late updated
refreshSuccessCounter.labels({ label: 'getLatestBlockhash+getSlot' }).inc(1);
const blockhashSlotUpdateAt = Date.now();
updateBlockhashSlotDurationHistogram.observe(
blockhashSlotUpdateAt - startedAt,
);

// refresh oracle accounts to know when each oracle was last updated
// updates oracle in place
await updateFilteredOraclesAis(client, sbOnDemandProgram, oracles);

refreshSuccessCounter.labels({ label: 'updateFilteredOraclesAis' }).inc(1);
const aisUpdatedAt = Date.now();
updateOracleAiDurationHistogram.observe(
aisUpdatedAt - blockhashSlotUpdateAt,
);

const staleOracles = await filterForStaleOracles(
oracles,
client,
group,
slot,
);

refreshSuccessCounter.labels({ label: 'filterForStaleOracles' }).inc(1);
const staleFilteredAt = Date.now();

const crossBarSims = await Promise.all(
Expand All @@ -172,12 +199,15 @@ async function setupBackgroundRefresh(
]),
),
);

refreshSuccessCounter.labels({ label: 'simulateFeeds' }).inc(1);
const simulatedAt = Date.now();
fetchIxDurationHistogram.observe(simulatedAt - staleFilteredAt);

const varianceThresholdCrossedOracles =
await filterForVarianceThresholdOracles(oracles, client, crossBarSims);

refreshSuccessCounter.labels({ label: 'filterForVarianceThresholdOracles' }).inc(
1,
);
const varianceFilteredAt = Date.now();

const oraclesToCrank: OracleMetaInterface[] = uniqWith(
Expand All @@ -203,7 +233,8 @@ async function setupBackgroundRefresh(

const numSignatures = 2;
try {
// TODO: don't ignore LUTS
// TODO: don't ignore LUTS
// TODO: investigate if more data can be prefetced (as was before)
const [pullIx, _luts] = await PullFeed.fetchUpdateManyIx(
sbOnDemandProgram as any,
{
Expand All @@ -213,8 +244,11 @@ async function setupBackgroundRefresh(
payer: user.publicKey,
},
);

for (const oracle of oracleChunk) {
fetchIxSuccessCounter.labels({ oracle: oracle.oracle.name }).inc(1);
}
const ixPreparedAt = Date.now();
fetchIxDurationHistogram.observe(ixPreparedAt - simulatedAt);

const lamportsPerCu_ = Math.min(
Math.max(lamportsPerCu ?? 150_000, 150_000),
Expand Down Expand Up @@ -267,15 +301,20 @@ async function setupBackgroundRefresh(
},
callbacks: {
afterEveryTxSend: function (data) {
sendTxCounter.inc(1);
const sentAt = Date.now();
const total = (sentAt - startedAt) / 1000;
const aiUpdate = (aisUpdatedAt - startedAt) / 1000;
cycleDurationHistogram.observe(sentAt - startedAt);
const blockhashSlotUpdate =
(blockhashSlotUpdateAt - startedAt) / 1000;
const aiUpdate = (aisUpdatedAt - blockhashSlotUpdateAt) / 1000;
const staleFilter = (staleFilteredAt - aisUpdatedAt) / 1000;
const simulate = (simulatedAt - staleFilteredAt) / 1000;
const varianceFilter =
(varianceFilteredAt - simulatedAt) / 1000;
const ixPrepare = (ixPreparedAt - varianceFilteredAt) / 1000;
const timing = {
blockhashSlotUpdate,
aiUpdate,
staleFilter,
simulate,
Expand All @@ -287,35 +326,42 @@ async function setupBackgroundRefresh(
`[tx send] https://solscan.io/tx/${data['txid']}, in ${total}s, lamportsPerCu_ ${lamportsPerCu_}, lamportsPerCu ${lamportsPerCu}, timiming ${JSON.stringify(timing)}`,
);
},
onError: function (e, notProcessedTransactions) {
onError: function (err, notProcessedTransactions) {
sendTxErrorCounter.labels(err).inc(1);
console.error(
`[tx send] ${notProcessedTransactions.length} error(s) after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(e)}`,
`[tx send] ${notProcessedTransactions.length} error(s) after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(err)}`,
);
},
},
}).catch((reason) =>
}).catch((reason) => {
sendTxErrorCounter
.labels({ err: `prom rejected: ${JSON.stringify(reason)}` })
.inc(1);
console.error(
`[tx send] promise rejected after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(reason)}`,
),
);
} catch (e) {
);
});
} catch (err) {
console.error(
`[ix fetch] error after ${(Date.now() - varianceFilteredAt) / 1000}s ${JSON.stringify(e)}`,
`[ix fetch] error after ${(Date.now() - varianceFilteredAt) / 1000}s ${JSON.stringify(err)}`,
);
fetchIxFailureCounter.labels({ err }).inc(1);
}
});

await new Promise((r) => setTimeout(r, SLEEP_MS));
} catch (error) {
console.error('[main]', error);
} catch (err) {
console.error('[main]', err);
refreshFailureCounter.labels({ err }).inc(1);
}
}
})();
exposePromMetrics(Number(process.env.PORT!), process.env.BIND);

/**
* prepares the instruction to update an individual oracle using the cached data on oracle
*/
async function preparePullIx(
async function _preparePullIx(
sbOnDemandProgram,
oracle: OracleMetaInterface,
recentSlothashes?: Array<[BN, string]>,
Expand Down Expand Up @@ -377,6 +423,9 @@ async function filterForVarianceThresholdOracles(

const changePct = (Math.abs(res.price - simPrice) * 100) / res.price;
const thresholdPct = VARIANCE_THRESHOLD_PCT_BY_TIER[item.oracle.tier];
relativeVarianceSinceLastUpdateHistogram
.labels({ oracle: item.oracle.name })
.observe(changePct / thresholdPct);
if (changePct > thresholdPct) {
console.log(
`[filter variance] ${item.oracle.name}, candidate: ${thresholdPct} < ${changePct}, ${simPrice}, ${res.price}`,
Expand Down Expand Up @@ -412,9 +461,12 @@ async function filterForStaleOracles(
const safetySeconds = 20 * 3 + 10;
const safetySlots = safetySeconds * 2.5;
const slotsUntilUpdate = item.decodedPullFeed.maxStaleness - safetySlots;
relativeSlotsSinceLastUpdateHistogram
.labels({ oracle: item.oracle.name })
.observe(slotsSinceLastUpdate / slotsUntilUpdate);
if (slotsSinceLastUpdate > slotsUntilUpdate) {
console.log(
`[filter stale] ${item.oracle.name}, candidate, ${item.decodedPullFeed.maxStaleness}, ${lastProcessedSlot}, ${res.lastUpdatedSlot}, ${slotsSinceLastUpdate}`,
`[filter stale] ${item.oracle.name}, candidate, ${slotsSinceLastUpdate} > ${slotsUntilUpdate}, ${lastProcessedSlot}`,
);

// check if oracle is fallback and primary is not stale
Expand All @@ -436,7 +488,7 @@ async function filterForStaleOracles(
staleOracles.push(item);
} else {
console.log(
`[filter stale] ${item.oracle.name}, non-candidate, ${item.decodedPullFeed.maxStaleness}, ${lastProcessedSlot}, ${res.lastUpdatedSlot}, ${slotsSinceLastUpdate}`,
`[filter stale] ${item.oracle.name}, non-candidate, ${slotsSinceLastUpdate} < ${slotsUntilUpdate}, ${lastProcessedSlot}`
);
}
}
Expand Down
104 changes: 104 additions & 0 deletions ts/client/scripts/sb-on-demand-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import express from "express";
import * as prom from "prom-client";
import promBundle from "express-prom-bundle";

const collectDefaultMetrics = prom.collectDefaultMetrics;
collectDefaultMetrics({
labels: {
app: process.env.FLY_APP_NAME,
instance: process.env.FLY_ALLOC_ID,
},
});

export const refreshSuccessCounter = new prom.Counter({
name: "refresh_success_count",
help: "number of successful refreshes",
labelNames: ["label"],
});

export const refreshFailureCounter = new prom.Counter({
name: "refresher_failure_count",
help: "number of failed refreshes",
labelNames: ["err"],
});

export const relativeSlotsSinceLastUpdateHistogram = new prom.Histogram({
name: "relative_slots_since_last_update",
help: "distribution of the relative slot since last update during filterForStaleOracles",
labelNames: ["oracle"],
buckets: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.75, 2.0, 3.0, 4.0, 5.0],
});

export const relativeVarianceSinceLastUpdateHistogram = new prom.Histogram({
name: "relative_variance_since_last_update",
help: "distribution of the relative variance since last update during filterForVarianceThresholdOracles",
labelNames: ["oracle"],
buckets: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.75, 2.0, 3.0, 4.0, 5.0],
});

export const fetchIxSuccessCounter = new prom.Counter({
name: "fetch_ix_success_count",
help: "number of successful oracle instructions fetched",
labelNames: ["oracle"],
});

export const fetchIxFailureCounter = new prom.Counter({
name: "fetch_ix_failure_count",
help: "number of failed ix fetches",
labelNames: ["err"],
});

export const fetchIxDurationHistogram = new prom.Histogram({
name: "fetch_ix_duration_histogram",
help: "duration of fetchIx calls in ms",
buckets: [100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200, 102_400],
});


export const simulateFeedDurationHistogram = new prom.Histogram({
name: "simulate_feed_duration_histogram",
help: "duration of simulateFeeds calls in ms",
buckets: [100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200, 102_400],
});

export const updateOracleAiDurationHistogram = new prom.Histogram({
name: "update_oracle_ai_duration_histogram",
help: "duration of updateFilteredOraclesAis calls in ms",
buckets: [100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200, 102_400],
});

export const updateBlockhashSlotDurationHistogram = new prom.Histogram({
name: "update_blockhash_slot_duration_histogram",
help: "duration of getLatestBlockhash+getSlot calls in ms",
buckets: [100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200, 102_400],
});

export const cycleDurationHistogram = new prom.Histogram({
name: "cycle_duration_histogram",
help: "duration of full update cycle in ms",
buckets: [100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200, 102_400],
});

export const sendTxCounter = new prom.Counter({
name: "send_tx_counter",
help: "number of oracle update transactions sent",
});

export const sendTxErrorCounter = new prom.Counter({
name: "send_tx_error_counter",
help: "number of failed oracle update transactions",
labelNames: ["err"],
});


export const metricsMiddleware = promBundle({ includeMethod: true });


export function exposePromMetrics(port: number, bind: string | undefined): void {
const app = express();
app.use(metricsMiddleware);
app.listen(port, bind, () => {
console.log(`prom metrics exposed on ${bind}:${port}`);
});
}

Loading

0 comments on commit 24b9478

Please sign in to comment.