References

This document provides a comprehensive reference for the pipeline.yaml configuration file used by the AGT (Agnostic) pipeline runner.

The pipeline.yaml file defines the configuration for a data processing pipeline that runs on ClickHouse. The configuration is structured around several main sections that control different aspects of pipeline execution.

yaml
# Pipeline configuration
Init:
  # Initialization settings
Source:
  # Data source configuration
Stages:
  # Processing stages
Finalizer:
  # Cleanup configuration

# Engine configuration
Engine:
  # Engine settings

# Monitoring configuration
StartupProbe:
  # Startup probe settings

# Prometheus metrics endpoint
PromAddr: ":9999"

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Init | InitConfig | No | {} | Initialization configuration |
| Source | SourceConfig | Yes | - | Data source configuration |
| Stages | Array of StageConfig | Yes | - | Processing stages (minimum 1 required) |
| Finalizer | FinalizerConfig | No | {} | Cleanup configuration |
| Engine | EngineConfig | No | {} | Engine configuration |
| StartupProbe | StartupProbeConfig | No | {} | Startup probe configuration |
| PromAddr | string | No | ":9999" | Prometheus metrics endpoint address |
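
Since only Source and Stages are marked as required, a minimal file might look like the sketch below. The query names are placeholders, and this reference does not say how the runner behaves when Engine is left at its default, so treat this as an illustration of the required fields rather than a tested configuration.

```yaml
# Minimal pipeline.yaml sketch: only the required top-level fields.
# "fetch_events" and "process_event" are placeholder query names.
Source:
  Query:
    Name: "fetch_events"      # QueryRef: Name is its only required field
Stages:
  - Execute:                  # at least one stage is required
      Queries:
        - Name: "process_event"
```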

The Init section defines initialization queries and settings that run before the pipeline starts.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Queries | Array of QueryRef | No | [] | Initialization queries to execute |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings for initialization queries |

Example:

yaml
Init:
  Queries:
    - Name: "create_tables"
      IgnoreFailure: false
  ClickhouseSettings:
    max_memory_usage: 1000000000

The Source section defines how data is ingested into the pipeline.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Query | QueryRef | Yes | - | Source query to execute |
| PollInterval | duration | No | 0s | Interval between query executions (0 = run once) |
| StopAfter | integer | No | 0 | Stop after N iterations (0 = run indefinitely) |
| StopOnEmpty | boolean | No | false | Stop when query returns no results |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings for source queries |

Example:

yaml
Source:
  Query:
    Name: "fetch_events"
    IgnoreFailure: false
  PollInterval: "30s"
  StopAfter: 100
  StopOnEmpty: true
  ClickhouseSettings:
    max_execution_time: 300
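
To illustrate how the defaults in the table interact, the two alternative Source blocks below contrast a one-shot source with a polling source that drains its input. They are separate YAML documents (split by `---`), the query name is a placeholder, and the behavior described in the comments is read directly from the field descriptions rather than verified against the runner.

```yaml
# Variant 1: one-shot source.
# PollInterval is omitted, so it keeps its default of 0s (run once).
Source:
  Query:
    Name: "fetch_events"      # placeholder query name
---
# Variant 2: poll every 10 seconds until the query returns no rows.
# StopAfter is omitted, so there is no iteration limit (0 = run indefinitely).
Source:
  Query:
    Name: "fetch_events"      # placeholder query name
  PollInterval: "10s"
  StopOnEmpty: true
```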

The Stages array defines the processing stages that transform data as it flows through the pipeline. Each stage must specify exactly one stage type.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| ChanSize | integer | No | 0 | Channel buffer size for this stage |
| Execute | ExecuteStageConfig | No | nil | Execute stage configuration |
| Debug | DebugStageConfig | No | nil | Debug stage configuration |
| Sleep | SleepStageConfig | No | nil | Sleep stage configuration |
| Buffer | BufferStageConfig | No | nil | Buffer stage configuration |
| Metrics | MetricsStageConfig | No | nil | Metrics stage configuration |
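
As a sketch of how these fields combine, the fragment below chains three stages, each setting exactly one stage type, with ChanSize placed alongside the type key. It assumes stages run in the order they are listed, which this reference does not state explicitly; the query name and debug Id are placeholders.

```yaml
Stages:
  # Stage 1: an Execute stage with a buffered input channel.
  - ChanSize: 100
    Execute:
      PoolSize: 4
      Queries:
        - Name: "process_event"   # placeholder query name
  # Stage 2: print each item for inspection.
  - Debug:
      Id: "after_process"         # placeholder identifier
      Pretty: true
  # Stage 3: pause between items.
  - Sleep:
      Duration: "500ms"
```

A list entry that sets two type keys at once (for example both Execute and Debug) would violate the "exactly one stage type" rule.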

The Execute stage runs ClickHouse queries for each data item, optionally across a pool of concurrent workers.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| PoolSize | integer | No | 0 | Number of concurrent workers |
| SemaphoreWeight | integer | No | 0 | Semaphore weight limit |
| WorkerChanSize | integer | No | 0 | Worker channel buffer size |
| SequencerChanSize | integer | No | 0 | Sequencer channel buffer size |
| Queries | Array of QueryRef | Yes | - | Queries to execute |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |

Example:

yaml
Stages:
  - Execute:
      PoolSize: 4
      SemaphoreWeight: 10
      Queries:
        - Name: "process_event"
          IgnoreFailure: false
        - Name: "update_metrics"
          IgnoreFailure: true
      ClickhouseSettings:
        max_memory_usage: 500000000

The Debug stage outputs data items for debugging purposes.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Id | string | No | "" | Debug identifier |
| Pretty | boolean | No | false | Pretty-print JSON output |

Example:

yaml
Stages:
  - Debug:
      Id: "stage_1_output"
      Pretty: true

The Sleep stage introduces a delay in processing.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Duration | duration | Yes | - | Sleep duration |

Example:

yaml
Stages:
  - Sleep:
      Duration: "1s"

The Buffer stage holds data items based on a condition query and a maximum time window.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Enter | QueryRef | No | nil | Query to execute when entering buffer |
| Leave | QueryRef | No | nil | Query to execute when leaving buffer |
| Condition | QueryRef | No | nil | Condition query for buffering |
| Queries | Array of QueryRef | No | [] | Queries to execute on buffered data |
| MaxDuration | duration | No | 0s | Maximum buffer duration |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |

Example:

yaml
Stages:
  - Buffer:
      Condition:
        Name: "should_buffer"
      MaxDuration: "5m"
      Queries:
        - Name: "process_batch"
      ClickhouseSettings:
        max_memory_usage: 2000000000

The Metrics stage collects and reports metrics based on query results.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Metrics | Array of MetricConfig | Yes | - | Metrics to collect |
| Query | QueryRef | Yes | - | Query to execute for metrics |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |

Example:

yaml
Stages:
  - Metrics:
      Query:
        Name: "get_metrics"
      Metrics:
        - Name: "events_processed"
          Type: "counter"
        - Name: "processing_duration"
          Type: "histogram"

The Finalizer section defines cleanup operations that run after the pipeline completes. Currently, this is an empty configuration object.

yaml
Finalizer: {}

The Engine section configures the execution engine (local or remote ClickHouse).

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Local | LocalEngineConfig | No | nil | Local ClickHouse engine configuration |
| Remote | RemoteEngineConfig | No | nil | Remote ClickHouse engine configuration |

The Local engine runs a local ClickHouse instance managed by the pipeline.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| BinaryPath | string | No | "" | Path to ClickHouse binary |
| WorkingDir | string | No | "" | Working directory |
| Env | Map[string, string] | No | {} | Environment variables |
| Bundles | Array of string | No | [] | Bundle names to load |
| BundlesPath | string | No | "" | Path to bundles directory |
| DisableCleanup | boolean | No | false | Disable cleanup on shutdown |
| ServerSettings | Map[string, any] | No | {} | ClickHouse server settings |
| Dsn | string | No | "" | Database connection string |
| Settings | Map[string, any] | No | {} | ClickHouse client settings |
| Vars | Map[string, any] | No | {} | Template variables |

Example:

yaml
Engine:
  Local:
    BinaryPath: "/usr/bin/clickhouse"
    WorkingDir: "/tmp/clickhouse"
    Bundles:
      - "ethereum"
      - "polygon"
    BundlesPath: "/opt/bundles"
    ServerSettings:
      max_connections: 1000
    Settings:
      max_memory_usage: 1000000000

The Remote engine connects to a remote ClickHouse instance.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Dsn | string | Yes | - | Database connection string |
| Settings | Map[string, any] | No | {} | ClickHouse client settings |
| Logging | LogHandlerConfig | No | {} | Logging configuration |

Example:

yaml
Engine:
  Remote:
    Dsn: "clickhouse://user:password@host:9000/database"
    Settings:
      max_memory_usage: 1000000000
    Logging:
      DiscardSources:
        - "system.query_log"

The StartupProbe section configures health checks during startup.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| MaxDelay | duration | No | 0s | Maximum delay before considering startup failed |
| PollInterval | duration | No | 0s | Interval between health checks |

Example:

yaml
StartupProbe:
  MaxDelay: "30s"
  PollInterval: "1s"

A query reference (QueryRef) names a query template and sets its error-handling options.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Name | string | Yes | - | Query template name |
| IgnoreFailure | boolean | No | false | Continue execution if query fails |
| IgnoreErrorCodes | Array of integer | No | [] | Specific error codes to ignore |

Example:

yaml
Query:
  Name: "my_query"
  IgnoreFailure: false
  IgnoreErrorCodes: [60, 241]  # UNKNOWN_TABLE (table doesn't exist), MEMORY_LIMIT_EXCEEDED

Metric configurations (MetricConfig) define how metrics are collected and reported.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Name | string | Yes | - | Metric name |
| Type | string | Yes | - | Metric type (counter, gauge, histogram, etc.) |

Example:

yaml
Metrics:
  - Name: "requests_total"
    Type: "counter"
  - Name: "request_duration_seconds"
    Type: "histogram"

Log handler configurations (LogHandlerConfig) control logging behavior for remote engines.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| DiscardSources | Array of string | No | [] | Log sources to discard |

Example:

yaml
Logging:
  DiscardSources:
    - "system.query_log"
    - "system.trace_log"

Duration values are specified as strings with time units:

  • ns - nanoseconds
  • μs or us - microseconds
  • ms - milliseconds
  • s - seconds
  • m - minutes
  • h - hours

Examples:

yaml
PollInterval: "30s"
MaxDuration: "5m"
MaxDelay: "1h30m"

Configuration values can be overridden using environment variables with the AGT_ prefix. Write the field path in uppercase and join nested field names with underscores.

Examples:

bash
AGT_PROMADDR=":8080"
AGT_ENGINE_REMOTE_DSN="clickhouse://localhost:9000/default"
AGT_SOURCE_POLLINTERVAL="60s"
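
For comparison, the three variables above correspond to the following fields in pipeline.yaml. This is a fragment showing only the overridden paths, not a complete configuration (Source.Query, for instance, is still required).

```yaml
# Fields targeted by the environment variables above.
PromAddr: ":8080"                                 # AGT_PROMADDR
Engine:
  Remote:
    Dsn: "clickhouse://localhost:9000/default"    # AGT_ENGINE_REMOTE_DSN
Source:
  PollInterval: "60s"                             # AGT_SOURCE_POLLINTERVAL
```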

Query templates can use variables that are:

  1. Passed via command line (--var key=value)
  2. Defined in the engine configuration (Vars section)
  3. Derived from previous query results

Variables are accessible in templates using Go template syntax: {{ .variable_name }}
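
As an illustration of the second source, the fragment below defines two variables under the local engine's Vars field. The variable names and values are hypothetical; a query template could then refer to them as {{ .network }} and {{ .batch_size }}.

```yaml
Engine:
  Local:
    Vars:
      network: "ethereum"   # hypothetical variable, referenced as {{ .network }}
      batch_size: 1000      # hypothetical variable, referenced as {{ .batch_size }}
```

This reference does not say which source takes precedence when the same name is defined in more than one place, so avoid reusing a name across the command line and the Vars section unless you have checked the behavior.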

For a complete example, see the Hackernews tutorial. The full pipeline.yaml configuration file is available in our GitHub repository.