Data Engineering · Prototype · 2025-01-01

Task Runner - Workflow Engine

Scalable workflow engine for DuckDB/S3 operations with per-task isolation and data lineage tracking.

Year: 2025
Status: Prototype
Category: Data Engineering
Role: Architect & Lead

Key metrics

Throughput: 100+ tasks/sec
Isolation: Per-task containers

Architecture

Docker-based task isolation with DuckDB analytics, Redis queue, and automatic lineage tracking.

Case study

Task Runner - Scalable Workflow Engine

Scalable workflow engine for DuckDB/S3 data operations with DAG support, built on Dramatiq with comprehensive error handling, scheduling, and data lineage tracking.

Architecture

graph TB
    subgraph "Workflow Definition"
        YAML[YAML Workflow<br/>Spec]
        CLI[CLI Interface]
    end
    subgraph "Task Execution"
        ENGINE[Workflow Engine<br/>DAG Resolver]
        RUNNER[Isolated Task<br/>Runner]
        DOCKER[Docker<br/>Containers]
    end
    subgraph "Data Layer"
        DUCKDB[(DuckDB<br/>Analytics)]
        S3[(S3<br/>Storage)]
        REDIS[(Redis<br/>Queue)]
    end
    subgraph "Monitoring"
        LINEAGE[Data Lineage<br/>Tracker]
        INCR[Incremental<br/>Processor]
        METRICS[CloudWatch<br/>Metrics]
    end
    YAML --> ENGINE
    CLI --> ENGINE
    ENGINE --> RUNNER
    RUNNER --> DOCKER
    DOCKER --> DUCKDB
    DOCKER --> S3
    ENGINE --> REDIS
    RUNNER --> LINEAGE
    RUNNER --> INCR
    RUNNER --> METRICS

Key Innovation: Per-Task Isolation

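Unlike workflow engines where all tasks typically share one worker environment, each task here declares its own runtime (Python version, pip requirements, system packages, CPU/memory limits, and timeout):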

nodes:
  - id: "process-data"
    type: "duckdb_sql"
    params:
      sql: "SELECT * FROM data"
      environment:
        python_version: "3.11"        # Task-specific Python
        requirements:
          - "duckdb==0.9.2"           # Task-specific deps
          - "pandas==2.1.4"
        system_packages:
          - "build-essential"
        memory_limit: "2Gi"
        cpu_limit: "1.0"
        timeout_seconds: 3600

Benefits:

  • No dependency conflicts between tasks
  • Independent scaling per task type
  • Version flexibility (Python 3.9-3.12)
  • Resource optimization (right-size each task)
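To make the isolation model concrete, here is a minimal sketch of how one task's environment block could be turned into a per-task image and a docker run command. The helper names (parse_memory, dockerfile_for, docker_run_args) and the use of the official python:<version>-slim base image are assumptions for illustration, not Task Runner's actual implementation. timeout_seconds is not a docker run option; it would be enforced by whatever launches the container, for example subprocess.run(..., timeout=...).

```python
import re
from typing import Any, Dict, List

# Kubernetes-style binary suffixes used in the YAML (e.g. "512Mi", "2Gi")
_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}


def parse_memory(limit: str) -> int:
    """Convert a size such as "2Gi" to a byte count."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi)", limit)
    if match is None:
        raise ValueError(f"Unsupported memory limit: {limit!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def dockerfile_for(env: Dict[str, Any]) -> str:
    """Render a per-task Dockerfile from a (hypothetical) environment block."""
    lines = [f"FROM python:{env.get('python_version', '3.11')}-slim"]
    if env.get("system_packages"):
        pkgs = " ".join(env["system_packages"])
        lines.append(
            "RUN apt-get update && apt-get install -y --no-install-recommends "
            f"{pkgs} && rm -rf /var/lib/apt/lists/*"
        )
    if env.get("requirements"):
        # Quote each pin so specifiers like ">=" are not parsed as shell redirects
        reqs = " ".join(f'"{req}"' for req in env["requirements"])
        lines.append(f"RUN pip install --no-cache-dir {reqs}")
    return "\n".join(lines)


def docker_run_args(task_id: str, env: Dict[str, Any], image: str) -> List[str]:
    """Build docker run arguments that apply the task's CPU and memory limits."""
    args = ["docker", "run", "--rm", "--name", f"task-{task_id}"]
    if "memory_limit" in env:
        # A bare number passed to --memory is interpreted as bytes
        args += ["--memory", str(parse_memory(env["memory_limit"]))]
    if "cpu_limit" in env:
        args += ["--cpus", str(env["cpu_limit"])]
    return args + [image]


env = {
    "python_version": "3.11",
    "requirements": ["duckdb==0.9.2", "pandas==2.1.4"],
    "system_packages": ["build-essential"],
    "memory_limit": "2Gi",
    "cpu_limit": "1.0",
}
print(dockerfile_for(env))
print(docker_run_args("process-data", env, "task-runner/process-data:latest"))
```

Building one image per distinct environment (rather than per run) keeps the 2-5 second startup cost to container creation only.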

DRY Task Development

All tasks inherit from a common base:

from task_runner import BaseTask, TaskResult

class MyCustomTask(BaseTask):
    """
    Base class provides:
    - Configuration loading
    - Parameter validation  
    - Logging with context
    - Output saving
    - Error handling
    """
    
    def get_required_params(self) -> list:
        """Define required parameters"""
        return ["input_file", "output_file"]
    
    def execute(self) -> TaskResult:
        """Implement task logic"""
        try:
            input_file = self.get_param("input_file")
            output_file = self.get_param("output_file")
            
            # Your task logic here
            result_data = self.process(input_file)
            self.save_output(output_file, result_data)
            
            return TaskResult(
                success=True,
                message="Task completed successfully",
                data={"processed": True}
            )
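        except Exception as e:
            return TaskResult(
                success=False,
                message=f"Task failed: {e}",
                error=str(e)
            )

    def process(self, input_file):
        """Task-specific transformation called by execute(); replace with real logic."""
        raise NotImplementedError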
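The responsibilities listed in the docstring follow the template-method pattern: a concrete run() method on the base class handles validation, logging, and error capture, and delegates the task-specific work to execute(). The sketch below is a hypothetical, stripped-down BaseTask and TaskResult (the constructor signature and run() are assumptions; configuration loading and save_output are omitted) showing how that split can work.

```python
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TaskResult:
    """Outcome of one task run (field names follow the example above)."""
    success: bool
    message: str
    data: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


class BaseTask(ABC):
    """Hypothetical base: run() validates, logs, and wraps execute()."""

    def __init__(self, task_id: str, params: Dict[str, Any]):
        self.task_id = task_id
        self.params = params
        # LoggerAdapter attaches task_id to every log record as context
        self.logger = logging.LoggerAdapter(
            logging.getLogger("task_runner"), {"task_id": task_id}
        )

    @abstractmethod
    def get_required_params(self) -> List[str]: ...

    @abstractmethod
    def execute(self) -> TaskResult: ...

    def get_param(self, name: str, default: Any = None) -> Any:
        return self.params.get(name, default)

    def run(self) -> TaskResult:
        missing = [p for p in self.get_required_params() if p not in self.params]
        if missing:
            return TaskResult(False, "Missing required parameters", error=", ".join(missing))
        self.logger.info("Starting task")
        try:
            result = self.execute()
        except Exception as e:  # last-resort guard if execute() lets an error escape
            self.logger.exception("Task raised")
            return TaskResult(False, f"Task failed: {e}", error=str(e))
        self.logger.info("Finished task: success=%s", result.success)
        return result


class UppercaseTask(BaseTask):
    """Toy subclass used only to demonstrate the base class."""

    def get_required_params(self) -> List[str]:
        return ["text"]

    def execute(self) -> TaskResult:
        return TaskResult(True, "ok", data={"text": self.get_param("text").upper()})


print(UppercaseTask("t1", {"text": "hi"}).run())
```

Keeping validation and error capture in run() means subclasses only implement get_required_params() and execute(), which is the DRY property the section describes.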

Data Lineage Tracking

Track data flow through workflows:

from task_runner import DataLineageTracker


async def show_lineage(redis_client):
    tracker = DataLineageTracker(redis_client)

    # Automatically records:
    # - Data reads (which files/tables consumed)
    # - Data transforms (source → target mappings)
    # - Task dependencies (execution order)
    # - Temporal lineage (when data was created)

    # Query lineage
    lineage = await tracker.get_asset_lineage(
        asset_id="s3://bucket/processed/data.parquet",
        direction="both"  # upstream + downstream
    )
    return lineage

# Result:
{
    "asset": "s3://bucket/processed/data.parquet",
    "upstream": [
        {"asset": "s3://bucket/raw/data.json", "task": "download"},
        {"asset": "s3://bucket/raw/data2.json", "task": "download"}
    ],
    "downstream": [
        {"asset": "s3://bucket/reports/summary.csv", "task": "analyze"}
    ],
    "transformations": [
        {
            "task": "convert",
            "operation": "json_to_parquet",
            "timestamp": "2025-01-15T10:30:00Z"
        }
    ]
}
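One way to answer upstream/downstream queries like the one above is to store lineage as a directed graph and walk it breadth-first. This sketch keeps everything in memory instead of Redis and is synchronous; LineageGraph and record_write are hypothetical names. As in the example result, each entry is labeled with the task that produced that asset.

```python
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Optional, Set


class LineageGraph:
    """Hypothetical in-memory lineage store (a stand-in for Redis)."""

    def __init__(self) -> None:
        self.producer: Dict[str, str] = {}                     # asset -> task that wrote it
        self.parents: Dict[str, Set[str]] = defaultdict(set)   # asset -> its inputs
        self.children: Dict[str, Set[str]] = defaultdict(set)  # asset -> assets derived from it

    def record_write(self, asset: str, task: str, inputs: Iterable[str] = ()) -> None:
        """Record that task wrote asset after reading inputs."""
        self.producer[asset] = task
        for src in inputs:
            self.parents[asset].add(src)
            self.children[src].add(asset)

    def _walk(self, start: str, edges: Dict[str, Set[str]]) -> List[Dict[str, Optional[str]]]:
        """Breadth-first traversal: direct neighbours come before transitive ones."""
        seen, order, queue = {start}, [], deque([start])
        while queue:
            for nxt in sorted(edges.get(queue.popleft(), ())):
                if nxt not in seen:
                    seen.add(nxt)
                    order.append(nxt)
                    queue.append(nxt)
        return [{"asset": a, "task": self.producer.get(a)} for a in order]

    def get_asset_lineage(self, asset_id: str, direction: str = "both") -> Dict[str, Any]:
        result: Dict[str, Any] = {"asset": asset_id}
        if direction in ("upstream", "both"):
            result["upstream"] = self._walk(asset_id, self.parents)
        if direction in ("downstream", "both"):
            result["downstream"] = self._walk(asset_id, self.children)
        return result


g = LineageGraph()
g.record_write("s3://bucket/raw/data.json", "download")
g.record_write("s3://bucket/raw/data2.json", "download")
g.record_write("s3://bucket/processed/data.parquet", "convert",
               inputs=["s3://bucket/raw/data.json", "s3://bucket/raw/data2.json"])
g.record_write("s3://bucket/reports/summary.csv", "analyze",
               inputs=["s3://bucket/processed/data.parquet"])
print(g.get_asset_lineage("s3://bucket/processed/data.parquet"))
```

Because the walk is transitive, asking for the upstream of summary.csv also reaches the raw JSON files two hops back.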

Incremental Processing

Tasks are skipped when their source data, parameters, and upstream dependencies are all unchanged:

import time


class IncrementalProcessor:
    """
    Determines when tasks can be safely skipped.
    The get_*/dependency_changed helpers (not shown) look up cached state from previous runs.
    """

    async def should_skip_task(self, task_spec, context) -> bool:
        """
        Skip only if all of the following hold:
        1. Data hasn't changed (S3 ETag match)
        2. Parameters identical (checksum match)
        3. Dependencies unchanged (upstream checksums)
        4. Last run is within the max_age_seconds window
        """
        # A task that has never run cannot be skipped
        last_run = await self.get_last_run(task_spec.id)
        if last_run is None:
            return False

        # Check data staleness: force a re-run once the last run is too old
        max_age = task_spec.incremental.max_age_seconds
        if max_age and time.time() - last_run >= max_age:
            return False

        # Check if source data changed
        current_etag = await self.get_s3_etag(task_spec.source)
        last_etag = await self.get_cached_etag(task_spec.id)
        if current_etag != last_etag:
            return False

        # Check parameter changes
        param_checksum = self.calculate_checksum(task_spec.params)
        last_checksum = await self.get_cached_checksum(task_spec.id)
        if param_checksum != last_checksum:
            return False  # Parameters changed, must run

        # Check dependency changes
        if task_spec.incremental.dependency_check:
            for dep in task_spec.dependencies:
                if await self.dependency_changed(dep):
                    return False

        self.logger.info(f"Skipping {task_spec.id}: inputs unchanged")
        return True  # Safe to skip
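The same rules can be written as a pure function over a saved snapshot of the last run, which makes them easy to unit-test. In this sketch the parameter checksum is a SHA-256 over the params serialized as JSON with sorted keys, so key order does not matter. Restricting the checksum to checksum_fields (an option that appears in the workflow example in the next section) is an assumption about what that option means; RunRecord and should_skip are hypothetical names.

```python
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


def params_checksum(params: Dict[str, Any], fields: Optional[List[str]] = None) -> str:
    """Order-independent SHA-256 of params, optionally limited to `fields`."""
    subset = {k: params[k] for k in fields if k in params} if fields else params
    payload = json.dumps(subset, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


@dataclass
class RunRecord:
    """Hypothetical snapshot saved after a task's last successful run."""
    finished_at: float             # Unix timestamp
    source_etag: str               # S3 ETag of the input at that time
    params_checksum: str
    upstream_checksums: Dict[str, str]


def should_skip(
    last: Optional[RunRecord],
    *,
    source_etag: str,
    params: Dict[str, Any],
    upstream_checksums: Dict[str, str],
    max_age_seconds: Optional[float] = None,
    checksum_fields: Optional[List[str]] = None,
    now: Optional[float] = None,
) -> bool:
    """True only if nothing relevant changed since `last` and it is not too old."""
    if last is None:
        return False  # never ran
    now = time.time() if now is None else now
    if max_age_seconds is not None and now - last.finished_at >= max_age_seconds:
        return False  # previous result is stale
    return (
        source_etag == last.source_etag
        and params_checksum(params, checksum_fields) == last.params_checksum
        and upstream_checksums == last.upstream_checksums
    )
```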

Workflow Example

Complete data pipeline in YAML:

name: "daily-data-pipeline"
schedule:
  cron: "0 2 * * *"  # 2 AM daily

nodes:
  - id: "download-data"
    type: "download_to_s3"
    params:
      url: "https://api.example.com/data"
      s3_uri: "s3://my-bucket/raw/data.json"
      environment:
        python_version: "3.9"
        requirements: ["requests==2.31.0"]
        memory_limit: "512Mi"
  
  - id: "convert-to-parquet"
    type: "s3_json_to_parquet_duckdb"
    params:
      src_glob: "s3://my-bucket/raw/*.json"
      dst_uri: "s3://my-bucket/processed/data.parquet"
      environment:
        python_version: "3.11"
        requirements: ["duckdb==0.9.2", "pandas==2.1.4"]
        memory_limit: "2Gi"
      incremental:
        enabled: true
        strategy: "hybrid"
        max_age_seconds: 3600
        checksum_fields: ["date", "source"]
        dependency_check: true
    needs: ["download-data"]
  
  - id: "run-analysis"
    type: "duckdb_sql"
    params:
      sql: |
        SELECT 
          date,
          COUNT(*) as record_count,
          AVG(value) as avg_value
        FROM read_parquet('s3://my-bucket/processed/data.parquet')
        GROUP BY date
        ORDER BY date
      output_uri: "s3://my-bucket/reports/analysis.csv"
      environment:
        python_version: "3.12"
        requirements: ["duckdb==0.9.2"]
        memory_limit: "1Gi"
    needs: ["convert-to-parquet"]
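The needs fields turn the pipeline into a DAG, and the engine's DAG resolver has to run nodes in dependency order. A minimal sketch using Python's standard-library graphlib (3.9+) groups nodes into batches whose members could run in parallel. The node dicts mirror the YAML above; execution_batches is a hypothetical helper, not the engine's API.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List

# The three nodes from the YAML above, reduced to id + needs.
NODES = [
    {"id": "download-data"},
    {"id": "convert-to-parquet", "needs": ["download-data"]},
    {"id": "run-analysis", "needs": ["convert-to-parquet"]},
]


def execution_batches(nodes: List[Dict[str, Any]]) -> List[List[str]]:
    """Group node ids into batches; each node's needs finish in an earlier batch."""
    ids = {n["id"] for n in nodes}
    for n in nodes:
        unknown = set(n.get("needs", [])) - ids
        if unknown:
            raise ValueError(f"{n['id']} needs unknown nodes: {sorted(unknown)}")

    sorter = TopologicalSorter({n["id"]: n.get("needs", []) for n in nodes})
    sorter.prepare()  # raises CycleError if the needs graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every node whose needs are done
        batches.append(ready)
        sorter.done(*ready)  # a real engine would call this as tasks complete
    return batches


print(execution_batches(NODES))
```

For this linear pipeline every batch has one node; a diamond-shaped workflow would put its two middle branches in the same batch.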

Available Task Types

DuckDB Operations

  • duckdb_sql - Execute SQL with S3 support
  • s3_json_to_parquet_duckdb - Convert JSON/CSV to Parquet
  • duckdb_aggregate - Pre-built aggregation tasks

S3 Operations

  • download_to_s3 - Download URLs to S3
  • s3_extract_archive - Extract compressed archives
  • git_clone_to_s3 - Clone repos to S3
  • s3_sync - Sync between buckets

Infrastructure

  • ecs_run_task - Execute ECS Fargate tasks
  • lambda_invoke - Invoke Lambda functions
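The type field of each node selects one of these implementations. A common way to dispatch on such strings is a registry keyed by type name; the sketch below assumes that design (TASK_REGISTRY, register, and plan_node are hypothetical) and, for s3_json_to_parquet_duckdb, only builds the DuckDB COPY statement instead of executing it. Running that SQL against S3 would also need DuckDB's httpfs extension and AWS credentials.

```python
from typing import Any, Callable, Dict

# Maps a node's type string to a handler that turns its params into SQL.
TASK_REGISTRY: Dict[str, Callable[[Dict[str, Any]], str]] = {}


def register(task_type: str):
    """Decorator that adds a handler to TASK_REGISTRY under task_type."""
    def decorator(fn):
        TASK_REGISTRY[task_type] = fn
        return fn
    return decorator


def sql_literal(value: str) -> str:
    """Quote a string as a SQL literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


@register("duckdb_sql")
def duckdb_sql(params: Dict[str, Any]) -> str:
    return params["sql"]


@register("s3_json_to_parquet_duckdb")
def json_to_parquet(params: Dict[str, Any]) -> str:
    source = f"read_json_auto({sql_literal(params['src_glob'])})"
    return f"COPY (SELECT * FROM {source}) TO {sql_literal(params['dst_uri'])} (FORMAT PARQUET)"


def plan_node(node: Dict[str, Any]) -> str:
    """Look up the handler for node["type"] and apply it to node["params"]."""
    handler = TASK_REGISTRY.get(node["type"])
    if handler is None:
        raise ValueError(f"Unknown task type: {node['type']!r}")
    return handler(node["params"])


print(plan_node({
    "type": "s3_json_to_parquet_duckdb",
    "params": {
        "src_glob": "s3://my-bucket/raw/*.json",
        "dst_uri": "s3://my-bucket/processed/data.parquet",
    },
}))
```

Adding a new task type then means writing one decorated function, without touching the dispatcher.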

Scheduling System

from task_runner import WorkflowScheduler, ScheduleConfig, ScheduleType


async def register_schedules(redis_client, workflow_spec):
    scheduler = WorkflowScheduler(redis_client)

    # Cron-based schedule
    daily = ScheduleConfig(
        id="daily-pipeline",
        name="Daily Data Pipeline",
        workflow=workflow_spec,
        schedule_type=ScheduleType.CRON,
        cron_expression="0 2 * * *",  # 02:00 UTC daily
        timezone="UTC"
    )
    await scheduler.add_schedule(daily)

    # Interval-based schedule
    hourly = ScheduleConfig(
        id="frequent-pipeline",
        name="Hourly Updates",
        workflow=workflow_spec,
        schedule_type=ScheduleType.INTERVAL,
        interval_seconds=3600  # Every hour
    )
    await scheduler.add_schedule(hourly)
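The arithmetic behind the two schedule types can be sketched with the standard library. For the daily cron expression "0 2 * * *" (minute 0, hour 2), the next run is today at 02:00 if that is still ahead, otherwise tomorrow. For an interval schedule, runs stay on a fixed grid anchored at the last run; skipping missed slots rather than replaying them is an assumption here. Both helpers are hypothetical and handle only these two cases (general cron expressions need a full cron parser), and times are assumed to be UTC, as in the example, so daylight-saving shifts do not arise.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next fire time for a daily cron such as "0 2 * * *" (hour=2, minute=0)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def next_interval_run(last_run: datetime, interval: timedelta, now: datetime) -> datetime:
    """First slot on the last_run + k * interval grid strictly after now."""
    missed = (now - last_run) // interval  # timedelta // timedelta -> int
    return last_run + (missed + 1) * interval


utc = timezone.utc
print(next_daily_run(datetime(2025, 1, 15, 10, 30, tzinfo=utc), hour=2))
print(next_interval_run(datetime(2025, 1, 15, 0, 0, tzinfo=utc),
                        timedelta(seconds=3600),
                        datetime(2025, 1, 15, 2, 47, tzinfo=utc)))
```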

Error Handling & Retries

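import dramatiq

# Dramatiq acknowledges a message only after the actor finishes, so no
# separate "acks late" flag is needed. Async actors require the AsyncIO
# middleware (Dramatiq 1.15+).
@dramatiq.actor(
    max_retries=3,
    min_backoff=1_000,         # ms; the delay roughly doubles per retry, with jitter
    max_backoff=60_000,        # ms
    throws=(PermanentError,),  # fail immediately instead of retrying
)
async def resilient_task(task_spec: TaskSpec):
    """
    Automatic error handling:
    - Retry with exponential backoff
    - Dead letter queue after max retries
    - Error context preservation
    - Alerting on failures
    """
    try:
        return await execute_task(task_spec)
    except TemporaryError:
        # Re-raised so the Retries middleware schedules another attempt
        raise
    except PermanentError as e:
        # Not retried (see throws=...); recorded in the DLQ right away
        await send_to_dlq(task_spec, e)
        raise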
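The retry policy can be illustrated independently of the queue: transient errors are retried with exponentially growing delays (1 s, 2 s, 4 s for base_delay=1 and max_retries=3), while permanent errors propagate at once so the caller can route the task to a dead-letter queue. This is a hypothetical synchronous helper with locally defined TemporaryError and PermanentError classes; in a Dramatiq worker, the Retries middleware provides the equivalent behavior instead of a loop like this.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TemporaryError(Exception):
    """Hypothetical: transient failure (timeout, throttling) worth retrying."""


class PermanentError(Exception):
    """Hypothetical: failure that retrying cannot fix (e.g. invalid input)."""


def run_with_retries(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying TemporaryError with exponential backoff.

    PermanentError and any other exception propagate immediately.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except TemporaryError:
            if attempt >= max_retries:
                raise  # retries exhausted; caller can dead-letter the task
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s with the defaults
            attempt += 1
```

Injecting sleep keeps the helper testable: passing list.append records the delays instead of waiting.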

Performance Metrics

Metric                  | Value        | Notes
------------------------|--------------|--------------------------
Tasks/second            | 100+         | With auto-scaling
Task startup time       | 2-5 s        | Container initialization
DuckDB query throughput | 1 GB/s       | S3 read throughput
Incremental skip rate   | 40-60%       | Typical data pipelines
Concurrent workflows    | 50+          | Per worker pool
End-to-end latency      | 30 s - 5 min | Depends on pipeline

Deployment Options

Docker Compose (Development)

docker compose up -d
task-runner workflow run --file pipeline.yaml

AWS ECS (Production)

# Auto-scaling based on queue depth
cdk deploy TaskRunnerStack

# Deploys:
# - ECS cluster with Fargate
# - Application Load Balancer
# - CloudWatch dashboards
# - Auto-scaling policies
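Scaling on queue depth usually means targeting a fixed backlog per worker: divide the queued messages by the number one worker should have waiting, round up, and clamp to the pool's bounds. The sketch below assumes that policy; desired_workers is hypothetical, and the per-worker target and the 1-20 bounds are illustrative values, not the stack's actual settings.

```python
import math


def desired_workers(queue_depth: int, per_worker_target: int = 100,
                    min_workers: int = 1, max_workers: int = 20) -> int:
    """Workers needed so each has about per_worker_target queued messages, clamped."""
    if queue_depth < 0 or per_worker_target <= 0:
        raise ValueError("queue_depth must be >= 0 and per_worker_target > 0")
    needed = math.ceil(queue_depth / per_worker_target)
    return max(min_workers, min(max_workers, needed))


for depth in (0, 250, 10_000):
    print(depth, desired_workers(depth))
```

Keeping min_workers above zero avoids a cold start on the first message after an idle period, at the cost of one always-on worker.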

Kubernetes (Enterprise)

helm install task-runner ./charts/task-runner
# Includes:
# - Worker deployments
# - Redis StatefulSet
# - Horizontal Pod Autoscaler

CLI Interface

# Run workflow
task-runner workflow run --tenant mycompany --file pipeline.yaml

# Check status
task-runner workflow status --job-id job-123

# View logs
task-runner workflow logs --job-id job-123 --follow

# Schedule management
task-runner schedule add --name "daily" --cron "0 2 * * *"
task-runner schedule list
task-runner schedule trigger --id daily-123

# Data lineage
task-runner lineage job --job-id job-123
task-runner lineage asset --uri s3://bucket/file.parquet

# Incremental stats
task-runner incremental summary --tenant mycompany
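The CLI uses a two-level "noun verb" layout (workflow run, schedule add, and so on). A hypothetical argparse skeleton for the workflow and schedule groups shows how that structure maps onto nested subparsers; the flags are taken from the examples above, --tenant is treated as optional because one earlier example omits it, and the lineage and incremental groups would be added the same way.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="task-runner")
    groups = parser.add_subparsers(dest="group", required=True)

    # task-runner workflow {run,status,logs}
    workflow = groups.add_parser("workflow").add_subparsers(dest="command", required=True)
    run = workflow.add_parser("run")
    run.add_argument("--file", required=True)
    run.add_argument("--tenant")
    status = workflow.add_parser("status")
    status.add_argument("--job-id", required=True)
    logs = workflow.add_parser("logs")
    logs.add_argument("--job-id", required=True)
    logs.add_argument("--follow", action="store_true")

    # task-runner schedule {add,list,trigger}
    schedule = groups.add_parser("schedule").add_subparsers(dest="command", required=True)
    add = schedule.add_parser("add")
    add.add_argument("--name", required=True)
    add.add_argument("--cron", required=True)
    schedule.add_parser("list")
    trigger = schedule.add_parser("trigger")
    trigger.add_argument("--id", required=True)
    return parser


args = build_parser().parse_args(["workflow", "logs", "--job-id", "job-123", "--follow"])
print(args)
```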

Use Cases

1. ETL Pipelines

Transform data from APIs/databases to analytics-ready formats with incremental processing.

2. ML Feature Engineering

Generate features from raw data with lineage tracking for model debugging.

3. Data Quality Checks

Scheduled validation workflows with alerting on failures.

4. Multi-Tenant Analytics

Isolated workflows per customer with quota management.

Technical Highlights

  • Dramatiq Workers - Async job processing with built-in retries and backoff
  • DuckDB Analytics - In-process OLAP database with S3 integration
  • Docker Isolation - Per-task containers for clean dependencies
  • Redis Queue - High-throughput job distribution
  • CloudWatch Integration - Comprehensive monitoring and alerting

Scalable data engineering from MacLeod Labs

Tech stack

Python · DuckDB · S3 · Dramatiq · Docker · ECS