Fluid Forge
Agentic Primitives

Forge-cli's staged pipeline is built from a small set of reusable primitives that any new agent or external observer can compose. This page is the operator + contributor reference for each one.

Audience: Contributors building a new staged agent, operators debugging unexpected pipeline behavior, integrators wiring an external observability dashboard.

The seven primitives

| Primitive | Module | Purpose |
| --- | --- | --- |
| StageSession | fluid_build.copilot.agents.base | Per-run shared state passed to every agent |
| Scratchpad | fluid_build.copilot.scratchpad | Typed inter-agent shared state (critic findings, RAG retrievals, stage feedback) |
| EventBus | fluid_build.copilot.events | In-process pub/sub for run-level signals |
| RunCostTracker | fluid_build.copilot.cost | Token / cost / per-agent attribution |
| CriticAgent | fluid_build.copilot.agents.critic_agent | Heuristic reviewer between staged outputs |
| ConformanceAgent | fluid_build.copilot.agents.conformance_agent | Pre-emit lint against Fluid + OSI + ODCS / DCS readiness |
| retrieve_similar_models | fluid_build.copilot.retrieval | RAG helper against memory/semantic |

StageSession

The session object every agent receives. Carries the store, LLM config, capability matrix, scratchpad, and metadata flags (fallback_used, repair_used, tiered, no_cache).

from fluid_build.copilot.agents.base import StageSession
from fluid_build.copilot.store.backends.file import FileBackend

session = StageSession(
    store=FileBackend(root="~/.fluid/store", workspace_root="."),
    llm_config=...,
    active_provider="anthropic",
)
# Lazy scratchpad accessor — never construct directly.
scratchpad = session.get_scratchpad()

Key invariants:

  • active_provider is set automatically from llm_config.provider on construction. The coordinator asserts every staged call's resolved provider matches active_provider — single-provider-per-run.
  • capability_matrix is a free-form dict that flows through the cache key. Flipping any flag (extended thinking, RAG limit, prompt-cache mode) invalidates the cache cleanly.
  • record_fallback() / record_repair() capture pipeline events for the audit trail without coupling agents to the recorder.
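
The cache-key invariant above can be pictured with a small sketch. This is not forge-cli's actual key derivation; `cache_key` is a hypothetical helper and the flag names are illustrative. It only shows why hashing the whole capability matrix makes any flag flip produce a different key.

```python
import hashlib
import json
from typing import Any


def cache_key(prompt: str, capability_matrix: dict[str, Any]) -> str:
    """Hypothetical helper: hash the prompt together with every capability flag."""
    # sort_keys makes the key independent of dict insertion order.
    canonical = json.dumps(
        {"prompt": prompt, "capabilities": capability_matrix},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


prompt = "model the orders domain"
base = cache_key(prompt, {"extended_thinking": False, "rag_limit": 3})
same = cache_key(prompt, {"rag_limit": 3, "extended_thinking": False})
flipped = cache_key(prompt, {"extended_thinking": True, "rag_limit": 3})
```

Here `base` and `same` are identical because only the key order differs, while `flipped` is a different digest, so a cached result for the old flag set can never be served for the new one.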

Scratchpad

Typed shared state for inter-agent signal passing. The agentic backbone for v1.5+. Lives on session.scratchpad; lazy-created on first session.get_scratchpad() call.

Slots

| Slot | Type | Used by |
| --- | --- | --- |
| critic_findings | list[CriticFinding] | CriticAgent writes; modeler / builder read |
| retrievals | list[RetrievalResult] | retrieve_similar_models writes; modeler reads |
| feedback | list[StageFeedback] | Coordinator validator writes; rerun path reads |
| raw | dict[str, Any] | Free-form ad-hoc slots |

Reading

from fluid_build.copilot.scratchpad import CriticFinding, StageFeedback

# Critic findings addressed to a specific stage:
findings = scratchpad.critic_findings_for_stage("logical")
errors = [f for f in findings if f.severity == "error"]

# Validator feedback for the next builder run:
feedback = scratchpad.feedback_for_stage("builder")

# Top-3 retrievals from memory/semantic (sorted by similarity):
top = scratchpad.top_retrievals(limit=3, namespace="memory/semantic")

Writing

scratchpad.add_critic_finding(CriticFinding(
    stage="logical",
    severity="warning",
    message="hub_customer has no business_key_columns",
    suggestion="set business_key_columns to ['customer_id']",
    target="dv2.hubs.hub_customer.business_key_columns",
))

scratchpad.add_feedback(StageFeedback(
    source_stage="validator",
    target_stage="builder",
    summary="3 errors in metadata; retry with corrections",
    structured={"errors": ["domain missing", ...]},
))

Thread safety

add_* calls take an internal lock so parallel-physical fanout (three threads writing concurrently) doesn't drop events. Reads return defensive snapshots — caller mutations don't pollute the scratchpad.
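
The locking and snapshot behaviour described above can be sketched in a few lines. `MiniScratchpad` is an illustrative stand-in, not the real Scratchpad class; it assumes a single lock around one list and deep-copied reads.

```python
import copy
import threading


class MiniScratchpad:
    """Illustrative stand-in, not the real Scratchpad class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._findings: list[dict] = []

    def add_finding(self, finding: dict) -> None:
        # Writers serialize on the lock, so concurrent appends are not lost.
        with self._lock:
            self._findings.append(finding)

    def findings(self) -> list[dict]:
        # Readers get a deep copy; mutating it cannot touch internal state.
        with self._lock:
            return copy.deepcopy(self._findings)


pad = MiniScratchpad()


def worker(name: str) -> None:
    for i in range(100):
        pad.add_finding({"stage": name, "n": i})


# Three writer threads, mirroring the parallel-physical fanout.
threads = [threading.Thread(target=worker, args=(f"physical-{k}",)) for k in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

snapshot = pad.findings()
snapshot.clear()  # caller mutation stays local to the snapshot
```

After the threads join, the pad still holds all 300 findings even though the caller emptied its own snapshot.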

EventBus

Process-wide pub/sub for run-level signals. Decomposes the previously god-class RunCostTracker into a publisher and N subscribers; new observers (telemetry exporters, audit writers, operator dashboards) subscribe without touching the tracker's internals.

Event types emitted today

| Event | Payload | Emitter |
| --- | --- | --- |
| llm.call_completed | provider, model, input_tokens, output_tokens, stage, agent_class, missing_usage | RunCostTracker.record_call |
| llm.usage_missing | {} | RunCostTracker.record_missing_usage |
| validator.variant_lint | variant, warning_count | RunCostTracker.record_variant_lint |
| catalog.fetch_completed | catalog_name, duration_ms | RunCostTracker.record_catalog_fetch |

Subscribing

from fluid_build.copilot.events import Event, get_event_bus

bus = get_event_bus()

def my_observer(event: Event) -> None:
    if event.event_type == "llm.call_completed":
        print(f"{event.payload['stage']} cost: "
              f"{event.payload['input_tokens']} + "
              f"{event.payload['output_tokens']} tokens")

unsubscribe = bus.subscribe(my_observer)
# ... when done:
unsubscribe()

Failure isolation

A subscriber that raises is logged at DEBUG and otherwise ignored — one bad observer can never break the rest of the pipeline.
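
A minimal sketch of that isolation, assuming a simplified bus where subscribers receive `(event_type, payload)` rather than the real `Event` object. `MiniBus` is hypothetical and only illustrates the try/except around each subscriber call.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("mini_bus")
Subscriber = Callable[[str, dict[str, Any]], None]


class MiniBus:
    """Illustrative stand-in, not the real EventBus."""

    def __init__(self) -> None:
        self._subscribers: list[Subscriber] = []

    def subscribe(self, fn: Subscriber) -> Callable[[], None]:
        self._subscribers.append(fn)

        def unsubscribe() -> None:
            if fn in self._subscribers:
                self._subscribers.remove(fn)

        return unsubscribe

    def publish(self, event_type: str, payload: dict[str, Any]) -> None:
        # Iterate over a copy so subscribers may unsubscribe mid-publish.
        for fn in list(self._subscribers):
            try:
                fn(event_type, payload)
            except Exception:
                # Log at DEBUG and move on to the next subscriber.
                logger.debug("subscriber %r failed", fn, exc_info=True)


bus = MiniBus()
seen: list[str] = []


def broken(event_type: str, payload: dict[str, Any]) -> None:
    raise RuntimeError("observer bug")


def healthy(event_type: str, payload: dict[str, Any]) -> None:
    seen.append(event_type)


bus.subscribe(broken)
unsubscribe = bus.subscribe(healthy)
bus.publish("llm.call_completed", {"stage": "logical"})
unsubscribe()
bus.publish("llm.usage_missing", {})
```

The healthy observer still receives the first event even though the broken one raised before it, and receives nothing after unsubscribing.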

RunCostTracker

Per-run token / cost accumulator. Module-level singleton because it has to be writable from threads (parallel-physical fanout) without threading a context object through the entire stack.

Five state dimensions

| Dimension | Surfaced via |
| --- | --- |
| Per-(provider, model) token counts | breakdown.rows |
| Per-(stage, agent_class) attribution | breakdown.agent_rows (Missing #5) |
| missing_usage_calls counter | Footer warning |
| variant_lint_findings per variant | Footer warning |
| catalog_fetch_ms per catalog | Footer line |

Cost ceiling

Set a per-run budget cap:

export FLUID_COST_LIMIT_USD=5.00
fluid forge data-model from-source --source snowflake --credential-id snowflake-prod ...

Or set it in ~/.fluid/config.yaml:

behavior:
  cost_limit_usd_per_run: 5.00

When the running total exceeds the limit, the forge aborts with CostLimitExceeded($X.XX > $Y.YY). This stops a runaway agentic run from blowing through your budget.
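
The check itself can be sketched as a running total compared against the cap after each call. `MiniCostGuard` and the local `CostLimitExceeded` class are stand-ins for illustration, and the sketch assumes prices are quoted in USD per one million tokens, which this page does not state.

```python
class CostLimitExceeded(RuntimeError):
    """Local stand-in for the error named above."""


class MiniCostGuard:
    """Illustrative accumulator, not the real RunCostTracker."""

    def __init__(self, limit_usd: float) -> None:
        self.limit_usd = limit_usd
        self.total_usd = 0.0

    def record(self, input_tokens: int, output_tokens: int,
               price_in: float, price_out: float) -> None:
        # ASSUMPTION: prices are USD per one million tokens.
        cost = (input_tokens * price_in + output_tokens * price_out) / 1_000_000
        self.total_usd += cost
        if self.total_usd > self.limit_usd:
            raise CostLimitExceeded(
                f"${self.total_usd:.2f} > ${self.limit_usd:.2f}"
            )


guard = MiniCostGuard(limit_usd=5.00)
guard.record(1_000_000, 100_000, 2.40, 12.00)  # under the cap, no error

aborted = False
message = ""
try:
    guard.record(1_000_000, 100_000, 2.40, 12.00)  # pushes the total past the cap
except CostLimitExceeded as exc:
    aborted = True
    message = str(exc)
```

Checking after every recorded call means the run stops at the first call that crosses the cap rather than at the end of the run.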

Token-budget pre-flight & compaction

Before every staged LLM call, the agent layer counts the system + user prompt against the model's context window and refuses prompts that won't fit. On the legacy path, an oversized prompt earned a 4xx from the provider followed by a useless retry storm. Pre-flight raises ContextOverflowError instead; the typed-error retry envelope treats it as non-retryable, so it surfaces immediately.

The token counter is a pure-Python len(text) / 3.5 heuristic. It slightly over-estimates, so the bias is "fail fast" rather than "bill the user for a doomed call". The CLI does NOT ship a Rust-extension tokenizer: modern context windows are 128K–1M tokens, and the heuristic's ~10–20% error is too small to change the fit-or-reject decision.
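
As a rough sketch of the pre-flight check, under the assumption that the estimate is rounded up and that system and user prompts are counted separately and summed (neither detail is confirmed here). `estimate_tokens`, `preflight`, and the local `ContextOverflowError` class are hypothetical names for illustration.

```python
import math


class ContextOverflowError(ValueError):
    """Local stand-in for the error named above."""


def estimate_tokens(text: str) -> int:
    # len(text) / 3.5, rounded up (the rounding is an assumption).
    return math.ceil(len(text) / 3.5)


def preflight(system_prompt: str, user_prompt: str, context_window: int) -> int:
    """Return the estimated token count, or raise if it cannot fit."""
    needed = estimate_tokens(system_prompt) + estimate_tokens(user_prompt)
    if needed > context_window:
        raise ContextOverflowError(
            f"estimated {needed} tokens > context window {context_window}"
        )
    return needed


small = preflight("You are a modeler.", "Model the orders domain.", context_window=128_000)

overflowed = False
try:
    preflight("x" * 700, "", context_window=100)
except ContextOverflowError:
    overflowed = True
```

Because the check is pure arithmetic on string lengths, it costs nothing to run before every call and needs no tokenizer dependency.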

When the multi-turn agent loop runs longer than FLUID_AGENT_COMPACT_AFTER iterations (default 6), middle messages get compacted to keep total tokens under the budget. Three strategies:

| Strategy | Set via | Behaviour |
| --- | --- | --- |
| truncate (default) | FLUID_COMPACTION_STRATEGY=truncate (or unset) | Char-aware truncation that preserves head + last-N tail messages, clips middle text content, and shrinks tool_use argument blobs into a structured _truncated/_preview/_total_chars marker. Tool names stay visible so the LLM still knows what it called. |
| summarize | FLUID_COMPACTION_STRATEGY=summarize | LLM-backed compression. Calls your provider's fast tier (Haiku for Anthropic, gpt-4.1-nano for OpenAI, gemini-2.5-flash for Gemini) once per compaction trigger via LlmProvider.build_request + httpx. No new SDK dependencies: it reuses the same HTTP shape, headers, and credential resolution as the main run. |
| hybrid | FLUID_COMPACTION_STRATEGY=hybrid | Truncate first, then summarize the truncated middle if a summarizer is configured. Useful for very long agent loops where naive truncation alone discards too much context. |
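
To make the truncate strategy concrete, here is a simplified sketch. The message shape (`role`, `content`, `tool_name`, `arguments`), the function name `truncate_middle`, and the default limits are all assumptions for illustration; only the head/tail preservation and the `_truncated`/`_preview`/`_total_chars` marker come from the table above.

```python
from typing import Any


def truncate_middle(messages: list[dict[str, Any]],
                    keep_tail: int = 2,
                    max_chars: int = 40) -> list[dict[str, Any]]:
    """Keep the first message and the last keep_tail intact; clip the middle."""
    tail_start = max(1, len(messages) - keep_tail)
    compacted = []
    for i, msg in enumerate(messages):
        new = dict(msg)  # shallow copy so the input list is left untouched
        if 0 < i < tail_start:
            text = new.get("content", "")
            if len(text) > max_chars:
                new["content"] = text[:max_chars]
            args = new.get("arguments")
            if isinstance(args, str) and len(args) > max_chars:
                # Replace the blob with a marker; tool_name stays visible.
                new["arguments"] = {
                    "_truncated": True,
                    "_preview": args[:max_chars],
                    "_total_chars": len(args),
                }
        compacted.append(new)
    return compacted


history = [
    {"role": "user", "content": "Build a data vault for orders."},
    {"role": "assistant", "content": "", "tool_name": "list_tables", "arguments": "x" * 500},
    {"role": "tool", "content": "row " * 200},
    {"role": "assistant", "content": "Found 12 tables."},
    {"role": "user", "content": "Continue."},
]
compacted = truncate_middle(history)
```

The first message and the last two pass through unchanged, while the two middle messages shrink to a 40-character preview and a marker that still records the original size.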

The summarizer is wired into run_copilot_agent_loop automatically when you enable it; no extra setup is needed. Worst-case extra cost is one fast-tier call per long agent loop. On synthetic transcripts, roughly 5 messages of tool-call/result bytes compress into a 3–5-sentence English summary that preserves the user's goal, the tools called, and the agent's current decision state.

The compaction layer is provider-agnostic — you can plug your own Callable[[str], str] summarizer if the default fast-tier route isn't what you want.
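
Any function with the Callable[[str], str] shape qualifies. The sketch below is a deliberately naive, LLM-free example; `first_and_last_lines` is a hypothetical name, and how a summarizer gets registered with the compaction layer is not covered on this page, so that step is left out.

```python
from typing import Callable

Summarizer = Callable[[str], str]


def first_and_last_lines(transcript: str) -> str:
    """Naive summarizer: keep the opening and closing lines, count the rest."""
    lines = [ln.strip() for ln in transcript.splitlines() if ln.strip()]
    if len(lines) <= 2:
        return " ".join(lines)
    omitted = len(lines) - 2
    return f"{lines[0]} [... {omitted} lines omitted ...] {lines[-1]}"


summarizer: Summarizer = first_and_last_lines
summary = summarizer(
    "user: build a vault\n"
    "tool: list_tables\n"
    "tool: 12 tables\n"
    "assistant: drafting hubs"
)
```

A real summarizer would more likely call a model, but a deterministic one like this can be handy in tests where network calls are unwanted.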

CriticAgent

Heuristic (LLM-free) reviewer between staged outputs. Three review surfaces:

| Method | Reviews | Sample findings |
| --- | --- | --- |
| review_logical(logical, scratchpad=...) | DV2 hubs / links + dimensional facts / dimensions + conceptual orphans | "hub_X has no business_key_columns" / "fact has no measures" |
| review_contract(contract, scratchpad=...) | Fluid contract dict | "exposes is empty" / "metadata.domain missing" |
| review_transform(transform_plan, logical, scratchpad=...) | dbt build-graph cycles | "circular dependency: a → b → c → a" |

Findings land on scratchpad.critic_findings. Severity levels:

  • error — critical issue. Triggers the repair loop via _escalate_critic_errors_into_report.
  • warning — informational; surfaces in the receipt but doesn't block.
  • info — observational; helps operator triage.

Adding a new critic rule

# In your fork or a future PR to fluid_build/copilot/agents/critic_agent.py:

def review_logical(self, logical, *, scratchpad):
    findings = []
    # ... existing rules ...
    # NEW: warn when a hub's mapped_source_tables is empty.
    for hub in (getattr(logical.dv2, "hubs", []) or []):
        if not hub.mapped_source_tables:
            findings.append(CriticFinding(
                stage="logical",
                severity="warning",
                message=f"Hub {hub.entity_name!r} has no mapped_source_tables",
                target=f"dv2.hubs.{hub.entity_name}.mapped_source_tables",
            ))
    for f in findings:
        scratchpad.add_critic_finding(f)
    return findings

The coordinator's _run_logical_critic already calls review_logical after every Logical stage — your new rule fires automatically.
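
For a sense of what an LLM-free rule like the build-graph cycle check in review_transform involves, here is a generic depth-first-search sketch. It is not the CriticAgent's code; `find_cycle` and the adjacency-dict input are assumptions chosen for illustration.

```python
from typing import Optional


def find_cycle(graph: dict[str, list[str]]) -> Optional[list[str]]:
    """Return one cycle as [a, b, ..., a], or None if the graph is acyclic."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited / on current path / finished
    state = {node: WHITE for node in graph}
    path: list[str] = []

    def visit(node: str) -> Optional[list[str]]:
        state[node] = GREY
        path.append(node)
        for dep in graph.get(node, []):
            if state.get(dep, WHITE) == GREY:
                # dep is already on the current path: close the loop.
                return path[path.index(dep):] + [dep]
            if state.get(dep, WHITE) == WHITE:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        state[node] = BLACK
        return None

    for node in graph:
        if state[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


cycle = find_cycle({"a": ["b"], "b": ["c"], "c": ["a"]})
message = "circular dependency: " + " → ".join(cycle) if cycle else ""
clean = find_cycle({"a": ["b"], "b": []})
```

A rule built this way would wrap `message` in a CriticFinding with severity "error" so that it reaches the repair loop.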

ConformanceAgent

Pre-emit lint against four standards in parallel:

| Standard | Implementation |
| --- | --- |
| fluid | Full Fluid 0.7.2 schema validator |
| osi | Full OSI v0.1.1 Pydantic validator |
| odcs_translation_readiness | Checks the contract carries the fields a future ODCS exporter needs |
| dcs_translation_readiness | Same shape, for DCS |

from fluid_build.copilot.agents.conformance_agent import ConformanceAgent

agent = ConformanceAgent()
report = agent.run(logical=logical, contract=contract)
print(report.summary())
# "conformance: ✓ all 2 standards clean"

Dialect mapper integration

ConformanceAgent.apply_dialect_mapper(logical) runs the deterministic multi-dialect type mapper over OSI expression.dialects[] arrays and back-fills missing dialects. Defaults to OSI_SUPPORTED_DIALECTS so back-fill never produces values that fail OSI's Literal enum.

retrieve_similar_models (RAG helper)

Single canonical entry point for memory/semantic retrieval:

from fluid_build.copilot.retrieval import (
    retrieve_similar_models,
    RetrievalConfig,
)

results = retrieve_similar_models(
    "customer orders data vault",
    store=session.store,
    scratchpad=session.get_scratchpad(),
    config=RetrievalConfig(limit=3, mode="hybrid"),
)

Best-effort:

  • NullBackend returns empty.
  • Vector index errors return empty.
  • Empty query short-circuits to empty without hitting the store.

The modeler's private _retrieve_prior_similar_models delegates here so there's one canonical retrieval code path.
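
The best-effort contract listed above amounts to a thin wrapper around the store lookup. The sketch below is an assumption-laden illustration: `best_effort_search`, the `search` callable, and the `similarity` field are hypothetical, and treating a whitespace-only query as empty is a guess.

```python
from typing import Callable

SearchFn = Callable[[str, int], list[dict]]


def best_effort_search(query: str, search: SearchFn, limit: int = 3) -> list[dict]:
    """Return up to `limit` hits, or an empty list on any failure."""
    if not query.strip():
        return []  # short-circuit: the store is never touched
    try:
        hits = search(query, limit)
    except Exception:
        return []  # index errors degrade to "no retrievals"
    return sorted(hits, key=lambda h: h["similarity"], reverse=True)[:limit]


calls: list[str] = []


def fake_store(query: str, limit: int) -> list[dict]:
    calls.append(query)
    return [{"id": "m1", "similarity": 0.4}, {"id": "m2", "similarity": 0.9}]


def broken_index(query: str, limit: int) -> list[dict]:
    raise RuntimeError("vector index unavailable")


ranked = best_effort_search("customer orders data vault", fake_store)
empty_query = best_effort_search("   ", fake_store)
failed = best_effort_search("customer orders", broken_index)
```

Returning an empty list in every failure case lets the modeler treat "no prior models" and "retrieval unavailable" the same way, so a broken index never blocks a run.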

Unified config

Replace four scattered files with one ~/.fluid/config.yaml:

schema_version: 1
llm:
  provider: anthropic
  model: claude-sonnet-4-6
  tiered: false
sources:
  sources:
    snowflake-prod:
      catalog: snowflake
      auth_method: key_pair
      account: myorg-abc12345
prices:
  prices:
    claude-sonnet-4-6: [2.40, 12.00]
behavior:
  quiet: false
  deterministic: false
  cost_limit_usd_per_run: 5.00

Migrate from existing per-feature files:

fluid config migrate            # if shipped; otherwise:
python -c "from fluid_build.copilot.unified_config import migrate_legacy_to_unified; print(migrate_legacy_to_unified())"

Per-feature legacy files (ai_config.json, sources.yaml, prices.json) continue to be read as fallback so v1.5 installs work unchanged.

Composing them: the staged pipeline

┌─ StageSession (per-run shared state)
│   ├─ store: Store backend
│   ├─ llm_config + active_provider
│   ├─ scratchpad: Scratchpad ───── ← typed inter-agent state
│   └─ capability_matrix
│
├─ EventBus (process-wide pub/sub)
│   ├─ llm.call_completed         ← RunCostTracker
│   ├─ catalog.fetch_completed    ← LogicalAgent (catalog path)
│   ├─ validator.variant_lint     ← FluidContractValidator
│   └─ llm.usage_missing          ← BaseStageAgent
│
└─ Pipeline order:
   LogicalAgent ──→ CriticAgent.review_logical ──→ scratchpad
       │
       ├──→ retrieve_similar_models ──→ scratchpad.retrievals
       │
   BuilderAgent (reads scratchpad → provenance)
       │
   _run_pre_emit_conformance ──→ ConformanceAgent.run
       │                          + apply_dialect_mapper
   ValidatorAgent
       │
   _escalate_critic_errors_into_report (C8)
       │
   _maybe_repair_physical (rerun stages with feedback)
       │
   _record_forge_episode ──→ memory/episodic
   write_semantic_record ──→ memory/semantic

See also

  • V1.5 architecture deep-dive
  • Cost tracking details
  • Credential resolver
  • V1.5 release notes
Last Updated: 5/17/26, 6:51 PM
Contributors: fas89, Claude Opus 4.7, Claude Opus 4.7 (1M context)