Tutorial: HackerNews

This tutorial explains how to create an AGT pipeline that extracts posts from the HackerNews API, transforms them, and loads them into an Apache Iceberg table on an S3-compatible endpoint. The pipeline is defined in pipeline.yaml and the SQL queries it references. It relies on AGT’s SQL-native ETL features: templated queries, vars passed from one query to the next, and staged processing that can resume after a failure. All files are in the examples/hackernews_posts directory.

The pipeline is configured in pipeline.yaml, specifying a local ClickHouse engine, an initialization query, a source query for task generation, and three processing stages to fetch, buffer, and store HackerNews posts.

The Engine section sets up a local ClickHouse instance with settings optimized for JSON and Iceberg processing, including the icepq UDF bundle for Iceberg operations.

pipeline.yaml — yaml
Engine:
  Local:
    # Here you can instruct the Local engine to install any ClickHouse UDF bundle
    # so you can use these inside your queries.
    # Here, we make use of the `PLATFORM` variable exposed by AGT to target the right UDF binary for the
    # current platform.
    Bundles:
      - https://github.com/agnosticeng/icepq/releases/download/v0.1.2/icepq_clickhouse_udf_bundle_0.1.2_{{ .PLATFORM | default "linux_amd64_v3" }}.tar.gz

    # You can set any ClickHouse setting here to ensure they are applied for every query
    Settings:
      send_logs_level: warning
      allow_experimental_dynamic_type: 1
      allow_experimental_json_type: 1
      enable_dynamic_type: 1
      enable_json_type: 1
      enable_named_columns_in_function_tuple: 1
      schema_inference_make_columns_nullable: auto
      remote_filesystem_read_prefetch: 0
      input_format_parquet_use_native_reader: 1
      output_format_parquet_string_as_string: 0
      output_format_parquet_use_custom_encoder: 1
      output_format_parquet_write_page_index: 1
      output_format_parquet_write_bloom_filter: 1
      output_format_parquet_parallel_encoding: 0
      max_execution_time: 3600
      min_os_cpu_wait_time_ratio_to_throw: 100
      max_os_cpu_wait_time_ratio_to_throw: 200
      s3_max_connections: 50

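The Init section runs the init_start query once at startup. The query retrieves the highest post ID already stored in the Iceberg table, so a restarted pipeline resumes where the previous run stopped. This matters because AGT is stateless: the destination table itself serves as the record of progress.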

pipeline.yaml — yaml
Init:
  Queries:
    - init_start#ignore-error-codes=8888
init_start — sql
select
    arrayMax(res.value[].upper)::UInt64 as INIT_START,
    throwIf(res.error::String = 'table does not exist', 'table does not exist', 8888::Int16) as _1,
    throwIf(res.error::String <> '', res.error::String) as _2
from (select icepq_field_bound_values(concat('s3:/', path('{{.ICEBERG_DESTINATION_TABLE_LOCATION}}')), 'id') as res)
settings allow_custom_error_code_in_throwif=true

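The init_start query reads the ICEBERG_DESTINATION_TABLE_LOCATION var (an S3 endpoint URL) and calls the icepq_field_bound_values UDF from the icepq project, which reads the maximum id from Iceberg metadata without scanning Parquet files. The expression concat('s3:/', path(...)) rewrites the URL into s3:// form by keeping only its path. The two throwIf calls surface errors reported by the UDF: a non-existent table raises custom error code 8888, which the #ignore-error-codes=8888 suffix in pipeline.yaml tells AGT to ignore, and any other error fails the query. The INIT_START column becomes a var available to the source query; columns whose names start with _ are ignored.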
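
The URL rewrite can be pictured with a small Python sketch. It assumes a path-style endpoint URL where the bucket is the first path segment; the endpoint and bucket names are hypothetical, and urlsplit stands in for ClickHouse's path function.

```python
from urllib.parse import urlsplit


def to_s3_uri(endpoint_url: str) -> str:
    """Model concat('s3:/', path(url)): keep only the URL path and prefix it."""
    # The path starts with '/', so 's3:/' + '/bucket/...' gives 's3://bucket/...'.
    return "s3:/" + urlsplit(endpoint_url).path


# Hypothetical endpoint and bucket, for illustration only.
print(to_s3_uri("https://s3.example.com/my-bucket/warehouse/hackernews_posts"))
```

The scheme and host of the endpoint are dropped, which is why the same var can serve both the ClickHouse s3 function (which needs the full URL) and the icepq UDFs (which take an s3:// location).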

The Source section runs the source query every 30 seconds (PollInterval: 30s) to generate tasks, each representing a range of HackerNews post IDs. The source can also stop AGT when the query returns an empty result set or after a set number of runs.

pipeline.yaml — yaml
Source:
  Query: source
  PollInterval: 30s
source — sql
with
    {{.MAX_BATCH_SIZE | default "10"}} as max_batch_size,
    {{.MAX_BATCH_PER_RUN | default "100"}} as max_batch_per_run,
    (select * from url('https://hacker-news.firebaseio.com/v0/maxitem.json', 'Raw')) as tip,

    coalesce(
        {{.RANGE_END | toCH}} + 1,
        {{.INIT_START | toCH}},
        {{.DEFAULT_START | toCH}},
        1
    ) as start

select
    generate_series as RANGE_START,
    least(tip::UInt64, (generate_series + max_batch_size - 1)::UInt64) as RANGE_END
from generate_series(
    assumeNotNull(start)::UInt64,
    assumeNotNull(tip)::UInt64,
    max_batch_size::UInt64
)
limit max_batch_per_run

The source query first fetches the current maximum post ID (tip) from the HackerNews API with this subquery:

sql
select * from url('https://hacker-news.firebaseio.com/v0/maxitem.json', 'Raw')

It then picks the start of the range as the first available value among RANGE_END + 1 (from a prior run), INIT_START (from init), DEFAULT_START (from the CLI), and 1. Each generated task carries RANGE_START and RANGE_END vars. A task covers at most MAX_BATCH_SIZE IDs (default 10), and a single run emits at most MAX_BATCH_PER_RUN tasks (default 100). For example:

vars — json
{
  "RANGE_START": 100,
  "RANGE_END": 109
}
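
The task generation logic can be modeled in a few lines of Python. This is a sketch that mirrors the coalesce chain and the generate_series call of the source query; the function names resolve_start and generate_tasks are illustrative and not part of AGT.

```python
from typing import Optional


def resolve_start(range_end: Optional[int], init_start: Optional[int],
                  default_start: Optional[int]) -> int:
    """Mirror the coalesce(...) chain: the first non-null candidate wins."""
    candidates = [
        None if range_end is None else range_end + 1,  # continue after the prior run
        init_start,     # recovered from the Iceberg table by init_start
        default_start,  # supplied on the command line
        1,              # fall back to the first item
    ]
    return next(c for c in candidates if c is not None)


def generate_tasks(start: int, tip: int, max_batch_size: int = 10,
                   max_batch_per_run: int = 100) -> list:
    """Mirror generate_series(start, tip, max_batch_size) with the limit clause."""
    tasks = []
    for range_start in range(start, tip + 1, max_batch_size):
        if len(tasks) == max_batch_per_run:
            break
        # least(tip, ...) clamps the last batch so it never goes past the tip.
        range_end = min(tip, range_start + max_batch_size - 1)
        tasks.append({"RANGE_START": range_start, "RANGE_END": range_end})
    return tasks


print(generate_tasks(resolve_start(None, 100, None), tip=135))
```

With a start of 100 and a tip of 135, the sketch yields four tasks, the last one covering 130 to 135. When the start is already past the tip, no task is produced.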

The Stages section defines three processing stages using two of AGT’s five stage types (Execute, Debug, Sleep, Buffer, Metrics): Execute and Buffer. Tasks pass through the stages in sequence, and each stage reads the vars produced by earlier steps, until the fetched posts are committed to the Iceberg table.

The first stage is an Execute stage that runs the fetch_range query on up to four tasks concurrently (PoolSize: 4). Ordered: true preserves task order.
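
As a rough analogy for PoolSize and Ordered, the Python sketch below runs tasks on a pool of four workers and still returns results in submission order. It is not AGT's implementation: it assumes that Ordered: true means tasks leave the stage in the order they entered, and fetch_range here is a hypothetical stand-in that only sleeps.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_range(task: dict) -> dict:
    """Stand-in for the real query: earlier tasks deliberately take longer."""
    time.sleep(0.01 * (5 - task["index"]))
    return task


tasks = [
    {"index": i, "RANGE_START": 100 + 10 * i, "RANGE_END": 109 + 10 * i}
    for i in range(5)
]

# Up to four tasks run at once; map() yields results in input order
# even when a later task finishes before an earlier one.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch_range, tasks))

print([t["RANGE_START"] for t in results])
```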

pipeline.yaml — yaml
Stages:
  - Execute:
      PoolSize: 4
      Ordered: true
      Queries:
        - fetch_range
fetch_range — sql
create table range_{{.RANGE_START}}_{{.RANGE_END}} engine=Memory
as (
    select
        `id`,
        `time`,
        `type`,
        `by`,
        `parent`,
        `kids`,
        `descendants`,
        `text`
    from url(
        'https://hacker-news.firebaseio.com/v0/item/{' || '{{.RANGE_START}}' || '..' || '{{.RANGE_END}}' ||'}.json',
        'JSON',
        '
            id UInt64,
            time Int64,
            type String,
            by String,
            parent Nullable(UInt64),
            kids Array(UInt64),
            descendants Nullable(Int64),
            text Nullable(String)
        '
    )
)

The fetch_range query uses ClickHouse’s url table function to fetch the posts with IDs from RANGE_START to RANGE_END and stores them in a Memory table named range_{{.RANGE_START}}_{{.RANGE_END}}. The {N..M} pattern in the URL makes ClickHouse request one item per ID in the range. Because the query returns no result set, the task’s vars pass through unchanged.
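
To make the range pattern concrete, the following Python sketch builds the pattern string that fetch_range passes to url and lists the item URLs it stands for, assuming both bounds are inclusive. The helper names are illustrative only.

```python
BASE_URL = "https://hacker-news.firebaseio.com/v0/item"


def glob_pattern(range_start: int, range_end: int) -> str:
    """Build the string passed to url(), e.g. .../item/{100..102}.json."""
    # Doubled braces in an f-string produce literal '{' and '}'.
    return f"{BASE_URL}/{{{range_start}..{range_end}}}.json"


def expand(range_start: int, range_end: int) -> list:
    """List the item URLs the pattern covers, with both bounds inclusive."""
    return [f"{BASE_URL}/{item_id}.json" for item_id in range(range_start, range_end + 1)]


print(glob_pattern(100, 102))
for item_url in expand(100, 102):
    print(item_url)
```

In the SQL, the pattern is assembled with || so that its opening and closing braces sit in separate string literals from the {{ }} template expressions.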

The second stage is a Buffer stage that accumulates tasks to reduce the number of Iceberg writes. It uses six queries (Enter, Leave, Condition, and three per-task Queries) and a MaxDuration of 5 seconds.

pipeline.yaml — yaml
Stages:
  # ...
  - Buffer:
      Enter: create_buffer
      Leave: rename_buffer
      Condition: condition
      Queries:
        - insert_into_buffer
        - drop_range
        - merge_vars
      MaxDuration: 5s
create_buffer — sql
create table buffer as range_{{.RANGE_START}}_{{.RANGE_END}}
engine = MergeTree
order by {{.ORDER_BY}}
settings old_parts_lifetime=10

The create_buffer query is the Enter query and runs on the first task to enter the buffer. It creates a MergeTree table named buffer with the same schema as range_{{.RANGE_START}}_{{.RANGE_END}}, sorted by the ORDER_BY var.

insert_into_buffer — sql
insert into table buffer
select * from range_{{.RIGHT.RANGE_START}}_{{.RIGHT.RANGE_END}}

The insert_into_buffer query copies the rows of each incoming task’s range table, range_{{.RIGHT.RANGE_START}}_{{.RIGHT.RANGE_END}}, into buffer. Here RIGHT refers to the vars of the incoming task.

drop_range — sql
drop table range_{{.RIGHT.RANGE_START}}_{{.RIGHT.RANGE_END}} sync

The drop_range query drops the task’s range table to free memory.

merge_vars — sql
select
    least(
        {{.LEFT.RANGE_START | default "18446744073709551615"}},
        {{.RIGHT.RANGE_START}}
    ) as RANGE_START,
    greatest(
        {{.LEFT.RANGE_END | default "0"}},
        {{.RIGHT.RANGE_END}}
    ) as RANGE_END,
    generateUUIDv7() || '.parquet' as OUTPUT_FILE

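The merge_vars query folds the incoming task’s vars (RIGHT) into the vars accumulated so far (LEFT), keeping the minimum RANGE_START and the maximum RANGE_END across tasks. The default values make the first merge return the incoming task’s own range. The query also adds OUTPUT_FILE, a UUIDv7 followed by .parquet.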
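
The merge can be sketched in Python as a fold over tasks. This is a model under assumptions: LEFT is taken to be the accumulated vars and starts out empty, and uuid4 stands in for generateUUIDv7, which has no equivalent in older Python standard libraries.

```python
import uuid

UINT64_MAX = 18446744073709551615  # default used for LEFT.RANGE_START


def merge_vars(left: dict, right: dict) -> dict:
    """Combine accumulated vars (left) with an incoming task's vars (right)."""
    return {
        "RANGE_START": min(left.get("RANGE_START", UINT64_MAX), right["RANGE_START"]),
        "RANGE_END": max(left.get("RANGE_END", 0), right["RANGE_END"]),
        # Stand-in for generateUUIDv7() || '.parquet'.
        "OUTPUT_FILE": f"{uuid.uuid4()}.parquet",
    }


incoming = [
    {"RANGE_START": 100, "RANGE_END": 109},
    {"RANGE_START": 110, "RANGE_END": 119},
    {"RANGE_START": 120, "RANGE_END": 129},
]

buffer_vars = {}
for task in incoming:
    buffer_vars = merge_vars(buffer_vars, task)

print(buffer_vars["RANGE_START"], buffer_vars["RANGE_END"])
```

After three tasks the buffer's vars span 100 to 129, which is the range later used to name the renamed buffer table.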

condition — sql
select toUInt64(count(*)) >= {{.MAX_BUFFER_SIZE | default "1000"}} as value from buffer

The condition query returns true once buffer holds at least MAX_BUFFER_SIZE rows (default 1000). The buffer is flushed through rename_buffer when the condition is true or when MaxDuration (5 seconds) elapses.
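
The flush rule amounts to a simple disjunction, sketched here in Python. The sketch assumes that MaxDuration is measured from the moment the buffer was created; the function and parameter names are illustrative.

```python
def should_flush(row_count: int, seconds_since_enter: float,
                 max_buffer_size: int = 1000, max_duration_s: float = 5.0) -> bool:
    """Flush when the buffer is full enough or has been open long enough."""
    size_reached = row_count >= max_buffer_size          # the condition query
    too_old = seconds_since_enter >= max_duration_s      # MaxDuration: 5s
    return size_reached or too_old


print(should_flush(row_count=1000, seconds_since_enter=0.2))
print(should_flush(row_count=40, seconds_since_enter=1.0))
```

The time limit bounds how long fetched posts wait before being written when traffic is low, while the row limit bounds the size of each Parquet file when catching up on a backlog.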

rename_buffer — sql
rename buffer to buffer_{{.RANGE_START}}_{{.RANGE_END}}

The rename_buffer query renames buffer to buffer_{{.RANGE_START}}_{{.RANGE_END}}, after which the buffer state is reset for the next batch.

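The third stage is an Execute stage that runs three queries sequentially to write the buffered data to S3, commit it to the Iceberg table, and drop the buffer table. Its ClickhouseSettings block sets ClickHouse settings for the queries of this stage, such as max_threads: 1.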

pipeline.yaml — yaml
Stages:
  # ...
  - Execute:
      Queries:
        - write_parquet_file
        - iceberg_commit
        - drop_buffer
      ClickhouseSettings:
        max_threads: 1
        max_insert_threads: 1
        max_block_size: 32768
        min_insert_block_size_rows: 0
        min_insert_block_size_bytes: 33554432
write_parquet_file — sql
insert into function s3('{{.ICEBERG_DESTINATION_TABLE_LOCATION}}/data/{{.OUTPUT_FILE}}')
select * from buffer_{{.RANGE_START}}_{{.RANGE_END}}
order by {{.ORDER_BY}}

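The write_parquet_file query writes the contents of buffer_{{.RANGE_START}}_{{.RANGE_END}}, sorted by ORDER_BY, to a file under the data/ prefix of the table location in S3. No format is given explicitly; ClickHouse infers Parquet from the .parquet extension of OUTPUT_FILE.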

iceberg_commit — sql
select icepq_add(concat('s3:/', path('{{.ICEBERG_DESTINATION_TABLE_LOCATION}}')), ['{{.OUTPUT_FILE}}'])

The iceberg_commit query calls the icepq_add UDF to add the new Parquet file to the Iceberg table, applying the same s3:// rewrite to the table location as init_start.

drop_buffer — sql
drop table buffer_{{.RANGE_START}}_{{.RANGE_END}} sync

The drop_buffer query drops the buffer table to free resources.


This tutorial demonstrates AGT’s SQL-driven ETL for HackerNews data. Explore the Features section for more on stage types and vars.