Pull yesterday's sales from a vendor REST API at 03:00 UTC, detect stale snapshots via content hash, and upsert into the Postgres warehouse partition — fully automated, idempotent end to end.
No interactive user. A scheduled trigger fires nightly; the data engineering team consumes the result by querying the warehouse the next morning and by inspecting the audit/event stream when something looks off.
Ingress type: scheduled_trigger. A cron schedule (0 3 * * * UTC) issues the command; the runtime adapter's schedule capability owns delivery.
Duplicate triggers must collapse to one command — defensive against double-fire of the scheduler.
- idempotency_key template: sync_vendor_sales:{vendor_name}:{target_date}
- target_date = the business day being pulled (UTC, the day before the cron fires)
- commands.idempotency_key guarantees second-fire becomes a no-op

Single command type. No fan-in, no dependencies on other commands.
command_type: sync_vendor_sales
requested_by: system:scheduler
ingress: scheduled_trigger
payload:
  vendor_name: "acme_sales"
  target_date: "2026-06-01" # yesterday at fire time
  vendor_base_url: "https://api.acme.example/v1"
context:
  workspace_id: null # single-tenant; see Constraints
  trace_id: <runtime-supplied>
cancellation_mode: graceful
idempotency_key: "sync_vendor_sales:acme_sales:2026-06-01"
dbos_workflow_id: <runtime-supplied>
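The key derivation and the double-fire collapse described above can be sketched as follows. This is a minimal illustration, not the runtime's actual code: `target_date_for`, `build_idempotency_key`, and the in-memory `CommandStore` are hypothetical names, and the set stands in for whatever uniqueness guarantee backs commands.idempotency_key.

```python
from datetime import date, datetime, timedelta, timezone


def target_date_for(fire_time: datetime) -> date:
    """Business day being pulled: the UTC day before the cron fires."""
    return (fire_time.astimezone(timezone.utc) - timedelta(days=1)).date()


def build_idempotency_key(vendor_name: str, target_date: date) -> str:
    """Fill the sync_vendor_sales:{vendor_name}:{target_date} template."""
    return f"sync_vendor_sales:{vendor_name}:{target_date.isoformat()}"


class CommandStore:
    """Hypothetical in-memory stand-in for a unique commands.idempotency_key."""

    def __init__(self) -> None:
        self._keys: set[str] = set()

    def submit(self, key: str) -> bool:
        """Return True if the command is accepted, False if it is a duplicate."""
        if key in self._keys:
            return False  # second fire collapses to a no-op
        self._keys.add(key)
        return True


# The scheduler fires at 03:00 UTC on 2 June, so the target date is 1 June.
fire = datetime(2026, 6, 2, 3, 0, tzinfo=timezone.utc)
key = build_idempotency_key("acme_sales", target_date_for(fire))

store = CommandStore()
first = store.submit(key)   # accepted
second = store.submit(key)  # duplicate trigger, rejected
```

Because the key depends only on the vendor and the business day, a scheduler that fires twice for the same night produces the same key both times.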
Status path used: created → validated → queued → running → succeeded | failed | cancelled. compensated and expired are not used — there is nothing to compensate (idempotent upsert) and no expiry deadline.
The policy chain evaluated at validated:
- permission: the scheduler service principal must hold warehouse.write on the target partition and connector.acme_sales.read.
- connector_scope: restricts the vendor connector to GET /sales/daily and the Postgres connector to upserts on warehouse.sales_daily.
- rate_limit: bucket cap on vendor calls (handful per day, but caps protect against retry storms).
- cost: soft cap on retry budget; if exhausted, escalate rather than burn budget.

No data_safety, external_sharing, destructive_action, memory_consent, agent_risk, or approval_requirement: none apply (no PII flagged, no human gate, no agent).
Async deterministic. Five steps, all connector_call or sync_function. No agent, no human task.
| step | execution_mode | queue | side effect? |
|---|---|---|---|
| fetch_vendor_snapshot | connector_call | connector_calls | no (read) |
| compute_snapshot_hash | sync_function | — | no |
| load_last_hash | connector_call | connector_calls | no (read) |
| upsert_warehouse_partition | connector_call | connector_calls | yes |
| record_hash | connector_call | connector_calls | incidental (state bookkeeping) |
The single declared effect (warehouse_partition.write) is the upsert in step 4 (upsert_warehouse_partition). Step 5 (record_hash) stores the hash for next-day comparison; it is treated as bookkeeping, not a domain effect.
| effect_type | side_effect? | idempotency_key | compensation | counter_pure? |
|---|---|---|---|---|
| warehouse_partition.write | yes | sync_vendor_sales:acme_sales:2026-06-01 | none (idempotent upsert) | n/a |
The warehouse write is an upsert keyed by (vendor_name, target_date). A retry or re-run overwrites the same partition row-for-row; partial failure leaves the partition either unchanged or in the latest good state. Nothing to reverse.
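A minimal sketch of why a re-run is harmless, using the standard-library sqlite3 module as a stand-in for Postgres (both accept the `ON CONFLICT ... DO UPDATE` form). The `sales_daily` schema, the `total_revenue` column, and the one-row-per-key simplification are assumptions for illustration only.

```python
import sqlite3

# Upsert keyed by (vendor_name, target_date); a conflict overwrites in place.
UPSERT_SQL = """
INSERT INTO sales_daily (vendor_name, target_date, total_revenue)
VALUES (?, ?, ?)
ON CONFLICT (vendor_name, target_date)
DO UPDATE SET total_revenue = excluded.total_revenue
"""


def upsert_partition(conn: sqlite3.Connection, vendor_name: str,
                     target_date: str, total_revenue: float) -> None:
    """Write one row inside a transaction; failure rolls back to the prior state."""
    with conn:
        conn.execute(UPSERT_SQL, (vendor_name, target_date, total_revenue))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_daily ("
    "vendor_name TEXT, target_date TEXT, total_revenue REAL, "
    "PRIMARY KEY (vendor_name, target_date))"
)

upsert_partition(conn, "acme_sales", "2026-06-01", 1250.0)
upsert_partition(conn, "acme_sales", "2026-06-01", 1250.0)  # retry of the same run

rows = conn.execute(
    "SELECT vendor_name, target_date, total_revenue FROM sales_daily"
).fetchall()
```

The retry leaves exactly one row for the key, which is the property that lets the plan declare no compensation.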
Skipped — workflow is stateless across runs. The previous-day snapshot hash needed for staleness detection is stored in an operational table (vendor_sync_state), not Concord Memory: it's not a learned preference or user-scoped fact, it's deterministic bookkeeping for an idempotent connector.
Skipped — no human-readable durable output. The warehouse partition itself is queried directly by downstream consumers; it's an effect, not an Artifact.
Skipped — fully automated. No human gate anywhere in the plan; the user explicitly opted out of approvals.
Skipped — plan is deterministic. No AgentRun or SwarmRun; no LLM in the loop.
All rows land in the single append-only domain_events table. The purpose field distinguishes them.
Retention: 90 days for purpose=audit. purpose=event rows used for operational metrics may be aged out sooner via policy.
cancellation_mode: graceful. Transitions: running → cancelling → cancelled. Each step's prelude checks the cancellation flag and bails before doing connector work.
Graceful is safe here because the only effect is an idempotent upsert. If an upsert is mid-flight when cancel arrives, completing it leaves the partition in a valid state — letting it finish is cheaper than tearing down.
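The flag-check prelude can be sketched as below. This is an illustration under stated assumptions: `run_steps` is a hypothetical helper, and a `threading.Event` stands in for the runtime's cancellation flag.

```python
import threading
from typing import Callable


def run_steps(steps: list[tuple[str, Callable[[], None]]],
              cancel_flag: threading.Event) -> tuple[str, list[str]]:
    """Run steps in order, checking the cancellation flag before each one."""
    completed: list[str] = []
    for name, step in steps:
        # Prelude: bail before doing any connector work for this step.
        if cancel_flag.is_set():
            return "cancelled", completed
        step()  # a step already in flight is allowed to finish
        completed.append(name)
    return "succeeded", completed


cancel = threading.Event()
steps = [
    ("fetch_vendor_snapshot", lambda: None),
    # Simulate a cancel request arriving while the upsert is mid-flight.
    ("upsert_warehouse_partition", cancel.set),
    ("record_hash", lambda: None),
]
status, completed = run_steps(steps, cancel)
```

In this run the upsert completes, and the next prelude check stops the workflow before record_hash starts.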
Skipped — no declared compensation. The single effect (warehouse_partition.write) is an idempotent upsert keyed by (vendor_name, target_date); re-running the workflow overwrites in place. requires_compensation=false at registration. The compensation graph validator will accept this only because declared_compensation is intentionally None with the idempotent-upsert justification recorded.
DBOS. Required capabilities: DURABLE_WORKFLOWS, DURABLE_STEPS, QUEUES, SCHEDULES. Not required: SUBWORKFLOWS, SIGNALS (cancel uses the flag pattern, not a runtime signal), SAGA_COMPENSATION_NATIVE, WORKFLOW_VERSIONING.
All connector calls (vendor fetch, warehouse upsert, hash read/write) ride connector_calls. The cron itself dispatches via the runtime's schedule capability — no app-level queue needed for the trigger.
Wired per-operation through compile_policy(operation) -> dict for the runtime adapter. Error classes split into retryable and non-retryable:
For fetch_vendor_snapshot:
RetryPolicy(
    operation="fetch_vendor_snapshot",
    retryable=["transient_connector_error", "rate_limited", "timeout"],
    max_attempts=4,                    # initial + 3 retries
    backoff_seconds=[30, 120, 600],    # 30s · 2min · 10min
    requires_idempotency_key=False,    # GET is naturally idempotent
)
Vendor 400/422 responses map to validation_error, and 401/403/404 map to permanent_connector_error. Both classes are non-retryable: fail fast and route to command.failed. A 429 maps to rate_limited, which is retryable.
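The classification and backoff rules for fetch_vendor_snapshot can be sketched together. `classify_status` and `next_delay` are hypothetical helpers, not part of the runtime adapter; mapping 5xx to transient_connector_error and any other 4xx to validation_error are assumptions made for the sketch.

```python
from typing import Optional

RETRYABLE = {"transient_connector_error", "rate_limited", "timeout"}
BACKOFF_SECONDS = [30, 120, 600]
MAX_ATTEMPTS = 4  # initial attempt + 3 retries


def classify_status(status: int) -> str:
    """Map a vendor HTTP status to an error class."""
    if status == 429:
        return "rate_limited"
    if status in (401, 403, 404):
        return "permanent_connector_error"
    if 400 <= status < 500:
        return "validation_error"  # 400/422; other 4xx is an assumption
    if status >= 500:
        return "transient_connector_error"  # assumption: 5xx is transient
    return "ok"


def next_delay(error_class: str, attempt: int) -> Optional[int]:
    """Seconds to wait after failed attempt number `attempt` (1-based), or None to stop."""
    if error_class not in RETRYABLE or attempt >= MAX_ATTEMPTS:
        return None
    return BACKOFF_SECONDS[attempt - 1]


delays = [next_delay(classify_status(503), attempt) for attempt in range(1, 5)]
```

A non-retryable class returns None on the first failure, which corresponds to the fail-fast path to command.failed.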
Cron 0 3 * * * UTC, single instance lock (idempotency_key blocks double-fire even if the scheduler misbehaves).
- compute_snapshot_hash: SHA-256 over latest_snapshot_id, stable across whitespace and key order.
- is_stale(current_hash, last_hash): true iff the bytes are equal; treats a None last hash as "fresh".
- 5xx → transient_connector_error; 400/422 → validation_error; 401/403/404 → permanent_connector_error; 429 → rate_limited.
- RetryPolicy compilation: backoff list yields [30, 120, 600] in order.
- planned → executing → succeeded rows inserted for the upsert.
- event, audit, agent_step rows land in domain_events with the correct purpose.
- command.succeeded → warehouse row visible.
- snapshot.stale_detected emitted → command.succeeded.
- command.failed with validation_error.
- command.cancelled.
- connector_scope policy denies.
- max_attempts → no further retries, command.failed, cost policy emits cost-cap audit row.
- concord_boundary_check.py rejects any import dbos outside the runtime adapter module.

For 30 consecutive UTC days, at 03:00 UTC ± 5 min the workflow produces either (a) a warehouse partition matching the vendor's reported daily totals within tolerance, or (b) a snapshot.stale_detected audit row with no warehouse write. Zero duplicate writes; zero unretried 5xx that should have retried; zero retries on 4xx.
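The compute_snapshot_hash and is_stale behaviour named in the tests above might look like this. The function names come from the plan, but the bodies are a sketch: taking the raw response body as input and canonicalising with `json.dumps` are assumptions.

```python
import hashlib
import json
from typing import Optional


def compute_snapshot_hash(raw_body: str) -> str:
    """SHA-256 over latest_snapshot_id, independent of whitespace and key order."""
    snapshot = json.loads(raw_body)
    # Canonical JSON: sorted keys and no insignificant whitespace.
    canonical = json.dumps(
        snapshot["latest_snapshot_id"], sort_keys=True, separators=(",", ":")
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def is_stale(current_hash: str, last_hash: Optional[str]) -> bool:
    """True iff the hashes are equal; a missing last hash counts as fresh."""
    if last_hash is None:
        return False
    return current_hash == last_hash


body_a = '{"latest_snapshot_id": "snap-42", "rows": []}'
body_b = '{ "rows": [],\n  "latest_snapshot_id":"snap-42" }'
body_c = '{"latest_snapshot_id": "snap-43", "rows": []}'

hash_a = compute_snapshot_hash(body_a)
hash_b = compute_snapshot_hash(body_b)
hash_c = compute_snapshot_hash(body_c)
```

On the first ever run there is no stored hash, so `is_stale` returns False and the upsert proceeds.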
- latest_snapshot_id alone trusts the vendor to bump that field. If the vendor occasionally bumps the id without changing the payload (or vice versa), we either write a no-op or skip a real update. Consider hashing a content-derived field too (e.g. row count + total revenue) as a second signal.
- target_date (same idempotency_key shape).
- Single-tenant (workspace_id: null). If multi-tenancy lands, idempotency_key must include workspace and the partition key needs a tenant dimension.
- permanent_connector_error (401/403) and pages on-call; verify the alerting wiring.
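The second-signal idea from the first risk above could be sketched by folding content-derived values into the hashed input. `content_signal_hash` and the `revenue_cents` row field are hypothetical; the vendor's real payload shape is not given here.

```python
import hashlib
import json


def content_signal_hash(latest_snapshot_id: str, rows: list[dict]) -> str:
    """Hash the snapshot id together with row count and total revenue."""
    signal = {
        "latest_snapshot_id": latest_snapshot_id,
        "row_count": len(rows),
        # Integer cents avoid float formatting differences in the canonical form.
        "total_revenue_cents": sum(row["revenue_cents"] for row in rows),
    }
    canonical = json.dumps(signal, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


rows_v1 = [{"revenue_cents": 1000}, {"revenue_cents": 250}]
rows_v2 = [{"revenue_cents": 1000}, {"revenue_cents": 300}]

same_id_same_rows = content_signal_hash("snap-42", rows_v1)
same_id_new_rows = content_signal_hash("snap-42", rows_v2)
```

With this input a payload change is detected even when the vendor forgets to bump the id. An id bump with an unchanged payload still triggers a write, but that write is a harmless no-op upsert.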