Nimbus is a managed data ingestion service that continuously writes to open table formats such as Iceberg.
Run docker compose up -d to start the local stack:
- RisingWave: creates Iceberg tables and manages streams
- Lakekeeper: Iceberg catalog
- Postgres: RisingWave metadata store and a sample CDC source
- MinIO: object storage for Iceberg and RisingWave
Connect to RisingWave:
psql "postgresql://root:@localhost:4566/dev"
Then run:
-- Iceberg connection: Lakekeeper serves as the REST catalog, and the
-- s3.* settings point at the local MinIO instance.
CREATE CONNECTION IF NOT EXISTS lakekeeper_catalog_conn WITH (
    type = 'iceberg',
    warehouse.path = 'risingwave-warehouse',
    s3.region = 'us-east-1',
    s3.access.key = 'minioadmin',
    s3.secret.key = 'minioadmin',
    s3.endpoint = 'http://minio:9301',
    catalog.type = 'rest',
    catalog.uri = 'http://lakekeeper:8181/catalog/',
    s3.path.style.access = true
);

-- Point the session's Iceberg table engine at the connection defined above.
SET iceberg_engine_connection = 'public.lakekeeper_catalog_conn';
CREATE TABLE IF NOT EXISTS my_iceberg_table (
id INT PRIMARY KEY,
name VARCHAR
) ENGINE = iceberg;
Connect to Postgres:
psql "postgres://postgres:postgres@localhost:5432/postgres"
Then run:
CREATE TABLE IF NOT EXISTS my_pg_table (
id INTEGER PRIMARY KEY,
name VARCHAR
);
bus = Nimbus(conn_info)
j = bus.json_ingestor()
stream = bus.create_stream(
name="direct_ingest",
destination=bus.table("my_iceberg_table"),
source=j,
)
stream.deploy()
try:
for i in range(10):
j.ingest_record({"id": i, "name": f"name_{i}"})
j.flush()
finally:
j.close()Or use ack to wait for each ingest:
for i in range(10):
ack = j.ingest_record({"id": i, "name": f"name_{i}"})
ack.wait()Connect to RisingWave:
psql "postgresql://root:@localhost:4566/dev"
Then run:
-- The direct-ingest loop above writes ids 0 through 9; ordering by the
-- primary key makes the output easy to compare.
SELECT id, name
FROM my_iceberg_table
ORDER BY id;
bus = Nimbus(conn_info)
# Postgres source built from pg_source_config (defined outside this snippet).
pg_source = bus.postgres_source(name="my_pg_source", config=pg_source_config)

# Build the destination and the CDC source first (same order in which the
# nested call evaluated them), then wire them into the stream.
iceberg_destination = bus.table("my_iceberg_table")
cdc_options = {
    "snapshot": True,
    "backfill.parallelism": 4,
}
cdc_source = bus.postgres_cdc(
    postgres_source=pg_source,
    postgres_table_name="public.my_pg_table",
    config=cdc_options,
)

stream = bus.create_stream(
    name="pg_cdc_ingest",
    destination=iceberg_destination,
    source=cdc_source,
)
stream.deploy()
Connect to Postgres:
psql "postgres://postgres:postgres@localhost:5432/postgres"
Then run:
-- Name the target columns so the statement does not depend on the table's
-- column order.
INSERT INTO my_pg_table (id, name)
VALUES
    (10, 'pg_10'),
    (11, 'pg_11');
Connect to RisingWave:
psql "postgresql://root:@localhost:4566/dev"
Then run:
-- Rows 10 and 11 inserted into my_pg_table should appear here once the CDC
-- stream has replicated them; ordering by the primary key keeps the output stable.
SELECT id, name
FROM my_iceberg_table
ORDER BY id;