Fluid Forge
Source-Aligned: Postgres → DuckDB → Parquet

A minimal end-to-end walkthrough of a source-aligned Bronze (SDP) data product. We'll start a local Postgres with seeded data, run fluid validate and fluid apply against the included contract, and verify the output Parquet file. No Airbyte cluster, no Airflow, no cloud setup — DuckDB does the ingestion in-process.

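The 60-second demo reel for this page runs a condensed version of this flow: fluid init --discover postgres://… → fluid validate --probe → fluid apply → fluid runs status. The steps below start from the committed contract rather than generating it with fluid init --discover.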

Where this walkthrough lives

The exact contract, docker-compose, seed SQL, Makefile, and verification script for this walkthrough live in the forge-cli repo at examples/source-aligned-postgres-duckdb/. Schema 0.7.3 is delivered in the upcoming 0.7.3 release — install the 0.7.3 prerelease (or wait for the stable cut) to follow along.

What you'll build

A single contract that:

  1. Reads public.orders from a local Postgres
  2. Lands the rows as out/orders.parquet
  3. Runs the dlp_scan and quality_gate pre-land hooks
  4. Persists a run record under .fluid/runs/<product>/<build>/runs/<run-id>.json

Total wall time on the included fixture: under 3 seconds.

Prerequisites

  • Docker with Compose (for the Postgres container; the steps use docker compose)
  • make (for the Makefile shortcuts)
  • Python (to run verify.py)
  • Fluid Forge 0.7.3 (the upcoming release, which ships schema 0.7.3; install the prerelease from PyPI or wait for the stable cut)

The contract

fluidVersion: "0.7.3"
kind: DataProduct
id: bronze.crm_orders
name: CRM Orders Bronze
description: |
  Source-aligned Bronze data product. Acquires raw orders from a Postgres
  source via DuckDB's postgres_scan and lands them as Parquet.
domain: sales

metadata:
  layer: Bronze
  productType: SDP        # Bronze ↔ SDP — both shown for clarity
  owner:
    team: data-platform
    email: data-platform@co.example
  classification: confidential
  experimental: [acquisition]

retention:
  runState: P30D
  runLogs: P90D
  lineage: P365D
  dlq: P180D

builds:
  - id: ingest_orders
    description: Full-refresh copy of public.orders from Postgres.
    pattern: acquisition
    engine: duckdb
    capabilities:
      - full_refresh
      - schema_discovery
    properties:
      source:
        kind: postgres
        connection:
          host: "{{ env.PGHOST }}"
          port: "{{ env.PGPORT }}"
          database: "{{ env.PGDATABASE }}"
          user: "{{ env.PGUSER }}"
          password: "{{ env.PGPASSWORD }}"
        mode: full_refresh
        streams:
          - public.orders
      sink:
        format: parquet
      preLand:
        - dlp_scan
        - quality_gate
      quality:
        gates:
          - rule: not_null
            columns: [id]
            severity: error
        onError: route_to_dlq
      lineage:
        emit: true
    execution:
      trigger:
        type: schedule
        schedule: "0 */4 * * *"
    outputs:
      - orders_raw

exposes:
  - exposeId: orders_raw
    kind: table
    binding:
      platform: local
      format: parquet
      location:
        path: ./out/orders.parquet
    contract:
      schema: []
      schemaPolicy: discover_and_freeze
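The quality block above declares one gate: id must be non-null, with severity: error and onError: route_to_dlq. A minimal sketch of that check in Python, assuming the staged rows are available as dicts and that route_to_dlq sets failing rows aside one by one rather than failing the whole batch. apply_not_null_gate is a hypothetical name; this is not Forge's quality_gate hook API.

```python
from typing import Any

Row = dict[str, Any]


def apply_not_null_gate(rows: list[Row], columns: list[str]) -> tuple[list[Row], list[Row]]:
    """Hypothetical not_null gate: split rows into (passing, dlq).

    A row fails if any listed column is missing or None. Assumes
    onError: route_to_dlq means failing rows are set aside per row
    instead of aborting the load.
    """
    passing: list[Row] = []
    dlq: list[Row] = []
    for row in rows:
        if all(row.get(col) is not None for col in columns):
            passing.append(row)
        else:
            dlq.append(row)
    return passing, dlq


if __name__ == "__main__":
    staged = [
        {"id": 1, "amount": 10.0},
        {"id": None, "amount": 5.0},
        {"id": 3, "amount": 7.5},
    ]
    ok, dead = apply_not_null_gate(staged, ["id"])
    print(len(ok), len(dead))  # prints: 2 1
```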

A few things worth noting:

  • Both metadata.layer and metadata.productType are set. Either one alone would also validate. Bronze ↔ SDP is the canonical pairing — see Product Types for the full mapping.
  • retention: is at the top level, not inside the build. It governs how long Forge keeps run records, logs, lineage events, and DLQ entries — sweep with fluid retention sweep.
  • {{ env.PGHOST }}-style placeholders resolve from environment variables at apply time, so no credentials are stored in the contract and it is safe to commit.
  • pattern: acquisition + engine: duckdb triggers the embedded DuckDB runner — no external service needed.
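The placeholder behaviour can be illustrated with a small resolver. This is a sketch under assumptions, not Forge's implementation: it supports only the {{ env.NAME }} form used in this contract, walks nested dicts and lists, and raises KeyError for an unset variable so a missing credential fails loudly. resolve_env is a hypothetical name.

```python
import os
import re
from collections.abc import Mapping
from typing import Any

# Matches "{{ env.NAME }}" with optional whitespace inside the braces.
_ENV_PLACEHOLDER = re.compile(r"\{\{\s*env\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_env(value: Any, environ: Mapping[str, str] = os.environ) -> Any:
    """Hypothetical resolver: replace {{ env.NAME }} in strings, dicts and lists.

    Raises KeyError if a referenced variable is unset.
    """
    if isinstance(value, str):
        return _ENV_PLACEHOLDER.sub(lambda m: environ[m.group(1)], value)
    if isinstance(value, dict):
        return {k: resolve_env(v, environ) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env(v, environ) for v in value]
    return value  # numbers, booleans, None pass through unchanged


if __name__ == "__main__":
    connection = {"host": "{{ env.PGHOST }}", "port": "{{ env.PGPORT }}"}
    print(resolve_env(connection, {"PGHOST": "localhost", "PGPORT": "5432"}))
    # prints: {'host': 'localhost', 'port': '5432'}
```

Note that the resolved port is still the string "5432"; this sketch does not coerce types.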

Run it end-to-end

The Makefile shipped with the example wraps the steps:

cd forge-cli/examples/source-aligned-postgres-duckdb
make all

make all runs:

make up        # docker compose up: Postgres with seeded public.orders
make run       # fluid validate → fluid apply
make verify    # python verify.py: row-count + schema assertions

If you'd rather run the steps by hand:

# 1. Bring up Postgres (port 5432) with seeded fixture data
docker compose up -d

# Set the env vars the contract reads
export PGHOST=localhost PGPORT=5432
export PGDATABASE=acme PGUSER=acme PGPASSWORD=acme

# 2. Validate the contract
fluid validate contract.fluid.yaml

# 3. Apply (acquires from Postgres, writes Parquet)
fluid apply --build ingest_orders contract.fluid.yaml

# 4. Verify the output
python verify.py

Expected validate output:

Validating 1 product in workspace 'CRM Orders Bronze'...
  ✅ bronze.crm_orders     no errors

Result: 1 passed, 0 failed

Expected apply output:

▸ Materializing bronze.crm_orders → ingest_orders ...
  ▸ acquired schema   public.orders (5 columns, 8 rows)
  ▸ ran preLand hooks dlp_scan ✓ · quality_gate ✓
  ▸ wrote Parquet     ./out/orders.parquet (1 file, 1.2 KB)
  ▸ persisted run     .fluid/runs/bronze.crm_orders/ingest_orders/runs/2026-04-30T...json
  ✓ 1 build applied · 0 errors
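The persisted run path follows the layout .fluid/runs/<product>/<build>/runs/<run-id>.json. A small sketch for locating the newest record from a script, assuming run IDs begin with an ISO-8601 timestamp (as in the example output), so sorting by file name is also chronological. latest_run_record is hypothetical; fluid runs status remains the supported way to inspect runs, and the record's JSON fields are not documented on this page, so inspect one before depending on specific keys.

```python
from pathlib import Path
from typing import Optional


def latest_run_record(root: Path, product: str, build: str) -> Optional[Path]:
    """Hypothetical helper: newest run record for product/build, or None.

    Assumes run IDs start with an ISO-8601 timestamp, so lexicographic
    order of file names matches chronological order.
    """
    runs_dir = root / ".fluid" / "runs" / product / build / "runs"
    records = sorted(runs_dir.glob("*.json"))  # empty if the directory is missing
    return records[-1] if records else None


if __name__ == "__main__":
    newest = latest_run_record(Path("."), "bronze.crm_orders", "ingest_orders")
    print(newest)
```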

What just happened

| Stage | What ran | Where it's wired |
| --- | --- | --- |
| Validation | JSON-schema check against fluid-schema-0.7.3.json + Bronze↔SDP consistency | fluid validate |
| Plan | The acquisition pattern compiles to one runner: duckdb action | Internal; the DuckDB runner picks this up |
| Lock | Single-flight lock acquired on (bronze.crm_orders, ingest_orders, default) | _state.FileStateStore |
| Source read | DuckDB loads the postgres extension and runs SELECT * FROM postgres_scan(...) | DuckDB runner under build_runners/duckdb/ |
| Pre-land hooks | dlp_scan then quality_gate run on the in-memory result before write | build_runners/hooks/{dlp_scan,quality_gate}.py |
| Write | COPY (...) TO 'out/orders.parquet' (FORMAT 'parquet') | DuckDB runner |
| Run record | Structured JSON written under .fluid/runs/... | _state.FileStateStore |
| Lineage | OpenLineage RunEvent emitted (null emitter by default for local dev) | _lineage.py |
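The Source read and Write stages can be made concrete by building the SQL a DuckDB session would run. This sketch only composes statements. It assumes the contract's connection keys map onto libpq keyword=value pairs (database becomes dbname), stages rows in a temp table so pre-land checks could run between read and write, and uses hypothetical function names; Forge's DuckDB runner may structure this differently. postgres_scan(connection_string, schema, table) and COPY ... (FORMAT 'parquet') are standard DuckDB features.

```python
def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_duckdb_acquisition(
    connection: dict[str, str], stream: str, out_path: str
) -> tuple[list[str], str]:
    """Hypothetical builder: (read_statements, write_statement) for Postgres -> Parquet.

    Values are assumed to contain no spaces or quotes; libpq's keyword=value
    format would need them quoted, which this sketch omits.
    """
    schema, table = stream.split(".", 1)  # "public.orders" -> ("public", "orders")
    libpq_keyword = {"database": "dbname"}  # contract key -> libpq keyword
    dsn = " ".join(f"{libpq_keyword.get(k, k)}={v}" for k, v in connection.items())
    scan = f"postgres_scan({sql_literal(dsn)}, {sql_literal(schema)}, {sql_literal(table)})"
    read = [
        "INSTALL postgres",
        "LOAD postgres",
        # Stage rows in a temp table so pre-land checks can inspect them before the write.
        f"CREATE TEMP TABLE staged AS SELECT * FROM {scan}",
    ]
    write = f"COPY staged TO {sql_literal(out_path)} (FORMAT 'parquet')"
    return read, write
```

With the duckdb Python package installed, you could run the read statements through con = duckdb.connect() and con.execute(...), inspect staged, then execute the write statement.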

Day-2 — what to do after first apply

The acquisition layer is fully integrated with the day-2 ops commands. Once you have a successful run:

# Recent runs for this product
fluid runs status bronze.crm_orders --last 5

# Full logs from the most recent run
fluid runs logs bronze.crm_orders --component build

# Compare two runs (schema + row-count delta)
fluid runs diff bronze.crm_orders \
  --build ingest_orders \
  --run-a <run-id-1> --run-b <run-id-2>

# Sweep retention horizons
fluid retention sweep

See fluid runs, fluid retention, and fluid secrets for the full operator reference.
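The top-level retention block uses ISO 8601 durations (P30D means 30 days). Conceptually, a sweep removes artifacts older than their horizon. A minimal sketch of that age comparison for the day-only PnD form used in this contract; it assumes each artifact has a creation timestamp and is not how fluid retention sweep is implemented. parse_days and expired are hypothetical names.

```python
import re
from datetime import datetime, timedelta

_DAYS = re.compile(r"P(\d+)D")


def parse_days(duration: str) -> timedelta:
    """Hypothetical parser for day-only ISO 8601 durations such as "P30D".

    Other forms (P1M, PT12H, ...) raise ValueError.
    """
    match = _DAYS.fullmatch(duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration!r}")
    return timedelta(days=int(match.group(1)))


def expired(created_at: list[datetime], horizon: str, now: datetime) -> list[datetime]:
    """Hypothetical check: timestamps older than the retention horizon."""
    cutoff = now - parse_days(horizon)
    return [ts for ts in created_at if ts < cutoff]


# Horizons copied from this walkthrough's contract.
RETENTION = {"runState": "P30D", "runLogs": "P90D", "lineage": "P365D", "dlq": "P180D"}
```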

Why this matters

This is about the smallest source-aligned data product you can write. Keeping the same contract shape, you can:

  • Swap engine: duckdb for engine: dlt to use a Python-native dlt source — no contract changes besides the engine block
  • Swap engine: duckdb for engine: airbyte and add an imageSignature.cosign block to require Cosign-verified Airbyte connector images
  • Swap engine: duckdb for engine: debezium for CDC instead of full-refresh (mode: then takes one of initial, schema_only, never, when_needed, or always)
  • Switch deployment.mode from embedded (the in-process runner used here) to managed with platform: kubernetes to have Forge generate Helm + ExternalSecret + NetworkPolicy for the engine

The contract shape stays portable across all six acquisition engines; see Source-Aligned Acquisition for the full engine matrix (including the two not listed above) and the deployment mode options.

See also

  • Product Types — SDP, ADP, CDP — the vocabulary used in this contract
  • Source-Aligned Acquisition — the framework reference
  • fluid init --discover — auto-generate this contract by introspecting the source
  • fluid runs, fluid retention, fluid secrets — day-2 ops
Last Updated: 5/17/26, 6:51 PM
Contributors: fas89, Claude Opus 4.7 (1M context)