Skip to content

Typed Outcomes

Typed Step Outcomes: Outcome-First Architecture (FSD-008)

This document describes the outcome-first architecture and the remaining compatibility behavior at the core boundary.

Current State (Stabilized)

  • Backend/frame path is outcome-first: components exchange StepOutcome[StepResult] (Success, Failure, Paused, Chunk).
  • Legacy path remains StepResult-based for backward compatibility; the core unwraps outcomes for legacy callers.
  • Policies are native outcomes: Agent, Simple, Loop, Parallel, Conditional, Cache.
  • No outcomes adapters are used; ExecutorCore routes directly to policies.

Legacy Compatibility Boundary

Legacy callers that invoke execute(step, data, ...) (non-frame) receive StepResult. The core unwraps StepOutcome to StepResult uniformly. Backend/frame calls receive StepOutcome directly.

Writing Policies (Outcome-Only)

  • Implement execute(...) -> Awaitable[StepOutcome[StepResult]].
  • Build StepResult internally, then return Success(step_result=...) or Failure(..., step_result=...).
  • Do not raise PausedException; return Paused(message=...) instead.

Calling Policies

  • Backend path (frame) receives StepOutcome directly.
  • Legacy path is supported via core unwrapping; policy signatures are outcome-only.

Runner and Streaming

  • run_outcomes_async yields strictly StepOutcome values.
  • run_async remains legacy-compatible.

run_outcomes_async usage

from flujo.application.runner import Flujo
from flujo.domain.dsl.step import Step
from flujo.domain.dsl.pipeline import Pipeline
from flujo.domain.models import Success, Failure, Paused, Chunk

step = Step(name="echo", agent=MyAgent())
pipe = Pipeline.from_step(step)
runner = Flujo(pipe)

async for event in runner.run_outcomes_async("hi"):
    if isinstance(event, Chunk):
        handle_stream_chunk(event.data)
    elif isinstance(event, Success):
        print("final:", event.step_result.output)
    elif isinstance(event, Failure):
        log_error(event.feedback)
    elif isinstance(event, Paused):
        persist_for_hitl(event.message)

Policy contract

  • All policy execute(...) methods must return StepOutcome[StepResult].
  • Use to_outcome(sr) when normalizing a constructed StepResult.
  • Prefer returning Paused(message=...) over raising inside policies; raising is reserved at core/runner legacy boundaries.

Migration Notes

  • The system has completed migration to native-outcome policies. Any remaining hybrid handling exists only at the core boundary for legacy callers.
  • Deprecation warnings for legacy-only entry points can be enabled behind a flag in future releases.

Testing Expectations

  • Outcome-first paths covered in integration tests.
  • Legacy paths covered in regression tests to ensure no breakage during migration.

Typed Outcomes (FSD-008)

Flujo steps now support typed outcomes in the backend/runner path. Instead of returning raw StepResult directly, policies are adapted to return a StepOutcome[StepResult] on the ExecutionFrame path.

  • Success: Success(step_result=StepResult)
  • Failure: Failure(error=Exception, feedback=str | None, step_result=StepResult | None)
  • Paused: Paused(message=str) (control flow)

Key points: - Backward compatibility: Legacy callers continue to receive StepResult; the executor unwraps outcomes when not called with an ExecutionFrame. - Utilities: flujo/domain/outcomes.py provides to_outcome(sr) for normalizing legacy results inside policies when needed.

Which paths return typed outcomes? - Backend/runner calls use ExecutorCore.execute(frame: ExecutionFrame) → returns StepOutcome[StepResult]. - Legacy execute(step, data, ...) → returns StepResult (for tests and backward compatibility); the core unwraps.

Extending to new policies: 1. Implement the policy to return StepOutcome[StepResult]. 2. Build internal StepResult instances and wrap with Success or Failure. 3. Do not introduce adapters; route directly through ExecutorCore.