-
Notifications
You must be signed in to change notification settings - Fork 3
Add additional info to alerts | Batch DB writes #489
Conversation
export let prismaActive = prisma; | ||
|
||
export const setPrisma = (newPrisma: any) => { | ||
prismaActive = newPrisma; | ||
}; | ||
|
||
type DbRecord = { | ||
model: 'deviationValue' | 'rPCFailures' | 'compoundValues' | 'dataFeedApiValue' | 'gatewayFailures'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if there's a better way of typing this or handling this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't think of anything that would be more elegant than this either (reminds me of the old telemetry 😄)
|
||
const addRecord = (record: DbRecord) => { | ||
// eslint-disable-next-line functional/immutable-data | ||
recordsToInsert.push(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We batch pending DB inserts now, which speeds things up and reduces DB load.
await Promise.allSettled( | ||
Object.entries(bufferedRecords).map(([model, data]) => | ||
// @ts-ignore | ||
prismaActive[model].createMany({ data: data.map((item) => item.record) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this can be done better...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this solution 😄, anything else would be a much longer utility function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the createMany's are quite fast so this could even be made sequential if we find DB load to bee still too much later on 🤷
).filter((result) => result.status === 'rejected'); | ||
|
||
if (results.length > 0) { | ||
logger.error(`DB Writer error: ${results}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we drop a sample it's not a big deal and if we drop everything, well, bigger issue that will be obvious.
if (!nodaryResponse.success) { | ||
await limitedSendToOpsGenieLowLevel( | ||
limitedSendToOpsGenieLowLevel( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes we are limited by OG's rate limiting, in those case we don't want it to slow down the entire app. This was causing samples to be dropped.
@@ -189,6 +231,19 @@ export const recordRpcProviderResponseSuccess = async (contract: Api3ServerV1, s | |||
} | |||
}; | |||
|
|||
export const getBaseUrl = (fullUrl: string) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the base URL for (1) finding gateways and (2) hashing the gateway URL so it can be looked up later by Mertcan as part of his fault tracing.
{ | ||
message: `Dead gateway for Airnode Address ${airnodeAddress}`, | ||
priority: 'P3', | ||
alias: `dead-gateway-${airnodeAddress}${generateOpsGenieAlias(gatewayUrl)}`, | ||
alias: `dead-gateway-${airnodeAddress}${generateOpsGenieAlias(baseUrl)}`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're only interested in the base URL, not the URL+template ID (which is what we used before).
@@ -46,5 +46,5 @@ export const DIRECT_GATEWAY_MIN_TIME_DEFAULT_MS = 20; | |||
export const DIRECT_GATEWAY_MAX_CONCURRENCY_DEFAULT = 10; | |||
|
|||
// TODO: load these 2 from env var instead | |||
export const DATAFEED_READ_BATCH_SIZE = 100; | |||
export const DATAFEED_UPDATE_BATCH_SIZE = 10; | |||
export const DATAFEED_READ_BATCH_SIZE = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading more than 1 has been unreliable on Kava - and I'm tired of troubleshooting this app - if you can see why it's failing, LMK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow! This would be a problem for the main airseeker as well, right?
@@ -92,7 +78,7 @@ export const fetchBeaconData = async (beaconId: string) => { | |||
} | |||
|
|||
const goRes = await go(fetchFn, { | |||
retries: 2, | |||
retries: 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need/want it to retry because if it fails we'll try the whole thing again very soon.
@@ -16,6 +16,10 @@ export const handleStopSignal = (signal: string) => { | |||
expireLimiterJobs(); | |||
updateState((state) => ({ ...state, stopSignalReceived: true })); | |||
heartbeatReporter(getState().config); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very annoying when the app doesn't quit quickly in response to ctrl+c. This fixes that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I didn't see any issues 🚀
await Promise.allSettled( | ||
Object.entries(bufferedRecords).map(([model, data]) => | ||
// @ts-ignore | ||
prismaActive[model].createMany({ data: data.map((item) => item.record) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this solution 😄, anything else would be a much longer utility function
await Promise.allSettled( | ||
Object.entries(bufferedRecords).map(([model, data]) => | ||
// @ts-ignore | ||
prismaActive[model].createMany({ data: data.map((item) => item.record) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the createMany's are quite fast so this could even be made sequential if we find DB load to bee still too much later on 🤷
export let prismaActive = prisma; | ||
|
||
export const setPrisma = (newPrisma: any) => { | ||
prismaActive = newPrisma; | ||
}; | ||
|
||
type DbRecord = { | ||
model: 'deviationValue' | 'rPCFailures' | 'compoundValues' | 'dataFeedApiValue' | 'gatewayFailures'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't think of anything that would be more elegant than this either (reminds me of the old telemetry 😄)
@@ -46,5 +46,5 @@ export const DIRECT_GATEWAY_MIN_TIME_DEFAULT_MS = 20; | |||
export const DIRECT_GATEWAY_MAX_CONCURRENCY_DEFAULT = 10; | |||
|
|||
// TODO: load these 2 from env var instead | |||
export const DATAFEED_READ_BATCH_SIZE = 100; | |||
export const DATAFEED_UPDATE_BATCH_SIZE = 10; | |||
export const DATAFEED_READ_BATCH_SIZE = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow! This would be a problem for the main airseeker as well, right?
This PR is currently being tested live.
This PR does two main things:
Closes https://github.com/api3dao/tasks/issues/219
Closes https://github.com/api3dao/tasks/issues/199