pg2iceberg replicates data from Postgres directly to Iceberg, no Kafka needed. Opinionated by design:
- Specifically replicates Postgres → Iceberg, nothing else.
- Assumes pg2iceberg is the sole writer of the Iceberg tables it manages, including compaction.
```mermaid
graph LR
    App[Application] <-->|Read/Write| PG
    subgraph pg2iceberg
        PG[Postgres] -->|Replicate| ICE[Iceberg]
    end
    OLAP[Snowflake<br />ClickHouse<br />etc.] -- Query --> ICE
```
pg2iceberg can operate in logical replication mode (recommended; full CDC) or query mode (watermark-based polling for Postgres replicas without `wal_level=logical`).
```mermaid
graph LR
    subgraph Postgres
        TableA["Table A"]
        TableB["Table B"]
        Coord["_pg2iceberg schema<br />(coordination)"]
    end
    subgraph S3
        StagedA["Staged WAL<br />(Parquet)"]
        StagedB["Staged WAL<br />(Parquet)"]
    end
    subgraph Iceberg
        TargetA[Table A]
        TargetB[Table B]
    end
    TableA -->|Logical Replication| StagedA
    TableB -->|Logical Replication| StagedB
    StagedA -->|Materializer| TargetA
    StagedB -->|Materializer| TargetB
    StagedA -.->|offset index| Coord
    StagedB -.->|offset index| Coord
```
pg2iceberg captures WAL change events via PostgreSQL logical replication and stages them as Parquet files in S3. A lightweight coordination layer in the source Postgres database (the `_pg2iceberg` schema) tracks offsets and materializer progress. Since the write path only involves S3 uploads plus a small PG transaction (no Iceberg catalog on the hot path), the replication slot LSN can be advanced quickly, minimizing WAL retention on the source.
A materializer, which runs at a separate interval, reads the staged Parquet files and merges them into the corresponding Iceberg tables using merge-on-read (equality deletes for updates/deletes, data files for inserts).
Staged files use a fixed Parquet schema regardless of source table changes: metadata columns (`_op`, `_lsn`, `_ts`, `_unchanged_cols`) plus a JSON `_data` column containing user data. Schema evolution (`ALTER TABLE`) only affects the materialized Iceberg table, not the staging layer.
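A conceptual rendering of that fixed schema, written as SQL DDL for readability; the physical Parquet types and the exact role of `_unchanged_cols` are assumptions inferred from the description above:

```sql
-- Conceptual shape of every staged file, independent of source-table DDL.
CREATE TABLE staged_wal (
    _op             text,        -- change kind: insert / update / delete
    _lsn            bigint,      -- WAL position of the change
    _ts             timestamptz, -- change timestamp
    _unchanged_cols text,        -- columns whose values _data omits (assumption)
    _data           jsonb        -- the row's user data as JSON
);
```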
Single-process (default `pg2iceberg run`): one process runs the WAL writer and materializer together. Simplest to deploy.
```text
+-------------------------------+
|        pg2iceberg run         |
|  +----------+  +------------+ |
|  |WAL Writer|->|Materializer| |
|  +----------+  +------------+ |
+-------------------------------+
```
Distributed: one `pg2iceberg stream-only` process owns the replication slot; N `pg2iceberg materializer-only --worker-id <id>` workers each claim a deterministic slice of tables via heartbeat-based coordination. Workers can be added or removed dynamically; tables rebalance on the next cycle. A launch sketch follows the diagram below.
```text
+------------------+   +--------------------------+   +--------------------------+
|   stream-only    |   |    materializer-only     |   |    materializer-only     |
|  +------------+  |   |   --worker-id worker-a   |   |   --worker-id worker-b   |
|  | WAL Writer |  |   |      (tables 1, 3)       |   |      (tables 2, 4)       |
|  +------------+  |   +--------------------------+   +--------------------------+
+------------------+                ^                              ^
                                    +---- _pg2iceberg.consumer ----+
                                          (heartbeat registry)
```
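A minimal launch sketch for this topology; the config path and worker ids are illustrative:

```bash
# One WAL-writer process owns the replication slot...
pg2iceberg stream-only --config /etc/pg2iceberg/config.yaml &

# ...and N materializer workers split the tables via heartbeat coordination.
pg2iceberg materializer-only --worker-id worker-a --config /etc/pg2iceberg/config.yaml &
pg2iceberg materializer-only --worker-id worker-b --config /etc/pg2iceberg/config.yaml &
```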
All coordination state lives under the `_pg2iceberg` schema in the source (or a dedicated state) Postgres:

| Table | Purpose |
|---|---|
| `log_seq` | Per-table offset counter (atomic increment) |
| `log_index` | Sparse index of staged Parquet files with offset ranges + LSN |
| `mat_cursor` | Materializer progress (last committed offset per table) |
| `consumer` | Heartbeat registry for distributed materializer workers |
| `lock` | Per-table locks (legacy; not load-bearing in current design) |
| `pipeline_meta` | Singleton: source-cluster `system_identifier` (DSN-swap detection) |
| `flushed_lsn` | Singleton: highest LSN we've acked the slot to (slot-tamper detection) |
| `tables` | Per-table snapshot status + `pg_class.oid` (drop-recreate detection) |
| `snapshot_progress` | Per-table mid-snapshot resume cursor |
| `query_watermarks` | Per-table watermark for query mode |
| `pending_markers` | Pending blue-green replica-alignment markers |
| `marker_emissions` | Per-(uuid, table) marker emission record (idempotent dedup) |
Coordinator write amplification is negligible: a few small PG writes per flush regardless of batch size.
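To make "a few small PG writes" concrete, one flush's coordinator transaction might look roughly like this; only the `log_seq` / `log_index` tables and their purposes come from the table above, and every column name here is hypothetical:

```sql
-- Hypothetical shape of one flush's coordinator writes.
BEGIN;

-- Claim an offset range for this batch (log_seq: per-table atomic counter).
UPDATE _pg2iceberg.log_seq
   SET next_offset = next_offset + 24          -- rows in this batch
 WHERE table_name = 'public.rides'
RETURNING next_offset;

-- Index the staged Parquet file by offset range and highest LSN (log_index).
INSERT INTO _pg2iceberg.log_index
       (table_name, object_key, start_offset, end_offset, max_lsn)
VALUES ('public.rides', 'staged/public.rides/000123.parquet', 100, 123, '0/1A2B3C4D');

COMMIT;
```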
```mermaid
graph LR
    subgraph Postgres
        TableA["Table A"]
        TableB["Table B"]
    end
    subgraph Iceberg
        TargetA[Table A]
        TargetB[Table B]
    end
    TableA -->|"SELECT WHERE watermark > $1"| TargetA
    TableB -->|"SELECT WHERE watermark > $1"| TargetB
```
Query mode polls Postgres using watermark-based SELECT queries and writes directly to the materialized Iceberg tables. Each row is an upsert (equality delete + insert) keyed by primary key.
Query mode is simpler but cannot detect hard deletes and has no transaction semantics. Use logical mode when you need full CDC fidelity.
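A minimal mode-selection sketch; the field names come from the configuration table further down, and all other settings are omitted:

```yaml
source:
  mode: logical                        # full CDC (default); or "query" for watermark polling
  logical:
    slot_name: pg2iceberg_slot         # default
    publication_name: pg2iceberg_pub   # default
```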
```
pg2iceberg <SUBCOMMAND> --config /etc/pg2iceberg/config.yaml [flags...]
```

| Subcommand | Purpose |
|---|---|
| `run` | Long-running pipeline. Logical or query mode depending on `source.mode`. |
| `snapshot` | One-shot: run the initial snapshot phase per configured table, then exit. Auto-creates the slot first so a later `run` doesn't lose WAL. |
| `cleanup` | Drop the replication slot, drop the publication, and `DROP SCHEMA … CASCADE` on the coordinator. Resets PG-side state ahead of a re-bootstrap. Doesn't drop Iceberg tables; do that out-of-band. |
| `compact` | One-shot: run a single compaction pass over every configured table, then exit. For cron / k8s CronJob. |
| `maintain` | One-shot: snapshot expiry + orphan-file cleanup over every configured table. Reads `sink.maintenance_retention` / `sink.maintenance_grace`. |
| `verify` | Diff PG ground truth against Iceberg materialized state for every configured table. Exits non-zero on any diff. Day-2 confidence check. |
| `stream-only` | Distributed mode: WAL writer only; pair with one or more `materializer-only` workers. |
| `materializer-only --worker-id <id>` | Distributed mode: materializer worker only. Joins the heartbeat group keyed by `state.group`; tables auto-rebalance on join/leave. |
| `migrate-coord` | Run the coordinator's idempotent schema migration (every statement is `CREATE … IF NOT EXISTS`). |
| `connect-pg` / `connect-iceberg` | Connectivity smoke tests for the PG / Iceberg-catalog prod paths. |

Run `pg2iceberg --help` for the full list and per-subcommand flags.
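For example, the one-shot subcommands slot naturally into cron or a k8s CronJob; the config path is illustrative:

```bash
# One-shot maintenance passes suited to scheduled jobs.
pg2iceberg compact  --config /etc/pg2iceberg/config.yaml   # compaction pass
pg2iceberg maintain --config /etc/pg2iceberg/config.yaml   # snapshot expiry + orphan cleanup
pg2iceberg verify   --config /etc/pg2iceberg/config.yaml   # exits non-zero on any PG/Iceberg diff
```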
```text
pg2iceberg/
├── Cargo.toml               # workspace root; pins polynya-dev/iceberg-rust fork
├── crates/
│   ├── pg2iceberg/          # binary: CLI dispatch, run.rs, setup.rs
│   ├── pg2iceberg-core/     # types only (no IO): Lsn, ChangeEvent, TableSchema, Mode, …
│   ├── pg2iceberg-pg/       # PG client: pgoutput stream, slot health, replication trait
│   ├── pg2iceberg-coord/    # Coordinator trait + SQL + Postgres impl
│   ├── pg2iceberg-stream/   # BlobStore trait + object_store-backed prod impl + codec
│   ├── pg2iceberg-iceberg/  # Catalog trait, TableWriter, MoR fold, vended-S3 router, meta tables
│   ├── pg2iceberg-logical/  # Pipeline + Materializer + ticker schedule
│   ├── pg2iceberg-snapshot/ # Resumable snapshot phase
│   ├── pg2iceberg-query/    # Query-mode pipeline
│   ├── pg2iceberg-validate/ # Startup invariants + lifecycle helper + verify subcommand
│   ├── pg2iceberg-sim/      # Memory-backed implementations of every prod trait (DST harness)
│   └── pg2iceberg-tests/    # DST scenario tests + testcontainers integration tests
├── docs/                    # mdbook-style reference (architecture, catalogs, usage, …)
├── example/
│   ├── single/              # Docker Compose stack: PG + iceberg-rest + MinIO + Grafana
│   └── blue-green/          # Two-side replica-alignment example with marker UUIDs
└── Dockerfile               # multi-stage release build
```
Every IO-touching crate is gated behind a `prod` feature; the default build is sim-only and feeds the deterministic-simulation testing harness in `pg2iceberg-tests`.
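A build sketch under the assumption that the feature is literally named `prod` and is exposed by the binary crate (both are assumptions):

```bash
# Default build: sim-only, used by the DST harness.
cargo build --workspace

# Prod build with the IO-backed implementations enabled.
cargo build -p pg2iceberg --features prod --release
```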
| PostgreSQL type | Iceberg type | Notes |
|---|---|---|
| `smallint` | `int` | |
| `integer`, `serial`, `oid` | `int` | |
| `bigint`, `bigserial` | `long` | |
| `real` | `float` | |
| `double precision` | `double` | |
| `numeric(p,s)` where p ≤ 38 | `decimal(p,s)` | Precision preserved exactly |
| `numeric(p,s)` where p > 38 | — | Pipeline refuses to start (see below) |
| `numeric` (unconstrained) | `decimal(38,18)` | Warning logged; values that overflow will error |
| `boolean` | `boolean` | |
| `text`, `varchar`, `char`, `name` | `string` | |
| `bytea` | `binary` | |
| `date` | `date` | |
| `time`, `timetz` | `time` | Microsecond precision |
| `timestamp` | `timestamp` | Microsecond precision |
| `timestamptz` | `timestamptz` | Microsecond precision |
| `uuid` | `uuid` | |
| `json`, `jsonb` | `string` | |
Iceberg supports a maximum decimal precision of 38. If a PostgreSQL table has a `numeric(p,s)` column where p > 38, pg2iceberg refuses to start, and the same check rejects such a column when it arrives via schema evolution. This is intentional: failing loudly avoids silent data corruption.

Unconstrained `numeric` columns (no precision specified) default to `decimal(38,18)`.
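For illustration, a column that would be rejected, and one possible source-side fix:

```sql
-- Rejected at startup: precision 40 exceeds Iceberg's maximum of 38.
CREATE TABLE payments (
    id     bigint PRIMARY KEY,
    amount numeric(40, 10)
);

-- If the values fit, capping precision at the source unblocks the pipeline.
ALTER TABLE payments ALTER COLUMN amount TYPE numeric(38, 10);
```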
| Change | Iceberg behavior |
|---|---|
| `ADD COLUMN` (nullable) | Appends a column with the next field id |
| `DROP COLUMN` | Soft-drop: column stays in schema, becomes nullable; older data files keep resolving |
| `ALTER COLUMN TYPE` (legal promotion: int → long, float → double, decimal precision increase) | Type-promote in place, field id preserved |
| `ALTER COLUMN TYPE` (illegal: narrowing, cross-family) | Refuses with an actionable error; operator must re-snapshot |
| `RENAME COLUMN` | Treated as drop + add (pgoutput doesn't carry attribute OIDs to detect renames) |
| `SET` / `DROP NOT NULL` | Invisible: pgoutput Relation messages don't carry nullability |
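A concrete pair of examples against a hypothetical `rides` table:

```sql
-- Legal promotion: real -> double precision is float -> double on the
-- Iceberg side, promoted in place with the field id preserved.
ALTER TABLE rides ALTER COLUMN fare TYPE double precision;

-- Illegal narrowing: double precision -> real is refused with an actionable
-- error; the operator must re-snapshot instead.
ALTER TABLE rides ALTER COLUMN fare TYPE real;
```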
Any catalog implementing the Iceberg REST Catalog spec should work. The following have been verified end-to-end:
| Catalog | Authentication | Vended Credentials? |
|---|---|---|
| Apache Polaris | OAuth2 / Bearer | Yes (`credential_mode: vended`) |
| Apache REST reference (testcontainers) | None | No |
| Cloudflare R2 Data Catalog | Bearer | Yes (not yet re-verified end-to-end) |
| AWS Glue | SigV4 with IAM | No (not yet re-verified end-to-end) |
```bash
cd example/single
docker compose up -d --wait
```

Open http://localhost:8123/play and run:

```sql
SELECT * FROM rideshare.`rideshare.rides`
```

You should see new rows appearing as the simulator drives PG.
Configuration is YAML-first; see `config.example.yaml` for the full surface. CLI flags and env vars override individual fields.
| Env var | YAML field | Description |
|---|---|---|
| `POSTGRES_URL` | `source.postgres.dsn` | PostgreSQL connection URL |
| `TABLES` | `tables` | List of source tables to replicate |
| `MODE` | `source.mode` | `logical` (default) or `query` |
| `SLOT_NAME` | `source.logical.slot_name` | Replication slot (default: `pg2iceberg_slot`) |
| `PUBLICATION_NAME` | `source.logical.publication_name` | Publication (default: `pg2iceberg_pub`) |
| `ICEBERG_CATALOG_URL` | `sink.catalog_uri` | Iceberg REST catalog URL |
| `WAREHOUSE` | `sink.warehouse` | Iceberg warehouse path (`s3://bucket/prefix/`) |
| `NAMESPACE` | `sink.namespace` | Iceberg namespace |
| `S3_ENDPOINT` | `sink.s3_endpoint` | S3 endpoint URL |
| `S3_ACCESS_KEY` / `S3_SECRET_KEY` / `S3_REGION` | `sink.s3_*` | S3 credentials and region |
| `STATE_POSTGRES_URL` | `state.postgres_url` | Optional separate Postgres for coord state |
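For example, overriding a few fields from the environment (values are illustrative):

```bash
export POSTGRES_URL='postgresql://app:secret@pg.internal:5432/app'
export ICEBERG_CATALOG_URL='http://catalog.internal:8181'
export WAREHOUSE='s3://lake/warehouse/'
export NAMESPACE='analytics'
pg2iceberg run --config /etc/pg2iceberg/config.yaml
```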
| Mode | Description |
|---|---|
| `static` (default) | Operator-supplied S3 keys in YAML / env |
| `iam` | AWS SDK env / IMDS / EC2-metadata credential chain |
| `vended` | Per-table credentials from the catalog (Polaris / Snowflake / Tabular). Requires the `x-iceberg-access-delegation: vended-credentials` header in the REST request, which pg2iceberg sets automatically when `credential_mode: vended`. |
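A vended-credentials sketch; nesting `credential_mode` under `sink` is an assumption, and the endpoint is illustrative:

```yaml
sink:
  catalog_uri: https://polaris.example.com/api/catalog
  warehouse: s3://lake/warehouse/
  credential_mode: vended   # pg2iceberg sends x-iceberg-access-delegation: vended-credentials
```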
State persisted in `_pg2iceberg`:

- Cluster fingerprint (`pipeline_meta.system_identifier`): stamped at first startup. A different `IDENTIFY_SYSTEM` value on subsequent runs (e.g. accidental DSN swap, blue-green cutover) returns `SystemIdMismatch` and refuses to start.
- Slot-tamper baseline (`flushed_lsn`): the highest LSN we've ever acked. Compared against `slot.confirmed_flush_lsn` at startup to catch external advancement (`pg_replication_slot_advance`, drop-recreate, stray `pg_recvlogical`).
- Per-table snapshot state (`tables`, `snapshot_progress`): `snapshot_complete` + `pg_oid` per table; mid-snapshot resume cursor cleared on completion.
To resume from where the previous process left off, just restart with the same config — coord cursors + slot LSN are sufficient.
To reset everything (PG-side state) without manually dropping tables:

```bash
pg2iceberg cleanup --config <path>
# Iceberg tables remain; drop them via the catalog separately if you want a full reset.
```

To use a separate Postgres for coord state (instead of the source DB):

```yaml
state:
  postgres_url: postgresql://user:pass@host:5432/state-db
  coordinator_schema: _pg2iceberg
```

The default test target is sim-based DST + unit tests; no Docker needed.

```bash
cargo test --workspace
```

Integration tests against testcontainers (PG 16-alpine + MinIO + Apache iceberg-rest) are gated behind the `integration` feature:

```bash
# Colima users: see docs/getting-started/installation.md for the env vars
cargo test --workspace --features integration -- --test-threads=1
```

The integration suite covers the prod Coordinator (PG-replication path), the prod PgClient (pgoutput decoding against real PG), and a full end-to-end LogicalLifecycle against the docker stack. Roughly 17 tests.
A future maintenance pass will expose Prometheus metrics on a configurable address. The `Metrics` trait is wired through every hot path; the operational HTTP endpoint that exports them is not yet wired.
The binary uses `tracing` with `tracing-subscriber` for stdout output. Set `RUST_LOG=info,pg2iceberg=debug` (or finer) to control verbosity.
Distributed-tracing export via OTLP is not yet wired. The tracing instrumentation that exists is stdout-only today.
When `sink.meta_namespace` is set, pg2iceberg writes operational telemetry to four Iceberg tables under that namespace:

| Table | Row written by |
|---|---|
| `<meta_ns>.commits` | Each successful materializer cycle (per-table) |
| `<meta_ns>.compactions` | Each successful compaction commit |
| `<meta_ns>.maintenance` | Each `expire_snapshots` / `clean_orphans` op (per-table) |
| `<meta_ns>.markers` | Blue-green marker alignment (when marker mode is enabled) |

See `docs/usage/metadata-tables.md` for the full schemas.
A fifth, `<meta_ns>.checkpoints`, is exposed via the `record_checkpoint` API for callers who want to emit one manually, but it is not auto-written; recovery doesn't depend on periodic checkpoint saves (the slot's `confirmed_flush_lsn` plus per-table state is sufficient).
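These are plain Iceberg tables, so any engine attached to the same catalog can query them. A sketch, assuming the meta namespace is `ops` (the column layout lives in `docs/usage/metadata-tables.md`):

```sql
-- Tail recent materializer commits (namespace 'ops' is illustrative).
SELECT *
FROM ops.commits
LIMIT 20;
```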