Concord design

Vendor sales daily sync

Pull yesterday's sales from a vendor REST API at 03:00 UTC, detect stale snapshots via content hash, and upsert into the Postgres warehouse partition — fully automated, idempotent end to end.

Trigger: scheduled_trigger · Plan: async deterministic · Cancellation: graceful · Approvals: 0 · Effects: 1
02

User journey

No interactive user. A scheduled trigger fires nightly; the data engineering team consumes the result by querying the warehouse the next morning and by inspecting the audit/event stream when something looks off.

Operator journey
flowchart LR
  Cron([03:00 UTC cron]) --> Cmd[sync_vendor_sales command]
  Cmd --> Run["Workflow runs<br/>5-15 min"]
  Run --> Outcome{Outcome}
  Outcome -->|fresh data| Warehouse[("Warehouse partition<br/>yyyy-mm-dd upserted")]
  Outcome -->|stale snapshot| Skipped["Skip write<br/>emit skipped event"]
  Outcome -->|bad request| Failed["command.failed<br/>page on-call"]
  Warehouse --> Morning([DE team queries 09:00])
  Skipped --> Morning
  Failed --> Pager([On-call pager])
  style Outcome fill:#F5E0D2,stroke:#D97757
  style Skipped fill:#F1F2EC,stroke:#6B7B5A
  style Warehouse fill:#F1F2EC,stroke:#6B7B5A
03

Trigger / ingress

Ingress type: scheduled_trigger. A cron schedule (0 3 * * * UTC) issues the command; the runtime adapter's schedule capability owns delivery.

Idempotency at ingress

Duplicate triggers must collapse to one command — defensive against double-fire of the scheduler.
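
A minimal sketch of how duplicate triggers could collapse, assuming the key follows the idempotency_key format used for this command (command type, vendor, target date). The names build_idempotency_key and CommandStore are hypothetical, and the in-memory dict only stands in for whatever durable store the runtime provides.

```python
from datetime import datetime, timedelta, timezone


def build_idempotency_key(command_type: str, vendor_name: str, fired_at: datetime) -> str:
    """Derive the key from the target date: the UTC day before the fire time."""
    target_date = (fired_at.astimezone(timezone.utc) - timedelta(days=1)).date()
    return f"{command_type}:{vendor_name}:{target_date.isoformat()}"


class CommandStore:
    """Hypothetical in-memory stand-in for a durable command table."""

    def __init__(self) -> None:
        self._by_key: dict[str, dict] = {}

    def submit(self, key: str, payload: dict) -> tuple[dict, bool]:
        """Return (command, created). A repeated key returns the existing command."""
        existing = self._by_key.get(key)
        if existing is not None:
            return existing, False
        command = {"idempotency_key": key, "payload": payload, "status": "created"}
        self._by_key[key] = command
        return command, True
```

Because the key depends only on the target date, two fires a few seconds apart on the same night resolve to the same command rather than creating a second one.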

04

Command(s)

Single command type. No fan-in, no dependencies on other commands.

command_type: sync_vendor_sales
requested_by: system:scheduler
ingress: scheduled_trigger
payload:
  vendor_name: "acme_sales"
  target_date: "2026-06-01"     # yesterday at fire time
  vendor_base_url: "https://api.acme.example/v1"
context:
  workspace_id: null            # single-tenant; see Constraints
  trace_id: <runtime-supplied>
cancellation_mode: graceful
idempotency_key: "sync_vendor_sales:acme_sales:2026-06-01"
dbos_workflow_id: <runtime-supplied>

Status path used: created → validated → queued → running → succeeded | failed | cancelled. compensated and expired are not used — there is nothing to compensate (idempotent upsert) and no expiry deadline.

05

Policy stack

The policy chain evaluated at validated:

permission
connector_scope
rate_limit
cost

No data_safety, external_sharing, destructive_action, memory_consent, agent_risk, or approval_requirement — none apply (no PII flagged, no human gate, no agent).

06

Execution plan

Async deterministic. Five steps, all connector_call or sync_function. No agent, no human task.

Execution plan
flowchart TB
  Start(["scheduled_trigger fires<br/>03:00 UTC"]) --> Fetch["1 · fetch_vendor_snapshot<br/>connector_call · connector_calls queue<br/>retry on 5xx · 30s/2m/10m"]
  Fetch --> Hash["2 · compute_snapshot_hash<br/>sync_function · SHA256 of latest_snapshot_id"]
  Hash --> LoadPrev["3 · load_last_hash<br/>connector_call · postgres read"]
  LoadPrev --> Compare{"hash == previous?"}
  Compare -->|yes · stale| Skip["4a · record_skipped<br/>sync_function · emit skipped event"]
  Compare -->|no · fresh| Write["4b · upsert_warehouse_partition<br/>connector_call · idempotent upsert<br/>connector_calls queue"]
  Write --> Record["5 · record_hash<br/>connector_call · persist new hash"]
  Skip --> Done([command.succeeded])
  Record --> Done
  style Compare fill:#F5E0D2,stroke:#D97757
  style Skip fill:#F1F2EC,stroke:#6B7B5A
  style Write fill:#F1F2EC,stroke:#6B7B5A
  style Done fill:#F1F2EC,stroke:#6B7B5A

Step table

| step | execution_mode | queue | side effect? |
|---|---|---|---|
| fetch_vendor_snapshot | connector_call | connector_calls | no (read) |
| compute_snapshot_hash | sync_function | | no |
| load_last_hash | connector_call | connector_calls | no (read) |
| upsert_warehouse_partition | connector_call | connector_calls | yes |
| record_hash | connector_call | connector_calls | incidental (state bookkeeping) |

The single declared effect (warehouse_partition.write) is the upsert in step 4b. Step 5 stores the hash for next-day comparison; treated as bookkeeping, not a domain effect.
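
The hash-and-compare branch (steps 2 through 4) can be sketched as below. The function plan_next_step is a hypothetical name, and treating a missing previous hash (first run) as fresh is an assumption; the design does not state how the first run behaves.

```python
import hashlib
from typing import Optional


def compute_snapshot_hash(latest_snapshot_id: str) -> str:
    """SHA-256 hex digest of the vendor's latest snapshot id."""
    return hashlib.sha256(latest_snapshot_id.encode("utf-8")).hexdigest()


def plan_next_step(current_hash: str, previous_hash: Optional[str]) -> str:
    """Pick the branch after load_last_hash.

    Assumption: with no stored hash (first run) the snapshot counts as fresh.
    """
    if previous_hash is not None and current_hash == previous_hash:
        return "record_skipped"              # 4a: stale, no warehouse write
    return "upsert_warehouse_partition"      # 4b: fresh, write then record_hash
```

Both functions are pure, so they fit the sync_function execution mode and can be unit tested without a connector.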

07

Effects table

| effect_type | side_effect? | idempotency_key | compensation | counter_pure? |
|---|---|---|---|---|
| warehouse_partition.write | yes | sync_vendor_sales:acme_sales:2026-06-01 | none (idempotent upsert) | n/a |

Why no compensation

The warehouse write is an upsert keyed by (vendor_name, target_date). A retry or re-run overwrites the same partition row-for-row; partial failure leaves the partition either unchanged or in the latest good state. Nothing to reverse.
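
To illustrate why a re-run has nothing to reverse, here is a small sketch of an idempotent upsert. It uses Python's built-in sqlite3 as a stand-in for the Postgres warehouse; the ON CONFLICT ... DO UPDATE form shown is also accepted by Postgres. The table name vendor_sales, the row-level sku column, and units_sold are hypothetical; the design only states that the partition is keyed by (vendor_name, target_date).

```python
import sqlite3

# Hypothetical schema: sku is an assumed row-level key inside the partition.
DDL = """
CREATE TABLE vendor_sales (
    vendor_name TEXT NOT NULL,
    target_date TEXT NOT NULL,
    sku         TEXT NOT NULL,
    units_sold  INTEGER NOT NULL,
    PRIMARY KEY (vendor_name, target_date, sku)
)
"""

UPSERT = """
INSERT INTO vendor_sales (vendor_name, target_date, sku, units_sold)
VALUES (?, ?, ?, ?)
ON CONFLICT (vendor_name, target_date, sku)
DO UPDATE SET units_sold = excluded.units_sold
"""


def upsert_partition(conn: sqlite3.Connection, vendor_name: str,
                     target_date: str, rows: list[tuple[str, int]]) -> None:
    """Write all rows for one partition in a single transaction."""
    params = [(vendor_name, target_date, sku, units) for sku, units in rows]
    with conn:  # commits on success, rolls back if any row fails
        conn.executemany(UPSERT, params)
```

Running upsert_partition twice with the same rows leaves the row count unchanged, and the single transaction is what keeps a failed write from leaving a half-written partition.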

08

Memory

Skipped — workflow is stateless across runs. The previous-day snapshot hash needed for staleness detection is stored in an operational table (vendor_sync_state), not Concord Memory: it's not a learned preference or user-scoped fact, it's deterministic bookkeeping for an idempotent connector.

09

Artifacts

Skipped — no human-readable durable output. The warehouse partition itself is queried directly by downstream consumers; it's an effect, not an Artifact.

10

Approvals

Skipped — fully automated. No human gate anywhere in the plan; the user explicitly opted out of approvals.

11

Agent / swarm config

Skipped — plan is deterministic. No AgentRun or SwarmRun; no LLM in the loop.

12

Audit events

All rows land in the single append-only domain_events table. The purpose field distinguishes audit rows (purpose=audit) from operational event rows (purpose=event).

command.created
command.validated
command.running
vendor.fetch.started
vendor.fetch.succeeded
vendor.fetch.retried
snapshot.hashed
snapshot.stale_detected
snapshot.fresh_detected
warehouse.upsert.started
warehouse.upsert.succeeded
command.succeeded
command.failed
command.cancelled

Retention: 90 days for purpose=audit. purpose=event rows used for operational metrics may be aged out sooner via policy.

13

Cancellation model

cancellation_mode: graceful. Transitions: running → cancelling → cancelled. Each step's prelude checks the cancellation flag and bails before doing connector work.

Cancellation cascade
flowchart LR
  CancelReq([cancel signal]) --> Flag["set cancellation flag<br/>command.status = cancelling"]
  Flag --> Probe{"current step?"}
  Probe -->|fetch in flight| Finish["let fetch finish<br/>discard result"]
  Probe -->|between steps| Exit["exit cleanly<br/>no upsert"]
  Probe -->|upsert in flight| Complete["let upsert finish<br/>partition is idempotent"]
  Finish --> Cancelled([command.cancelled])
  Exit --> Cancelled
  Complete --> Cancelled
  style Probe fill:#F5E0D2,stroke:#D97757

Rule

Graceful is safe here because the only effect is an idempotent upsert. If an upsert is mid-flight when cancel arrives, completing it leaves the partition in a valid state — letting it finish is cheaper than tearing down.
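
The step-prelude check can be sketched as follows. This is an illustration only: threading.Event stands in for the durable cancellation flag, and run_step, run_plan, and CommandCancelled are hypothetical names, not part of the runtime adapter.

```python
import threading
from typing import Callable, Iterable


class CommandCancelled(Exception):
    """Raised by a step prelude when the cancellation flag is set."""


def run_step(name: str, cancel_flag: threading.Event, work: Callable[[], object]) -> object:
    """Check the flag before starting; a step already running is never interrupted."""
    if cancel_flag.is_set():
        raise CommandCancelled(name)
    return work()


def run_plan(steps: Iterable[tuple[str, Callable[[], object]]],
             cancel_flag: threading.Event) -> tuple[str, list[str]]:
    """Run steps in order and return (final_status, names of completed steps)."""
    completed: list[str] = []
    try:
        for name, work in steps:
            run_step(name, cancel_flag, work)
            completed.append(name)
    except CommandCancelled:
        return "cancelled", completed
    return "succeeded", completed
```

Because the flag is only read in the prelude, a cancel that arrives mid-step lets that step finish and stops the plan before the next one starts.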

14

Compensation graph

Skipped — no declared compensation. The single effect (warehouse_partition.write) is an idempotent upsert keyed by (vendor_name, target_date); re-running the workflow overwrites in place. requires_compensation=false at registration. The compensation graph validator will accept this only because declared_compensation is intentionally None with the idempotent-upsert justification recorded.

15

Runtime config

Adapter

DBOS. Required capabilities: DURABLE_WORKFLOWS, DURABLE_STEPS, QUEUES, SCHEDULES. Not required: SUBWORKFLOWS, SIGNALS (cancel uses the flag pattern, not a runtime signal), SAGA_COMPENSATION_NATIVE, WORKFLOW_VERSIONING.

Queues

connector_calls
scheduled_maintenance

All connector calls (vendor fetch, warehouse upsert, hash read/write) ride connector_calls. The cron itself dispatches via the runtime's schedule capability — no app-level queue needed for the trigger.

Retry policy

Wired per-operation through compile_policy(operation) -> dict for the runtime adapter. Error classes split into retryable and non-retryable:

transient_connector_error: retryable
rate_limited: retryable
timeout: retryable
validation_error: no retry
permanent_connector_error: no retry
policy_denied: no retry
permission_denied: no retry

For fetch_vendor_snapshot:

RetryPolicy(
  operation="fetch_vendor_snapshot",
  retryable=["transient_connector_error", "rate_limited", "timeout"],
  max_attempts=4,                       # initial + 3 retries
  backoff_seconds=[30, 120, 600],       # 30s · 2min · 10min
  requires_idempotency_key=False,       # GET is naturally idempotent
)

Vendor 4xx bad-request responses map to validation_error (or permanent_connector_error for 401/403/404) — non-retryable, fail fast, route to command.failed.
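
A minimal sketch of how the error classes and backoff schedule above might drive a retry decision. The helpers classify_status and next_delay are hypothetical. Mapping 5xx to transient_connector_error is an assumption based on the "retry on 5xx" rule, and detection of rate_limited and timeout is left out because the design does not say how they are signalled.

```python
from typing import Optional

RETRYABLE = {"transient_connector_error", "rate_limited", "timeout"}
BACKOFF_SECONDS = [30, 120, 600]  # waits before attempts 2, 3 and 4
MAX_ATTEMPTS = 4                  # initial attempt + 3 retries


def classify_status(status: int) -> str:
    """Map a vendor HTTP error status to an error class (assumed mapping for 5xx)."""
    if 500 <= status < 600:
        return "transient_connector_error"
    if status in (401, 403, 404):
        return "permanent_connector_error"
    if 400 <= status < 500:
        return "validation_error"
    raise ValueError(f"status {status} is not an error response")


def next_delay(error_class: str, failed_attempt: int) -> Optional[int]:
    """Seconds to wait after the 1-based failed attempt, or None to stop retrying."""
    if error_class not in RETRYABLE or failed_attempt >= MAX_ATTEMPTS:
        return None
    return BACKOFF_SECONDS[failed_attempt - 1]
```

A None result corresponds to routing the command to command.failed, whether the error was non-retryable or the attempts ran out.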

Schedule

Cron 0 3 * * * UTC with a single-instance lock; the idempotency_key blocks a double-fire even if the scheduler misbehaves.

16

Test plan

Unit (functional core)

Integration (Postgres + DBOS adapter)

Workflow (end-to-end)

Safety

Boundary discipline

Acceptance criterion

For 30 consecutive UTC days, at 03:00 UTC ± 5 min the workflow produces either (a) a warehouse partition matching the vendor's reported daily totals within tolerance, or (b) a snapshot.stale_detected audit row with no warehouse write. Zero duplicate writes; zero unretried 5xx that should have retried; zero retries on 4xx.

17

Open questions / risks