Running Pipelines
Starting the Ductwork Process
If you recall, after running the Rails generator you’ll have a new binstub at bin/ductwork. This executable launches the supervisor process, which orchestrates your entire pipeline infrastructure—forking a single pipeline advancer and a job worker for each configured pipeline. Both the pipeline advancer and job worker create multiple threads to handle concurrent work.
The supervisor continuously monitors child processes via heartbeat checks. When a child process misses heartbeats for 5 minutes—indicating a crash or hang—the supervisor immediately spawns a replacement process to ensure minimal pipeline execution interruption.
Basic Usage
bin/ductwork
Custom Configuration
Use the -c or --config flag to specify a custom YAML configuration file. This lets you run multiple Ductwork instances with different pipeline configurations and scaling settings:
bin/ductwork -c config/ductwork.yml
bin/ductwork --config config/ductwork.0.yml
Pro tip: Run separate Ductwork instances with different configurations to isolate high-priority pipelines or scale specific workloads independently.
Triggering Pipelines from Your Code
With Ductwork running, you can trigger your pipelines from anywhere in your Rails application. The .trigger method enqueues the pipeline and returns a Ductwork::Pipeline instance that you can query for status, progress, or results. The .trigger method takes a single argument that will be passed to the first step.
Example: Rake Task
task enrich_user_data: :environment do
pipeline = EnrichAllUsersDataPipeline.trigger(7)
puts "Pipeline #{pipeline.id} started!"
end
Example: Controller Action
class DataEnrichmentController < ApplicationController
def create
days_outdated = params[:days_outdated] || 7
pipeline = EnrichAllUsersDataPipeline.trigger(days_outdated)
render json: {
pipeline_id: pipeline.id,
status: pipeline.status
}
end
end
Example: Scheduled Task
# config/schedule.rb (using whenever gem)
every 1.day, at: '3:00 am' do
runner "EnrichAllUsersDataPipeline.trigger(7)"
end
Monitoring Your Pipeline
The returned pipeline instance gives you real-time access to pipeline state:
pipeline = EnrichAllUsersDataPipeline.trigger(7)
pipeline.id #=> "123"
pipeline.status #=> "in_progress", "completed", "failed", etc.
pipeline.steps #=> Collection of all steps in the pipeline
pipeline.created_at #=> Timestamp
That’s it! Your pipelines are now running and processing work.