Skip to content

Commit

Permalink
deployment disabled run attempt event
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanbhughes committed Aug 29, 2024
1 parent 8004e65 commit 219536d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
14 changes: 13 additions & 1 deletion src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from prefect.server.events.clients import PrefectServerEventsClient
from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.server.models.deployments import mark_deployments_ready
from prefect.server.models.events import deployment_status_event
from prefect.server.models.events import (
deployment_status_event,
disabled_deployment_run_attempt_event,
)
from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.server.schemas.responses import DeploymentPaginationResponse
from prefect.server.utilities.server import PrefectRouter
Expand Down Expand Up @@ -654,6 +657,15 @@ async def create_flow_run_from_deployment(
)

if deployment.disabled:
async with PrefectServerEventsClient() as events:
await events.emit(
await disabled_deployment_run_attempt_event(
session=session,
deployment_id=deployment_id,
occurred=pendulum.now("UTC"),
)
)

raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Error creating flow run: Deployment is disabled.",
Expand Down
57 changes: 47 additions & 10 deletions src/prefect/server/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,9 @@ def _timing_is_tight(
return False


async def deployment_status_event(
session: AsyncSession,
deployment_id: UUID,
status: DeploymentStatus,
occurred: pendulum.DateTime,
) -> Event:
deployment = await models.deployments.read_deployment(
session=session, deployment_id=deployment_id
)
assert deployment
async def _deployment_related_resources(
deployment: ORMDeployment, session: AsyncSession
):
flow = await models.flows.read_flow(session=session, flow_id=deployment.flow_id)
work_queue = (
await models.workers.read_work_queue(
Expand Down Expand Up @@ -344,6 +337,24 @@ async def deployment_status_event(
}
)

return related_work_queue_and_pool_info


async def deployment_status_event(
session: AsyncSession,
deployment_id: UUID,
status: DeploymentStatus,
occurred: pendulum.DateTime,
) -> Event:
deployment = await models.deployments.read_deployment(
session=session, deployment_id=deployment_id
)
assert deployment

related_work_queue_and_pool_info = await _deployment_related_resources(
deployment, session
)

return Event(
occurred=occurred,
event=f"prefect.deployment.{status.in_kebab_case()}",
Expand Down Expand Up @@ -434,3 +445,29 @@ def _get_recent_preceding_work_pool_event_id(
if time_since_last_event < timedelta(minutes=10)
else None
)


async def disabled_deployment_run_attempt_event(
session: AsyncSession,
deployment_id: UUID,
occurred: pendulum.DateTime,
) -> Event:
deployment = await models.deployments.read_deployment(
session=session, deployment_id=deployment_id
)
assert deployment

related_work_queue_and_pool_info = await _deployment_related_resources(
deployment, session
)

return Event(
occurred=occurred,
event="prefect.deployment.disabled-run-attempt",
resource={
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.name": f"{deployment.name}",
},
related=related_work_queue_and_pool_info,
id=uuid4(),
)

0 comments on commit 219536d

Please sign in to comment.