Defining Pipelines
A pipeline consists of steps and the transitions between them. Ductwork provides a simple DSL for defining these steps and transitions in a clear, readable way.
Creating a Pipeline Class
Pipeline classes must live under app/pipelines to be discovered correctly. While class names don’t require a “Pipeline” suffix, adding one can help prevent naming collisions depending on your conventions.
Create a pipeline by inheriting from Ductwork::Pipeline:
# app/pipelines/enrich_all_users_data.rb
class EnrichAllUsersData < Ductwork::Pipeline
end
# Or with a suffix for clarity
# app/pipelines/enrich_all_users_data_pipeline.rb
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
end
Each pipeline class automatically gets a default scope, allowing you to query for pipelines of that specific type:
:001> EnrichAllUsersDataPipeline.in_progress.to_sql
#=> "SELECT \"ductwork_pipelines\".* FROM \"ductwork_pipelines\"
# WHERE \"ductwork_pipelines\".\"klass\" = 'EnrichAllUsersDataPipeline'
# AND \"ductwork_pipelines\".\"status\" = 'in_progress'"
Defining Steps
Steps are Ruby objects that inherit from Ductwork::Step and expose a simple interface. They must live under app/steps and implement an #execute instance method that takes no arguments.
The initializer’s parameters depend on either:
- The arguments passed when the pipeline is triggered (for the first step)
- The return value from the previous step (for subsequent steps)
This simple interface makes steps highly testable without external dependencies. Like pipelines, step class names don’t require a “Step” suffix, though it may help with organization.
Example Step:
# app/steps/query_users_requiring_enrichment.rb
class QueryUsersRequiringEnrichment < Ductwork::Step
  def initialize(days_outdated)
    @days_outdated = days_outdated
  end

  def execute
    ids = User.where("data_last_refreshed_at < ?", @days_outdated.days.ago).ids
    Rails.logger.info("Enriching #{ids.length} users' data")
    # Return the collection of IDs to pass to the next step
    ids
  end
end
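Because a step is just an object with an initializer and an #execute method, it can be exercised without a running pipeline. The sketch below is illustrative only: Ductwork::Step is replaced by an empty stand-in so the snippet runs outside a Rails app, and NormalizeEmails is a hypothetical step that is not part of Ductwork.

```ruby
# Stand-in so this sketch runs without the gem loaded; in an application the
# real Ductwork::Step base class would be used instead.
module Ductwork
  class Step; end
end

# Hypothetical step: receives a list of emails and returns them normalized.
class NormalizeEmails < Ductwork::Step
  def initialize(emails)
    @emails = emails
  end

  def execute
    # The return value is what the next step would receive as input.
    @emails.map { |email| email.strip.downcase }.uniq
  end
end

# Exercising the step directly, as a unit test might.
result = NormalizeEmails.new([" A@example.com", "a@example.com "]).execute
```

A test for a real step would follow the same shape: build the step with the expected input, call #execute, and assert on the return value.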
Design philosophy: Steps represent checkpoints in your pipeline and should complete quickly. Break down long-running jobs into smaller work units by chaining steps together and passing data between them. The examples throughout this page demonstrate this approach.
Understanding Transitions
Transitions connect steps together. The key principle: the return value of one step becomes the input to the next step. This means you need to align the initializer’s arity between connected steps.
Important considerations:
- Align parameter counts between connected steps
- Use the splat operator (*args) if you prefer treating everything as an array
- Return values must be JSON-serializable
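To make the JSON-serializable requirement concrete, the following sketch simulates a hand-off between two steps with a JSON round trip from Ruby's standard library. This section does not describe how Ductwork serializes values internally, so the round trip is an assumption used only to show the effect. FindAccount and NotifyAccount are hypothetical steps, and the sketch assumes a single returned hash arrives as a single initializer argument.

```ruby
require "json"

# Hypothetical first step, for illustration only.
class FindAccount
  def execute
    # Symbol keys and symbol values are valid Ruby but do not survive JSON as-is.
    { id: 42, plan: :premium }
  end
end

# Hypothetical second step.
class NotifyAccount
  # One parameter, because the previous step returns one value (a hash).
  def initialize(account)
    @account = account
  end

  def execute
    # After the round trip the keys and the plan are plain strings.
    "notified #{@account["id"]} (#{@account["plan"]})"
  end
end

# Simulate the hand-off: serialize the return value, then deserialize it.
payload = JSON.parse(JSON.generate(FindAccount.new.execute))
message = NotifyAccount.new(payload).execute
```

In this model, writing the receiving step against string keys avoids surprises, and returning only hashes, arrays, strings, numbers, booleans, and nil keeps the value within what JSON can represent.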
The Ductwork DSL uses a fluent interface pattern, enabling clean method chaining.
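For readers unfamiliar with the fluent interface pattern, here is a toy builder showing why such calls can be chained: each method records something and returns the builder itself. ToyBuilder is hypothetical and is not Ductwork's actual definition builder; symbols stand in for step classes.

```ruby
# Toy builder for illustration only; it is not part of Ductwork.
class ToyBuilder
  attr_reader :transitions

  def initialize
    @transitions = []
  end

  def start(step)
    @transitions << [:start, step]
    self # returning self is what makes chaining possible
  end

  def chain(to:)
    @transitions << [:chain, to]
    self
  end
end

builder = ToyBuilder.new.start(:StepA).chain(to: :StepB).chain(to: :StepC)
```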
Transition Types
start - Define the First Step
The start transition defines your pipeline’s entry point. This step receives the arguments passed to the .trigger method (see Running Pipelines).
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
  end
end
expand - Fan Out to Multiple Steps
The expand transition acts like a “foreach” loop. It takes the return value from the previous step (which must be a collection) and creates a new step instance for each element. All expanded steps may run concurrently, depending on your job worker scaling.
The return value must be JSON-serializable and respond to #each.
Syntax: expand(to: StepClass)
# Remember in our step class that we returned an array of IDs. Each scalar ID
# will get passed to a new `LoadUserData` step.
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
  end
end
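As a mental model, expand resembles mapping over the returned collection and building one step per element. The plain-Ruby sketch below illustrates only that data flow and says nothing about how Ductwork schedules work; the LoadUserData body shown here is a stand-in written for this example.

```ruby
# Stand-in for a step that handles a single element of the collection.
class LoadUserData
  def initialize(user_id)
    @user_id = user_id
  end

  def execute
    { "user_id" => @user_id }
  end
end

ids = [1, 2, 3] # what the previous step returned

# One step instance per element. In a real pipeline these may run
# concurrently; this model simply runs them in order.
expanded_steps = ids.map { |id| LoadUserData.new(id) }
results = expanded_steps.map(&:execute)
```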
divide - Split Into Parallel Branches
The divide transition passes a copy of the return value to multiple steps simultaneously. Each step receives identical input and may run concurrently.
Syntax: divide(to: [StepClass1, StepClass2, ...])
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
  end
end
Block syntax: The divide transition accepts an optional block, yielding a branch for each step. This allows you to chain different transitions onto each branch independently:
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divide(to: [StepB, StepC, StepD]) do |branch1, branch2, branch3|
        branch1.chain(to: StepD)
        branch2.expand(to: StepE)
        branch3.chain(to: StepF).chain(to: StepG)
      end
  end
end
combine - Merge Branches Back Together
The combine transition is the inverse of divide. It merges multiple branches into a single step, combining the return values from all previous steps into an array. The combined step waits for all previous steps to complete before starting.
Syntax: combine(into: StepClass)
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
  end
end
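The data flow through divide and combine can be modeled in plain Ruby: every branch receives the same input, and the branch return values are gathered into one array for the combined step. The step bodies below are stand-ins written for this sketch, and the ordering of the gathered array in a real pipeline is not something this section specifies.

```ruby
# Stand-ins for the two branch steps; each receives the same input.
class FetchDataFromSourceA
  def initialize(user)
    @user = user
  end

  def execute
    { "source" => "a", "user_id" => @user["user_id"] }
  end
end

class FetchDataFromSourceB
  def initialize(user)
    @user = user
  end

  def execute
    { "source" => "b", "user_id" => @user["user_id"] }
  end
end

user = { "user_id" => 7 } # return value of the step before divide

# divide: every branch is built from the same input.
branches = [FetchDataFromSourceA, FetchDataFromSourceB].map { |klass| klass.new(user) }

# combine: the return values of all branches are gathered into one array,
# which is what the combined step would then work with.
combined_input = branches.map(&:execute)
```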
Block syntax: For improved readability, you can call combine on any branch and pass the other branches as arguments:
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA).divide(to: [StepB, StepC, StepD]) do |branch1, branch2, branch3|
      branch1.combine(branch2, branch3, into: StepE)
    end
  end
end
Note: Calling combine on a pipeline that hasn’t been divided raises a Ductwork::DSL::DefinitionBuilder::CombineError.
chain - Connect Steps Sequentially
The chain transition is the simplest transition. It connects two steps sequentially—the second step only runs if the first succeeds. There’s no concurrency with chaining.
Syntax: chain(to: StepClass)
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
      .chain(to: UpdateUserData)
  end
end
collapse - Gather Expanded Steps
The collapse transition is the inverse of expand. It gathers all steps from an expanded branch back into a single step, combining their return values into an array. Like combine, the collapsed step waits for all expanded steps to complete.
Syntax: collapse(into: StepClass)
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
      .chain(to: UpdateUserData)
      .collapse(into: ReportUserEnrichmentSuccess)
  end
end
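A plain-Ruby model of collapse: each expanded branch ends with one return value, and those values are gathered into a single array for the collapsed step. The step bodies are stand-ins for this sketch, and it assumes the gathered array reaches the initializer as one argument, which this section does not spell out.

```ruby
# Stand-in for the last step of each expanded branch.
class UpdateUserData
  def initialize(user_id)
    @user_id = user_id
  end

  def execute
    "updated #{@user_id}"
  end
end

# Stand-in for the step named in collapse(into: ...).
class ReportUserEnrichmentSuccess
  # Assumption: the gathered return values arrive as one array argument.
  def initialize(results)
    @results = results
  end

  def execute
    "#{@results.length} users enriched"
  end
end

ids = [1, 2, 3] # the collection that was expanded earlier

# One result per expanded branch, gathered once every branch has finished.
branch_results = ids.map { |id| UpdateUserData.new(id).execute }
report = ReportUserEnrichmentSuccess.new(branch_results).execute
```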
Note: Calling collapse on a pipeline that hasn’t been expanded raises a Ductwork::DSL::DefinitionBuilder::CollapseError.
divert - Conditional Branching
The divert transition routes pipeline execution to different steps based on the return value of the previous step, much like a case/switch statement. Unlike divide, which sends the same input to all branches in parallel, divert selects only one branch to execute. The return value from the previous step is converted to a string and matched against the hash keys.
An otherwise key is always required as a fallback when no keys match.
Syntax: divert(to: { key1: StepClass1, key2: StepClass2, ..., otherwise: FallbackStepClass })
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divert(to: {
        premium: EnrichFromPremiumSource,
        standard: EnrichFromStandardSource,
        otherwise: EnrichFromDefaultSource
      })
  end
end
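The matching rule described above (convert the return value to a string, compare against the keys, fall back to otherwise) can be sketched in plain Ruby. This is a model of the documented behavior, not Ductwork's implementation; symbols stand in for the step classes and select_branch is a hypothetical helper.

```ruby
# Routing table mirroring the divert(to: ...) hash; symbols stand in for
# the step classes.
routes = {
  premium: :EnrichFromPremiumSource,
  standard: :EnrichFromStandardSource,
  otherwise: :EnrichFromDefaultSource
}

# Hypothetical helper: stringify the return value, look for a matching key,
# and fall back to :otherwise when nothing matches.
select_branch = lambda do |return_value|
  key = return_value.to_s
  match = routes.keys.find { |candidate| candidate.to_s == key }
  routes.fetch(match || :otherwise)
end

premium_choice  = select_branch.call("premium") # string return value
standard_choice = select_branch.call(:standard) # symbol, stringified first
fallback_choice = select_branch.call(42)        # no such key
```

In this model, the step before divert would return a value such as "premium" or "standard" to pick a branch; anything else lands on the otherwise branch.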
Block syntax: Like divide, the divert transition accepts an optional block, yielding a branch for each step. This allows you to chain different transitions onto each branch independently:
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divert(to: { bar: StepB, otherwise: StepC }) do |branch1, branch2|
        branch1.chain(to: StepD)
        branch2.chain(to: StepE).chain(to: StepF)
      end
  end
end
Note: Omitting the otherwise key raises a Ductwork::DSL::DefinitionBuilder::DivertError. If at runtime the return value doesn’t match any key and no otherwise branch exists, the pipeline will halt.
converge - Merge Diverted Branches
The converge transition is the inverse of divert. It merges diverted branches back into a single step. The converged step receives the return value from whichever branch was selected at runtime.
Syntax: converge(into: StepClass)
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divert(to: {
        premium: EnrichFromPremiumSource,
        standard: EnrichFromStandardSource,
        otherwise: EnrichFromDefaultSource
      })
      .converge(into: UpdateUserData)
  end
end
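Since the converged step receives the return value of whichever branch ran, it helps if every branch returns the same shape. The plain-Ruby sketch below models that idea with stand-in step bodies. What each diverted branch receives as input is not specified in this section, so the sketch simply passes the previous return value through, following the general transition principle.

```ruby
# Stand-ins for two diverted branches; both return the same hash shape so
# the converged step can treat them identically.
class EnrichFromPremiumSource
  def initialize(input)
    @input = input
  end

  def execute
    { "source" => "premium", "input" => @input }
  end
end

class EnrichFromDefaultSource
  def initialize(input)
    @input = input
  end

  def execute
    { "source" => "default", "input" => @input }
  end
end

# Stand-in for the step named in converge(into: ...).
class UpdateUserData
  def initialize(enrichment)
    @enrichment = enrichment
  end

  def execute
    "updated from #{@enrichment["source"]}"
  end
end

# Only one branch runs; its return value alone reaches the converged step.
run_through = lambda do |branch_class, input|
  UpdateUserData.new(branch_class.new(input).execute).execute
end

premium_result = run_through.call(EnrichFromPremiumSource, "premium")
default_result = run_through.call(EnrichFromDefaultSource, "unknown")
```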
Block syntax: For improved readability, you can call converge on any branch and pass the other branches as arguments:
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divert(to: { bar: StepB, otherwise: StepC }) do |branch1, branch2|
        branch1.converge(branch2, into: StepD)
      end
  end
end
Note: Calling converge on a pipeline that hasn’t been diverted raises a Ductwork::DSL::DefinitionBuilder::ConvergeError.
Next, we’re ready to run the ductwork processes.