This document provides a comprehensive reference for the pipeline.yaml configuration file used by the AGT (Agnostic) pipeline runner.
Overview
The pipeline.yaml file defines the configuration for a data processing pipeline that runs on ClickHouse. The configuration is structured around several main sections that control different aspects of pipeline execution.
Configuration Structure
# Pipeline configuration
Init:
  # Initialization settings
Source:
  # Data source configuration
Stages:
  # Processing stages
Finalizer:
  # Cleanup configuration
# Engine configuration
Engine:
  # Engine settings
# Monitoring configuration
StartupProbe:
  # Startup probe settings
# Prometheus metrics endpoint
PromAddr: ":9999"
Configuration Reference
Root Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Init | InitConfig | No | {} | Initialization configuration |
| Source | SourceConfig | Yes | - | Data source configuration |
| Stages | Array of StageConfig | Yes | - | Processing stages (minimum 1 required) |
| Finalizer | FinalizerConfig | No | {} | Cleanup configuration |
| Engine | EngineConfig | No | {} | Engine configuration |
| StartupProbe | StartupProbeConfig | No | {} | Startup probe configuration |
| PromAddr | string | No | ":9999" | Prometheus metrics endpoint address |
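Since only Source and Stages are marked as required, the smallest configuration might look like the sketch below. The query names are hypothetical placeholders, and every omitted section falls back to the defaults listed in the table. How the runner behaves with an empty Engine section is not specified here, so treat this as a starting point rather than a verified setup.

```yaml
# Minimal sketch: only the required root fields are set.
# "fetch_events" and "process_event" are hypothetical query template names.
Source:
  Query:
    Name: "fetch_events"
Stages:
  # At least one stage is required.
  - Execute:
      Queries:
        - Name: "process_event"
```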
Init Configuration
The Init section defines initialization queries and settings that run before the pipeline starts.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Queries | Array of QueryRef | No | [] | Initialization queries to execute |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings for initialization queries |
Example:
Init:
  Queries:
    - Name: "create_tables"
      IgnoreFailure: false
  ClickhouseSettings:
    max_memory_usage: 1000000000
Source Configuration
The Source section defines how data is ingested into the pipeline.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Query | QueryRef | Yes | - | Source query to execute |
| PollInterval | duration | No | 0s | Interval between query executions (0 = run once) |
| StopAfter | integer | No | 0 | Stop after N iterations (0 = run indefinitely) |
| StopOnEmpty | boolean | No | false | Stop when query returns no results |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings for source queries |
Example:
Source:
  Query:
    Name: "fetch_events"
    IgnoreFailure: false
  PollInterval: "30s"
  StopAfter: 100
  StopOnEmpty: true
  ClickhouseSettings:
    max_execution_time: 300
Stage Configuration
The Stages array defines the processing stages that transform data as it flows through the pipeline. Each stage must specify exactly one stage type.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| ChanSize | integer | No | 0 | Channel buffer size for this stage |
| Execute | ExecuteStageConfig | No | nil | Execute stage configuration |
| Debug | DebugStageConfig | No | nil | Debug stage configuration |
| Sleep | SleepStageConfig | No | nil | Sleep stage configuration |
| Buffer | BufferStageConfig | No | nil | Buffer stage configuration |
| Metrics | MetricsStageConfig | No | nil | Metrics stage configuration |
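To illustrate the "exactly one stage type" rule, the following sketch chains three stages, each list entry setting a single type key, with ChanSize placed alongside the type key on one of them. The query name, debug identifier, and numeric values are illustrative assumptions, not recommended settings.

```yaml
Stages:
  # Entry 1: an Execute stage only.
  - Execute:
      PoolSize: 2
      Queries:
        - Name: "process_event"   # hypothetical query template name
  # Entry 2: a Debug stage, with a stage-level channel buffer size.
  - ChanSize: 16
    Debug:
      Id: "after_execute"
      Pretty: true
  # Entry 3: a Sleep stage only.
  - Sleep:
      Duration: "500ms"
```

Combining two type keys in one entry (for example Execute and Debug together) would violate the rule stated above; use one list entry per stage instead.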
Execute Stage
Executes ClickHouse queries for each data item with concurrent processing capabilities.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| PoolSize | integer | No | 0 | Number of concurrent workers |
| SemaphoreWeight | integer | No | 0 | Semaphore weight limit |
| WorkerChanSize | integer | No | 0 | Worker channel buffer size |
| SequencerChanSize | integer | No | 0 | Sequencer channel buffer size |
| Queries | Array of QueryRef | Yes | - | Queries to execute |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |
Example:
Stages:
  - Execute:
      PoolSize: 4
      SemaphoreWeight: 10
      Queries:
        - Name: "process_event"
          IgnoreFailure: false
        - Name: "update_metrics"
          IgnoreFailure: true
      ClickhouseSettings:
        max_memory_usage: 500000000
Debug Stage
Outputs data items for debugging purposes.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Id | string | No | "" | Debug identifier |
| Pretty | boolean | No | false | Pretty-print JSON output |
Example:
Stages:
  - Debug:
      Id: "stage_1_output"
      Pretty: true
Sleep Stage
Introduces a delay in processing.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Duration | duration | Yes | - | Sleep duration |
Example:
Stages:
  - Sleep:
      Duration: "1s"
Buffer Stage
Buffers data items based on conditions and time windows.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Enter | QueryRef | No | nil | Query to execute when entering buffer |
| Leave | QueryRef | No | nil | Query to execute when leaving buffer |
| Condition | QueryRef | No | nil | Condition query for buffering |
| Queries | Array of QueryRef | No | [] | Queries to execute on buffered data |
| MaxDuration | duration | No | 0s | Maximum buffer duration |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |
Example:
Stages:
  - Buffer:
      Condition:
        Name: "should_buffer"
      MaxDuration: "5m"
      Queries:
        - Name: "process_batch"
      ClickhouseSettings:
        max_memory_usage: 2000000000
Metrics Stage
Collects and reports metrics based on query results.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Metrics | Array of MetricConfig | Yes | - | Metrics to collect |
| Query | QueryRef | Yes | - | Query to execute for metrics |
| ClickhouseSettings | Map[string, any] | No | {} | ClickHouse settings |
Example:
Stages:
  - Metrics:
      Query:
        Name: "get_metrics"
      Metrics:
        - Name: "events_processed"
          Type: "counter"
        - Name: "processing_duration"
          Type: "histogram"
Finalizer Configuration
The Finalizer section defines cleanup operations that run after the pipeline completes. Currently, this is an empty configuration object.
Finalizer: {}
Engine Configuration
The Engine section configures the execution engine (local or remote ClickHouse).
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Local | LocalEngineConfig | No | nil | Local ClickHouse engine configuration |
| Remote | RemoteEngineConfig | No | nil | Remote ClickHouse engine configuration |
Local Engine
Configures a local ClickHouse instance managed by the pipeline.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| BinaryPath | string | No | "" | Path to ClickHouse binary |
| WorkingDir | string | No | "" | Working directory |
| Env | Map[string, string] | No | {} | Environment variables |
| Bundles | Array of string | No | [] | Bundle names to load |
| BundlesPath | string | No | "" | Path to bundles directory |
| DisableCleanup | boolean | No | false | Disable cleanup on shutdown |
| ServerSettings | Map[string, any] | No | {} | ClickHouse server settings |
| Dsn | string | No | "" | Database connection string |
| Settings | Map[string, any] | No | {} | ClickHouse client settings |
| Vars | Map[string, any] | No | {} | Template variables |
Example:
Engine:
  Local:
    BinaryPath: "/usr/bin/clickhouse"
    WorkingDir: "/tmp/clickhouse"
    Bundles:
      - "ethereum"
      - "polygon"
    BundlesPath: "/opt/bundles"
    ServerSettings:
      max_connections: 1000
    Settings:
      max_memory_usage: 1000000000
Remote Engine
Configures connection to a remote ClickHouse instance.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Dsn | string | Yes | - | Database connection string |
| Settings | Map[string, any] | No | {} | ClickHouse client settings |
| Logging | LogHandlerConfig | No | {} | Logging configuration |
Example:
Engine:
  Remote:
    Dsn: "clickhouse://user:password@host:9000/database"
    Settings:
      max_memory_usage: 1000000000
    Logging:
      DiscardSources:
        - "system.query_log"
Startup Probe Configuration
The StartupProbe section configures health checks during startup.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| MaxDelay | duration | No | 0s | Maximum delay before considering startup failed |
| PollInterval | duration | No | 0s | Interval between health checks |
Example:
StartupProbe:
  MaxDelay: "30s"
  PollInterval: "1s"
Query Reference
Query references specify named queries with error handling options.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Name | string | Yes | - | Query template name |
| IgnoreFailure | boolean | No | false | Continue execution if query fails |
| IgnoreErrorCodes | Array of integer | No | [] | Specific error codes to ignore |
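The same three fields apply wherever a QueryRef appears (Init, Source, and the stage sections). As a hedged sketch, an initialization query could tolerate one specific failure while still surfacing any other error. This assumes IgnoreErrorCodes is matched against ClickHouse server error codes, and the query name is hypothetical.

```yaml
Init:
  Queries:
    - Name: "create_tables"       # hypothetical query template name
      IgnoreFailure: false        # other errors still fail the query
      IgnoreErrorCodes: [57]      # 57 = TABLE_ALREADY_EXISTS in ClickHouse
```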
Example:
Query:
  Name: "my_query"
  IgnoreFailure: false
  IgnoreErrorCodes: [60, 241] # Unknown table, memory limit exceeded
Metric Configuration
Metric configurations define how metrics are collected and reported.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Name | string | Yes | - | Metric name |
| Type | string | Yes | - | Metric type (counter, gauge, histogram, etc.) |
Example:
Metrics:
  - Name: "requests_total"
    Type: "counter"
  - Name: "request_duration_seconds"
    Type: "histogram"
Log Handler Configuration
Log handler configurations control logging behavior for remote engines.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| DiscardSources | Array of string | No | [] | Log sources to discard |
Example:
Logging:
  DiscardSources:
    - "system.query_log"
    - "system.trace_log"
Data Types
Duration
Duration values are specified as strings with time units:
- ns - nanoseconds
- μs or us - microseconds
- ms - milliseconds
- s - seconds
- m - minutes
- h - hours
Examples:
PollInterval: "30s"
MaxDuration: "5m"
MaxDelay: "1h30m"
Environment Variables
Configuration values can be overridden using environment variables with the AGT_ prefix. Write the field path in uppercase, joining nested field names with underscores.
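To make the naming rule concrete, here is a hedged sketch of the YAML fields that three such variables would correspond to, assuming the mapping is simply the AGT_ prefix followed by the uppercased field path with underscores between nesting levels. The values are illustrative, and the fragment is partial rather than a complete configuration.

```yaml
# Each comment names the environment variable assumed to override the field.
PromAddr: ":8080"                                 # AGT_PROMADDR
Engine:
  Remote:
    Dsn: "clickhouse://localhost:9000/default"    # AGT_ENGINE_REMOTE_DSN
Source:
  PollInterval: "60s"                             # AGT_SOURCE_POLLINTERVAL
```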
Examples:
AGT_PROMADDR=":8080"
AGT_ENGINE_REMOTE_DSN="clickhouse://localhost:9000/default"
AGT_SOURCE_POLLINTERVAL="60s"
Template Variables
Query templates can use variables that are:
- Passed via command line (--var key=value)
- Defined in the engine configuration (Vars section)
- Derived from previous query results
Variables are accessible in templates using Go template syntax: {{ .variable_name }}
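As a small sketch of the second source listed above, a variable can be declared under the local engine's Vars map and then referenced from a query template. The variable name chain and its value are hypothetical, and how values from the different sources take precedence over one another is not covered here.

```yaml
Engine:
  Local:
    Vars:
      # Hypothetical variable; a query template would read it as {{ .chain }}.
      # The command-line form described above would be: --var chain=ethereum
      chain: "ethereum"
```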
Complete Example
For a complete example, see the Hackernews tutorial.
The full pipeline.yaml configuration file is available in the GitHub repository.