Overview
The Agnostic (AGT) project is a SQL-native ETL framework that orchestrates data pipelines using ClickHouse and Apache Iceberg. It leverages a pipeline.yaml configuration, dynamic variables (vars), and templated SQL queries for flexible, fault-tolerant data processing. This document outlines AGT’s core features, focusing on pipeline configuration, stage types, and query templating. Examples are drawn from the HackerNews pipeline.
Pipeline Configuration with pipeline.yaml
AGT pipelines are defined in a pipeline.yaml file, which declaratively specifies the execution engine, initialization, source, and processing stages. This centralized configuration ensures reproducible workflows and simplifies pipeline management.
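As a rough sketch, a pipeline.yaml combines the four sections described below. The query and field names here are taken from the HackerNews snippets later in this document; the exact top-level layout is an assumption:

```yaml
# Minimal pipeline.yaml sketch, assembled from the HackerNews
# snippets shown later in this document (layout is assumed).
Engine:
  # ClickHouse connection and settings (see the Engine section)
Init:
  Queries:
    - init_start
Source:
  Query: source
  PollInterval: 30s
Stages:
  - Execute:
      Query: fetch_range
  - Buffer:
      Enter: create_buffer
      Queries:
        - insert_into_buffer
      MaxDuration: 5s
```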
Dynamic Variables (vars)
AGT uses dynamic key-value pairs called vars, set via the CLI at startup or generated during execution. These enable parameterized pipeline logic and SQL queries.
- CLI Input: Set variables like ICEBERG_DESTINATION_TABLE_LOCATION or ORDER_BY when running the pipeline (e.g., agt run --config pipeline.yaml --var ORDER_BY=id).
- Runtime Generation: Queries can produce vars (e.g., RANGE_START, OUTPUT_FILE) for use in later stages.
- Usage: Variables are accessed in queries using {{.VAR_NAME}} syntax, supporting dynamic behavior.
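The {{.VAR_NAME}} syntax matches Go's text/template. As a self-contained sketch of how such substitution behaves, the renderQuery helper and the default function below are assumptions (the latter modeled on the Sprig helper of the same name), not AGT's actual implementation:

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderQuery substitutes vars into a templated SQL string, mimicking
// AGT's {{.VAR_NAME}} syntax. The "default" helper is an assumption:
// it returns the fallback when the var is missing or empty.
func renderQuery(text string, vars map[string]string) string {
	funcs := template.FuncMap{
		"default": func(fallback string, value interface{}) string {
			if s, ok := value.(string); ok && s != "" {
				return s
			}
			return fallback
		},
	}
	tmpl := template.Must(template.New("q").Funcs(funcs).Parse(text))
	var out bytes.Buffer
	if err := tmpl.Execute(&out, vars); err != nil {
		panic(err)
	}
	return out.String()
}

func main() {
	q := `SELECT * FROM posts WHERE id >= {{.RANGE_START}} LIMIT {{.MAX_BUFFER_SIZE | default "1000"}}`
	// RANGE_START is set; MAX_BUFFER_SIZE is absent, so the default applies.
	fmt.Println(renderQuery(q, map[string]string{"RANGE_START": "42"}))
}
```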
Pipeline Components
The pipeline.yaml file structures the pipeline into four components: Engine, Init, Source, and Stages.
Engine
The Engine section configures the execution environment, typically a local or remote ClickHouse instance. It supports custom settings (e.g., JSON parsing, S3 connections) and UDF bundles like icepq for Iceberg operations. The HackerNews pipeline uses a local ClickHouse engine with optimized settings.
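The document does not show the Engine section itself; a hypothetical sketch, with every key name below assumed rather than taken from AGT's schema, might look like:

```yaml
# Hypothetical Engine sketch: all key names are assumptions.
Engine:
  Clickhouse:
    Host: localhost
    Settings:
      input_format_import_nested_json: 1   # example JSON-parsing tweak
    UDFBundles:
      - icepq   # Iceberg helpers mentioned above
```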
Init
The Init section runs a single query at startup to initialize state, such as retrieving the latest processed data for fault tolerance. For example, the HackerNews pipeline’s init_start query fetches the maximum post ID from Iceberg metadata, outputting INIT_START as a var. It supports error handling, like ignoring specific error codes for non-existent tables.
Init:
  Queries:
    - init_start#ignore-error-codes=8888

Source
The Source section generates tasks by executing a SQL query at a configurable interval (e.g., every 30 seconds). Each query row creates a task with vars. In the HackerNews pipeline, the source query produces post ID ranges (RANGE_START, RANGE_END), with options to stop on empty results or after a set number of runs.
Source:
  Query: source
  PollInterval: 30s

Stages
The Stages section defines a sequence of processing steps, with tasks flowing through each stage in order. AGT supports five stage types: Execute, Debug, Sleep, Buffer, and Metrics. The HackerNews pipeline uses Execute and Buffer.
Execute Stage
The Execute stage runs SQL queries for each task, optionally in parallel.
- Features: Supports PoolSize for concurrency, Ordered to maintain task sequence, and ClickhouseSettings for query optimization.
- Example: The HackerNews pipeline’s first stage runs fetch_range to fetch posts into a Memory table, processing four tasks concurrently with order preserved.
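Putting the named fields together, the HackerNews Execute stage might be declared roughly as follows. PoolSize, Ordered, ClickhouseSettings, and fetch_range come from the text; their exact placement, and the sample setting, are assumptions:

```yaml
Stages:
  - Execute:
      Query: fetch_range
      PoolSize: 4          # process four tasks concurrently
      Ordered: true        # preserve task sequence
      ClickhouseSettings:
        max_threads: 4     # hypothetical setting, for illustration
```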
Debug Stage
The Debug stage logs task vars or query results for troubleshooting, aiding development without modifying data. It’s useful for inspecting intermediate states.
Sleep Stage
The Sleep stage introduces a configurable delay per task, enabling rate-limiting or synchronization with external systems.
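The text names no fields for this stage; a hypothetical sketch with an assumed Duration key:

```yaml
- Sleep:
    Duration: 1s   # assumed field name; per-task delay for rate limiting
```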
Buffer Stage
The Buffer stage consolidates tasks into a single operation to optimize processing.
- Features: Uses Enter to initialize (e.g., create a table), Queries to process each task, Condition to check completion, Leave to finalize, and MaxDuration to limit buffering time.
- Example: The HackerNews pipeline’s second stage buffers posts to reduce Iceberg write frequency, using queries like create_buffer, insert_into_buffer, and merge_vars.
Stages:
  - Buffer:
      Enter: create_buffer
      Leave: rename_buffer
      Condition: condition
      Queries:
        - insert_into_buffer
        - drop_range
        - merge_vars
      MaxDuration: 5s

Metrics Stage
The Metrics stage collects performance data, such as task processing times or error rates, for monitoring and optimization.
Templatized SQL Queries
AGT’s SQL queries use templating to inject vars dynamically with {{.VAR_NAME}} syntax.
- Variable Substitution: Accesses vars like RANGE_START or ORDER_BY.
- Default Values: Supports fallbacks (e.g., {{.MAX_BUFFER_SIZE | default "1000"}}).
- Contextual Templating: In Buffer stages, LEFT and RIGHT contexts merge vars (e.g., {{.LEFT.RANGE_START}}).
- Example: The HackerNews pipeline’s fetch_range query constructs API URLs using RANGE_START and RANGE_END, while merge_vars generates unique file names.
This templating enhances query reusability and adaptability across pipeline stages.
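Pulling these features together, a templated query in the style described above might read as follows. The table and column names are illustrative, not taken from the HackerNews pipeline:

```sql
-- Illustrative templated query; identifiers are hypothetical.
SELECT *
FROM posts
WHERE id BETWEEN {{.RANGE_START}} AND {{.RANGE_END}}
ORDER BY {{.ORDER_BY | default "id"}}
LIMIT {{.MAX_BUFFER_SIZE | default "1000"}}
-- In a Buffer stage, merged vars are reached via contexts,
-- e.g. {{.LEFT.RANGE_START}} for the accumulated task's value.
```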