Fluid Forge

Source-Aligned Acquisition

Schema 0.7.3 introduces a first-class acquisition build pattern for source-aligned (Bronze / SDP) data products. Instead of writing imperative ingestion code or stitching together a 200-line Airflow DAG, you describe what to ingest and how to deliver it; Forge picks the right engine and runs it under a uniform protocol.

Six months of Airbyte cluster setup, or sixty seconds of fluid init --discover.

Where this fits

This page covers the framework that makes source-aligned ingestion declarative — engines, deployment modes, delivery guarantees, schema evolution, quality gates, and lineage emission. Pair it with Product Types (the SDP/ADP/CDP vocabulary) and the Postgres → DuckDB walkthrough (a worked example).

The acquisition build pattern

A source-aligned contract has one builds[] entry with pattern: acquisition. Everything else hangs off that build's properties: block:

fluidVersion: 0.7.3
kind: DataProduct
metadata:
  name: customer-orders-bronze
  productType: SDP        # or layer: Bronze — either is sufficient
  owner: { team: ingestion }

builds:
  - id: ingest_orders
    pattern: acquisition
    properties:
      source:              # WHERE to read from
        kind: postgres
        connection: { url: "${POSTGRES_URL}" }
        tables: [public.orders, public.customers]
      sink:                # WHERE to land it
        format: parquet
        location: s3://acme-bronze/customer-orders/
      engine:              # WHICH engine runs the ingestion
        type: duckdb       # duckdb | dlt | meltano | airbyte | kafka-connect | debezium
      delivery:            # Delivery guarantee
        mode: at_least_once
      schemaEvolution:
        policy: discover_and_freeze
      quality:
        rules: [...]
        maxAllowedErrors: 100
      cost:
        budget: { monthly: 50 }
        onExceed: warn
      catalog:
        registrar: snowflake_horizon
      lineage:
        enabled: true
      preLand:
        hooks: [tokenize_pii, dlp_scan]
      deployment:
        mode: embedded

The schema constraints, sub-defs, and engine-specific properties are catalogued in fluid_build/schemas/fluid-schema-0.7.3.json under $defs/acquisitionPattern.

Six ingestion engines, one protocol

All six engines conform to the public Runner Protocol in fluid_build.api.runner (see API Stability). They share the same validate → plan → apply lifecycle, the same exit-code contract, and the same run-record JSON shape, so day-2 ops (fluid runs status, fluid runs logs, fluid runs diff) work identically across all of them.

| Engine | Best for | Ships with |
|---|---|---|
| duckdb | Zero-infra ingestion from CSV, Parquet, JSON, Postgres, MySQL, SQLite, HTTP | Embedded (no extra service to run) |
| dlt | Python-native sources, custom @dlt.source modules, plus dlt verified sources (filesystem, sql_database) | pip install 'data-product-forge[dlt]' |
| meltano | Singer-protocol ELT; one runner unlocks 600+ Singer taps | pip install 'data-product-forge[meltano]' |
| airbyte | Airbyte OSS / Cloud connectors with REST control + Cosign image signature verification | pip install 'data-product-forge[airbyte]' |
| kafka-connect | Stream ingestion via Kafka Connect: JDBC / S3 / Salesforce / Mongo sources, JDBC / S3 / Snowflake / Iceberg / BigQuery sinks | pip install 'data-product-forge[kafka-connect]' |
| debezium | CDC from Postgres / MySQL / MongoDB / SQL Server / Oracle in Kafka Connect or Debezium Server mode | pip install 'data-product-forge[debezium]' |

Pick the engine via properties.engine.type. The generated plan and the apply behavior have the same shape for every engine; only the engine-specific config under properties.engine.properties differs.

Three deployment modes

properties.deployment.mode decides how the engine runs at apply time:

| Mode | Runs where | Use it when |
|---|---|---|
| embedded (default) | Inside the fluid process; no extra service | Local dev, CI, simple Bronze ingestion. Lowest latency, no infra to maintain. |
| bring-your-own | An existing Airbyte / Meltano / Kafka Connect cluster you already operate | You already run one of these stacks and want Forge to drive it via REST. |
| managed | Forge generates infra (Docker Compose / Helm / OpenTofu) and provisions a managed runner | New project, willing to let Forge own the engine lifecycle. |

For managed, the platform sub-mode picks the artifact:

deployment:
  mode: managed
  managed:
    platform: docker      # docker | kubernetes | terraform

The infra layer is hyperscaler-agnostic: it has no boto3, google.cloud, or azure imports. The kubernetes platform emits Helm charts with Flux-style HelmRelease custom resources plus ExternalSecret and NetworkPolicy resources; the terraform platform emits OpenTofu modules. Sovereignty constraints (metadata.dataResidency) propagate into the values overlay automatically.

Delivery guarantees

delivery:
  mode: at_least_once     # at_most_once | at_least_once | exactly_once
  dlq:
    maxRecordsBeforeAbort: 1000
    alertOn: [schemaDrift, infraFailure]

Each engine declares which guarantees it supports via its capability set (RunnerCapability enum on the public API). At plan time, Forge negotiates: if the contract asks for exactly_once against a runner that only supports at_least_once, the validator raises a typed CapabilityMismatchError rather than silently degrading.
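The negotiation amounts to a strict membership check with no fallback. This is a minimal illustration rather than Forge's code: the Capability enum below is a hypothetical stand-in for RunnerCapability, and the capability set in the usage lines is invented, not taken from any real engine.

```python
from enum import Enum


class Capability(Enum):
    """Hypothetical stand-in for the public RunnerCapability enum."""
    AT_MOST_ONCE = "at_most_once"
    AT_LEAST_ONCE = "at_least_once"
    EXACTLY_ONCE = "exactly_once"


class CapabilityMismatchError(Exception):
    """The contract asks for a guarantee the runner does not declare."""


def negotiate_delivery(requested: str, supported: set[Capability]) -> Capability:
    """Return the requested guarantee or fail; never substitute a weaker one."""
    wanted = Capability(requested)  # ValueError for an unknown mode string
    if wanted not in supported:
        offered = sorted(cap.value for cap in supported)
        raise CapabilityMismatchError(
            f"delivery.mode={wanted.value} is not supported; runner offers {offered}"
        )
    return wanted


# Usage with an illustrative capability set (not any real engine's):
caps = {Capability.AT_MOST_ONCE, Capability.AT_LEAST_ONCE}
print(negotiate_delivery("at_least_once", caps))  # Capability.AT_LEAST_ONCE
try:
    negotiate_delivery("exactly_once", caps)
except CapabilityMismatchError as err:
    print(err)  # delivery.mode=exactly_once is not supported; runner offers ['at_least_once', 'at_most_once']
```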

The DLQ block is honored uniformly across all engines: records that fail to land are written to the configured dead-letter location; if maxRecordsBeforeAbort is exceeded, the run aborts with a DLQOverflowError instead of silently dropping records or hanging.
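A minimal sketch of that DLQ rule, assuming a hypothetical per-record write callback and an in-memory list standing in for the dead-letter location. The function name and record shape are illustrative, not the engines' actual interface; the point is that the abort triggers only once the count strictly exceeds maxRecordsBeforeAbort.

```python
from collections.abc import Callable, Iterable


class DLQOverflowError(Exception):
    """More than maxRecordsBeforeAbort records were dead-lettered in one run."""


def land_records(
    records: Iterable[dict],
    write: Callable[[dict], None],
    dead_letters: list,
    max_records_before_abort: int,
) -> int:
    """Write each record, dead-letter failures, abort once the DLQ overflows.

    Returns the number of records that landed successfully.
    """
    landed = 0
    for record in records:
        try:
            write(record)
            landed += 1
        except Exception as exc:  # any sink failure routes the record to the DLQ
            dead_letters.append({"record": record, "error": repr(exc)})
            if len(dead_letters) > max_records_before_abort:
                raise DLQOverflowError(
                    f"{len(dead_letters)} records dead-lettered "
                    f"(limit {max_records_before_abort})"
                ) from exc
    return landed


# Usage: a toy sink that rejects negative amounts.
def write(record: dict) -> None:
    if record["amount"] < 0:
        raise ValueError("negative amount")


dlq: list = []
print(land_records([{"amount": 5}, {"amount": -1}], write, dlq, 1000))  # 1
print(len(dlq))  # 1
```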

Schema evolution

schemaEvolution:
  policy: discover_and_freeze    # strict | discover_and_freeze | evolve_safe | evolve_all

| Policy | Behavior on a schema change in the source |
|---|---|
| strict (default) | The run aborts with SchemaDriftError. The drift fingerprint is logged for review. |
| discover_and_freeze | The first-ever run discovers the schema and pins it; subsequent runs require a strict match. |
| evolve_safe | Additive changes (new nullable columns, widened types) are accepted; breaking changes (column drops, type narrowing) abort. |
| evolve_all | Every schema change is accepted. Narrowing type changes are handled by casting the affected values, and rows that fail the cast are routed to the DLQ instead of aborting the run. This is the most permissive policy; use it only when downstream consumers tolerate schema churn. |

Decisions are deterministic — the same input fingerprint always yields the same accept/abort decision, so CI replays are reproducible.
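Because the decision depends only on the pinned and observed schemas, it can be written as a pure function. The sketch below is a hypothetical reading of the policy table, not Forge's implementation: it models a schema as a mapping from column name to (type, nullable), treats only the two widenings in SAFE_WIDENINGS as safe, assumes a pinned schema already exists except on a discover_and_freeze first run, and leaves out evolve_all's casting and DLQ routing.

```python
import hashlib
import json
from typing import Optional

# Hypothetical schema shape: column name -> (type name, nullable).
Schema = dict[str, tuple[str, bool]]

# Assumed lossless widenings; a real engine would recognize many more.
SAFE_WIDENINGS = {("int32", "int64"), ("float32", "float64")}


def fingerprint(schema: Schema) -> str:
    """Order-independent digest: the same columns and types give the same value."""
    canonical = json.dumps(sorted(schema.items()), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


def is_safe_change(pinned: Schema, observed: Schema) -> bool:
    """True if observed only adds nullable columns or widens existing types."""
    for name, (old_type, _) in pinned.items():
        if name not in observed:
            return False  # dropped column: breaking
        new_type = observed[name][0]
        if new_type != old_type and (old_type, new_type) not in SAFE_WIDENINGS:
            return False  # narrowed or otherwise incompatible type: breaking
    added = observed.keys() - pinned.keys()
    return all(observed[name][1] for name in added)  # new columns must be nullable


def decide(policy: str, pinned: Optional[Schema], observed: Schema) -> str:
    """Return 'accept' or 'abort'; a pure function of its inputs."""
    if pinned is None:
        if policy == "discover_and_freeze":
            return "accept"  # first-ever run: discover and pin
        raise ValueError(f"policy {policy!r} needs a pinned schema")
    if fingerprint(pinned) == fingerprint(observed):
        return "accept"
    if policy == "evolve_all":
        return "accept"  # casting and DLQ routing are not modeled here
    if policy == "evolve_safe" and is_safe_change(pinned, observed):
        return "accept"
    return "abort"  # strict, frozen, or an unsafe change under evolve_safe


pinned = {"order_id": ("int64", False), "amount": ("float32", True)}
widened = {"order_id": ("int64", False), "amount": ("float64", True)}
print(decide("strict", pinned, widened))       # abort
print(decide("evolve_safe", pinned, widened))  # accept
```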

Quality gates

quality:
  rules:
    - column: order_id
      check: not_null
    - column: amount
      check: numeric_in_range
      args: { min: 0, max: 1_000_000 }
  maxAllowedErrors: 100
  anomaly:
    method: iqr            # ewma | iqr | exact
    onAnomaly: warn        # warn | abort

Quality runs before records land, via the pre-land hook chain (preLand.hooks). Built-in hooks ship in fluid_build/build_runners/hooks/:

  • quality_gate — Apply DQ rules, abort or warn per maxAllowedErrors
  • dlp_scan — Scan for PII / sensitive data
  • tokenize_pii — Tokenize PII columns before they hit the sink
  • emit_lineage_input — Emit OpenLineage RunEvent for the input read
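The quality_gate hook's threshold logic might look roughly like this sketch, which evaluates the two rule shapes from the YAML above. Counting one error per failed (row, rule) pair and warning at or below maxAllowedErrors are assumptions about the semantics, and QualityGateError is a hypothetical name.

```python
import logging

log = logging.getLogger("quality_gate_sketch")


class QualityGateError(Exception):
    """Hypothetical: violations exceeded maxAllowedErrors."""


def violates(rule: dict, row: dict) -> bool:
    """Evaluate one rule (same shape as the YAML above) against one row."""
    value = row.get(rule["column"])
    if rule["check"] == "not_null":
        return value is None
    if rule["check"] == "numeric_in_range":
        if value is None:
            return False  # nulls are the job of a separate not_null rule
        bounds = rule.get("args", {})
        low = bounds.get("min", float("-inf"))
        high = bounds.get("max", float("inf"))
        return not low <= value <= high
    raise ValueError(f"unsupported check: {rule['check']!r}")


def quality_gate(rows: list[dict], rules: list[dict], max_allowed_errors: int) -> int:
    """Count violations; abort above the threshold, warn at or below it."""
    errors = sum(violates(rule, row) for row in rows for rule in rules)
    if errors > max_allowed_errors:
        raise QualityGateError(f"{errors} violations > maxAllowedErrors={max_allowed_errors}")
    if errors:
        log.warning("%d quality violations (within maxAllowedErrors=%d)", errors, max_allowed_errors)
    return errors


rules = [
    {"column": "order_id", "check": "not_null"},
    {"column": "amount", "check": "numeric_in_range", "args": {"min": 0, "max": 1_000_000}},
]
rows = [
    {"order_id": 1, "amount": 10},
    {"order_id": None, "amount": -5},  # two violations
]
print(quality_gate(rows, rules, max_allowed_errors=100))  # 2
```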

Anomaly detection (quality.anomaly) compares this run's stats (row count, null rate, distinct values) against the historical baseline. Three methods: EWMA (smoothed), IQR (interquartile range), or exact (deterministic match). Useful for catching upstream regressions early.
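The three methods can be sketched over a single metric history, such as past row counts. This is an illustrative reading rather than Forge's algorithm: the 1.5 × IQR fences, the EWMA smoothing factor of 0.3 with a 3-sigma band, and treating exact as equality with the previous run's value are all assumptions.

```python
import math
import statistics


def is_anomalous(history: list[float], value: float, method: str,
                 alpha: float = 0.3, k: float = 3.0) -> bool:
    """Compare one run's metric against its history using iqr, ewma, or exact."""
    if len(history) < 2:
        return False  # not enough baseline to judge
    if method == "iqr":
        q1, _, q3 = statistics.quantiles(history, n=4)
        spread = q3 - q1
        return value < q1 - 1.5 * spread or value > q3 + 1.5 * spread
    if method == "ewma":
        mean, var = history[0], 0.0
        for x in history[1:]:
            # Incremental exponentially weighted mean and variance.
            diff = x - mean
            incr = alpha * diff
            mean += incr
            var = (1 - alpha) * (var + diff * incr)
        return abs(value - mean) > k * math.sqrt(var)
    if method == "exact":
        return value != history[-1]
    raise ValueError(f"unknown anomaly method: {method!r}")


baseline = [100, 102, 98, 101, 99, 100, 103, 97]  # e.g. past row counts
print(is_anomalous(baseline, 100, "iqr"))   # False
print(is_anomalous(baseline, 500, "ewma"))  # True
```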

Cost tracking + budget gates

cost:
  budget: { monthly: 100 }
  onExceed: fail           # fail | warn
  chargeback:
    label: ingestion-team

Every run records its cost in the run-record JSON (fluid_build.api.cost.CostTracker), and fluid stats aggregates cost across runs by provider, type, or engine. The monthly budget is enforced at run start: if the projected total would exceed the budget and onExceed: fail is set, the run aborts with BudgetExceededError.

Chargeback labels propagate into cloud tags and catalog metadata for finance teams to slice cost by team.
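The start-of-run budget gate reduces to comparing a projected total against the monthly budget. In this sketch, spent_this_month and estimated_run_cost are hypothetical inputs (in practice they would come from recorded run costs and an estimate for the upcoming run), and BudgetExceededError is a local stand-in for the error named above.

```python
import logging


class BudgetExceededError(Exception):
    """Local stand-in for the error raised when onExceed is 'fail'."""


def check_budget(spent_this_month: float, estimated_run_cost: float,
                 monthly_budget: float, on_exceed: str = "fail") -> bool:
    """Gate a run before it starts. Returns True when within budget."""
    projected = spent_this_month + estimated_run_cost
    if projected <= monthly_budget:
        return True
    message = f"projected {projected:.2f} exceeds monthly budget {monthly_budget:.2f}"
    if on_exceed == "fail":
        raise BudgetExceededError(message)
    if on_exceed == "warn":
        logging.getLogger("budget_sketch").warning(message)
        return False  # the caller may still proceed
    raise ValueError(f"unknown onExceed: {on_exceed!r}")


print(check_budget(40.0, 5.0, 100.0))  # True
```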

Lineage emission

lineage:
  enabled: true
  emitter:
    type: openlineage_http
    endpoint: https://marquez.example.com/api/v1

With lineage.enabled: true, every acquisition run emits an OpenLineage RunEvent covering inputs (sources read), outputs (sink written), and run state (start, complete, fail) through the configured emitter. Three built-in emitters live in fluid_build/build_runners/_lineage.py:

| Emitter | When to use |
|---|---|
| null (default) | No emission. Useful for local dev when you don't run Marquez. |
| buffered | Collects events in memory; useful in tests or when you want to inspect them via the run record. |
| http | POSTs to an OpenLineage HTTP endpoint (Marquez, DataHub, OpenMetadata, etc.). |

Out-of-tree emitters subclass LineageEmitter from fluid_build.api.lineage (public API).
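For a sense of what reaches the endpoint, the sketch below builds OpenLineage RunEvent payloads and passes them to two toy emitters that mirror the buffered and http rows above. It is not the fluid_build.api.lineage interface: the class names, producer URI, job and dataset names, and the pinned spec version in SCHEMA_URL are assumptions to check against the OpenLineage spec and your backend (Marquez, for example, accepts events at /api/v1/lineage).

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Assumed spec version; check the OpenLineage spec for the current one.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
PRODUCER = "https://example.com/acquisition-lineage-sketch"  # hypothetical producer URI


def run_event(event_type, run_id, job_name, inputs, outputs, namespace="acquisition"):
    """Build a RunEvent dict. Datasets are (namespace, name) pairs."""
    return {
        "eventType": event_type,  # START, COMPLETE, FAIL, ...
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


class BufferedEmitter:
    """Keeps events in memory, in the spirit of the 'buffered' emitter."""

    def __init__(self):
        self.events = []

    def emit(self, event):
        self.events.append(event)


class HttpEmitter:
    """POSTs each event as JSON, in the spirit of the 'http' emitter."""

    def __init__(self, url, timeout=10.0):
        self.url = url
        self.timeout = timeout

    def emit(self, event):
        request = urllib.request.Request(
            self.url,
            data=json.dumps(event).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            response.read()


# Usage: one run, two lifecycle events sharing a runId.
emitter = BufferedEmitter()
run_id = str(uuid.uuid4())
inputs = [("postgres://host:5432", "dbname.public.orders")]
outputs = [("s3://acme-bronze", "customer-orders")]
job = "customer-orders-bronze.ingest_orders"
emitter.emit(run_event("START", run_id, job, inputs, outputs))
emitter.emit(run_event("COMPLETE", run_id, job, inputs, outputs))
```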

Image signature verification (Cosign)

For airbyte and any other engine that runs container images, you can require Cosign signature verification:

imageSignature:
  required: true
  cosign:
    publicKey: ${COSIGN_PUBLIC_KEY}
  slsa:
    required: true

Five verification paths are supported and tested: signed / unsigned / wrong-key / SLSA-provenance-required-and-missing / SLSA-provenance-required-and-present. Failed verification aborts the run before any pull or apply happens.
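One way to get this fail-closed behavior is to shell out to the cosign CLI before anything is pulled. This page does not say whether Forge invokes cosign this way, so treat the following as a sketch: verify_image and ImageVerificationError are hypothetical names, and the command runner is injectable so the decision logic can be tested without the cosign binary. cosign verify exits non-zero for both unsigned and wrong-key images, and cosign verify-attestation with --type slsaprovenance covers the two SLSA-provenance paths.

```python
import subprocess
from collections.abc import Callable, Sequence

# Injectable so the decision logic can be exercised without the cosign binary.
CommandRunner = Callable[[Sequence[str]], subprocess.CompletedProcess]


def run_command(cmd: Sequence[str]) -> subprocess.CompletedProcess:
    return subprocess.run(list(cmd), capture_output=True, text=True, check=False)


class ImageVerificationError(Exception):
    """Hypothetical: verification failed, so nothing is pulled or applied."""


def verify_image(image: str, public_key: str, require_slsa: bool,
                 run: CommandRunner = run_command) -> None:
    """Fail closed: any non-zero cosign exit aborts before the image is used."""
    signature = run(["cosign", "verify", "--key", public_key, image])
    if signature.returncode != 0:  # unsigned or signed with a different key
        raise ImageVerificationError(
            f"signature check failed for {image}: {(signature.stderr or '').strip()}"
        )
    if require_slsa:
        attestation = run(["cosign", "verify-attestation", "--key", public_key,
                           "--type", "slsaprovenance", image])
        if attestation.returncode != 0:  # provenance required but missing or invalid
            raise ImageVerificationError(f"no valid SLSA provenance for {image}")
```

Verifying an image referenced by digest rather than by tag ensures that the image which runs is the one that was checked, since a tag can be re-pointed after verification.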

Catalog registration

catalog:
  registrar: snowflake_horizon  # datahub | openmetadata | unity | glue | snowflake_horizon | datamesh_manager
  options: { ... }

After a successful apply, Forge registers the produced dataset with the configured catalog. Six built-in registrars cover the major catalogs:

| Registrar | Wire | What it propagates |
|---|---|---|
| datahub | GMS REST | Glossary terms, classifications, lineage |
| openmetadata | OpenMetadata REST | Tags, classifications, lineage |
| unity | Databricks REST | Table parameters, column tags |
| glue | AWS Glue HTTP (no boto3 dep) | Table parameters, column comments |
| snowflake_horizon | Snowflake SQL | Object tags, classifications |
| datamesh_manager | Data Mesh Manager REST | ODPS-Bitol entries, ownership |

Out-of-tree registrars implement the CatalogRegistrar Protocol from fluid_build.api.catalog.

Concurrency + state

concurrency:
  lock: { onContended: queue }     # queue | fail

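Forge holds a single-flight lock per (product, build, env) triple. If a run is already in flight, onContended: queue waits for the lock, while onContended: fail raises LockHeldError immediately. This is useful in CI to keep two GitHub Actions runs from clobbering each other, provided both runs see the same state store; with the default file-backed store, that means sharing the same state directory.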

The state store backing locks, watermarks, and run records is FileStateStore by default (under .fluid/runs/). Out-of-tree state stores (Redis, Postgres) implement the StateStore Protocol from fluid_build.api.state.
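A file-based single-flight lock keyed by the (product, build, env) triple can be sketched with an exclusively created lock file. This is not FileStateStore's actual code: the lock-file naming, the polling interval, and the timeout applied to queue are assumptions, and a real store also has to clear locks left behind by crashed runs, which this sketch does not do.

```python
import os
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path


class LockHeldError(Exception):
    """Another run holds the lock and onContended is 'fail'."""


@contextmanager
def single_flight(state_dir, product, build, env, on_contended="queue",
                  poll_seconds=0.5, timeout_seconds=600.0):
    """Hold an exclusive lock for one (product, build, env) triple."""
    path = Path(state_dir) / f"{product}.{build}.{env}.lock"
    path.parent.mkdir(parents=True, exist_ok=True)
    deadline = time.monotonic() + timeout_seconds
    while True:
        try:
            # O_EXCL makes creation atomic: only one process can succeed.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if on_contended == "fail":
                raise LockHeldError(f"run in flight for {(product, build, env)}") from None
            if time.monotonic() >= deadline:
                raise TimeoutError(f"gave up waiting for {path}") from None
            time.sleep(poll_seconds)  # 'queue': wait for the holder to finish
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(str(os.getpid()))  # record the holder for debugging
        yield path
    finally:
        path.unlink(missing_ok=True)  # release only a lock this run acquired


# Usage: a second run for the same triple fails fast while the first holds the lock.
with tempfile.TemporaryDirectory() as state_dir:
    with single_flight(state_dir, "customer-orders-bronze", "ingest_orders", "dev"):
        try:
            with single_flight(state_dir, "customer-orders-bronze", "ingest_orders", "dev",
                               on_contended="fail"):
                pass
        except LockHeldError as err:
            print(err)
```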

Top-level retention

The 0.7.3 contract gains a top-level retention: block that controls how long Forge keeps run records, logs, lineage events, and DLQ entries:

retention:
  runState: 90d
  runLogs: 30d
  lineage: 365d
  dlq: 14d

Sweep with fluid retention sweep. The sweeper deletes any record older than its respective horizon and emits a structured summary.
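The sweep rule, delete anything older than its horizon, can be sketched as a pure function over record metadata. The record shape (a kind matching a retention key plus a timezone-aware created_at) is hypothetical, only the day-suffixed horizons shown above are parsed, and the returned per-kind counts merely stand in for the sweeper's structured summary.

```python
import re
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_horizon(text: str) -> timedelta:
    """Parse a horizon such as '90d'. Only the day unit shown above is handled."""
    match = re.fullmatch(r"(\d+)d", text.strip())
    if match is None:
        raise ValueError(f"unsupported retention horizon: {text!r}")
    return timedelta(days=int(match.group(1)))


def sweep(records: list[dict], retention: dict[str, str],
          now: Optional[datetime] = None) -> tuple[list[dict], dict[str, int]]:
    """Split records into kept ones and a per-kind count of deleted ones.

    Kinds without a configured horizon are always kept.
    """
    now = now or datetime.now(timezone.utc)
    horizons = {kind: parse_horizon(value) for kind, value in retention.items()}
    kept, deleted = [], Counter()
    for record in records:
        horizon = horizons.get(record["kind"])
        if horizon is not None and now - record["created_at"] > horizon:
            deleted[record["kind"]] += 1
        else:
            kept.append(record)
    return kept, dict(deleted)


retention = {"runState": "90d", "runLogs": "30d", "lineage": "365d", "dlq": "14d"}
```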

Authoring an acquisition contract

You don't have to write the YAML by hand. The flagship onboarding path for source-aligned ingestion is fluid init --discover:

fluid init --discover postgres://user:pass@host:5432/dbname
fluid init --discover mysql://user:pass@host:3306/dbname
fluid init --discover file:///path/to/csv-tree

Discover introspects the source: it lists the tables on Postgres (what psql's \dt shows), runs SHOW TABLES on MySQL, and walks the directory tree for filesystem sources. It then emits a deterministic, valid 0.7.3 SDP contract per discovered stream, with secrets auto-redacted to ${ENV_VAR} placeholders.
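Redacting a connection URL can be as small as swapping the inline password for a placeholder. The sketch below is hypothetical: redact_password and the PG_PASSWORD variable name are illustrative, and it only handles passwords embedded in the URL's user-info part (the contract example at the top of this page instead parameterizes the whole URL as ${POSTGRES_URL}).

```python
from urllib.parse import urlsplit, urlunsplit


def redact_password(url: str, env_var: str) -> str:
    """Replace an inline password with a ${ENV_VAR} placeholder; keep the rest."""
    parts = urlsplit(url)
    if parts.password is None:
        return url  # nothing secret inline (e.g. file:// sources)
    userinfo, _, hostport = parts.netloc.rpartition("@")
    user = userinfo.split(":", 1)[0]
    return urlunsplit(parts._replace(netloc=f"{user}:${{{env_var}}}@{hostport}"))


print(redact_password("postgres://user:pass@host:5432/dbname", "PG_PASSWORD"))
# postgres://user:${PG_PASSWORD}@host:5432/dbname
```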

For migrating existing tooling, fluid import converts Meltano projects, Airbyte workspaces, dlt pipelines, or Singer tap configs into FLUID acquisition contracts.

See also

  • Product Types — SDP, ADP, CDP — the vocabulary that gates composition
  • Postgres → DuckDB walkthrough — end-to-end worked example
  • fluid init --discover — flagship onboarding for source-aligned ingestion
  • fluid import — Meltano / Airbyte / dlt / Singer importers
  • fluid runs, fluid retention, fluid secrets — day-2 ops for acquisition pipelines
  • API Stability — fluid_build.api v1.0 surface for out-of-tree runners and registrars
  • Typed CLI Errors — error catalog (CapabilityMismatchError, SchemaDriftError, BudgetExceededError, etc.)
Last Updated: 5/17/26, 6:51 PM
Contributors: fas89, Claude Opus 4.7 (1M context)