feat(clickhouse-auto-import): add script + service for continuous ClickHouse import PE-7137

Adds a clickhouse-auto-import script that loops forever exporting
Parquet from the SQLite DB, importing it into ClickHouse, moving it to
another directory for potential archival, and pruning old data from
SQLite.
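
In outline, the new script's main loop (shown in full in the diff below) looks roughly like this; the step comments paraphrase the description above and the variable name follows the script:

#!/usr/bin/env bash
# Simplified outline of scripts/clickhouse-auto-import (full source in the diff below)
sleep_interval="${CLICKHOUSE_AUTO_IMPORT_SLEEP_INTERVAL:-60}"
while true; do
  # 1. Export a range of stable data from SQLite to Parquet via the core admin API
  # 2. Import the Parquet files into ClickHouse (scripts/clickhouse-import)
  # 3. Move the imported files to data/parquet/imported for potential archival
  # 4. Prune the exported stable data items from SQLite
  sleep "$sleep_interval"
done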

When using Docker Compose, the script is run automatically in a separate
service when the 'clickhouse' profile is in use.
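
With that profile, starting the stack also brings up the auto-import service; a usage sketch, assuming the repository's docker-compose.yaml and the standard Compose profile flag:

docker compose --profile clickhouse up -d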

Currently the script is written in bash. In the future we may integrate
it into the core service TypeScript codebase, but having a standalone
script initially allows for faster iteration.
djwhitt committed Nov 27, 2024
1 parent d2fa13f commit 771083c
Showing 5 changed files with 154 additions and 8 deletions.
26 changes: 26 additions & 0 deletions Dockerfile.clickhouse-auto-import
@@ -0,0 +1,26 @@
FROM debian:bullseye-slim

# Install required packages and ClickHouse
RUN apt-get update && apt-get install -y \
curl \
jq \
&& rm -rf /var/lib/apt/lists/* \
&& curl https://clickhouse.com/ | sh \
&& mv clickhouse /usr/local/bin/

# Create necessary directories
WORKDIR /app
RUN mkdir -p data/parquet/imported

# Copy the auto-import script
COPY scripts/clickhouse-auto-import /app/scripts/
COPY scripts/clickhouse-import /app/scripts/

# Make scripts executable
RUN chmod +x /app/scripts/clickhouse-auto-import /app/scripts/clickhouse-import

# Environment variables
ENV ADMIN_API_KEY=""

# Run the auto-import script
CMD ["/app/scripts/clickhouse-auto-import"]
25 changes: 25 additions & 0 deletions docker-compose.yaml
@@ -39,6 +39,7 @@ services:
- ${DUCKDB_DATA_PATH:-./data/duckdb}:/app/data/duckdb
- ${TEMP_DATA_PATH:-./data/tmp}:/app/data/tmp
- ${LMDB_DATA_PATH:-./data/lmdb}:/app/data/lmdb
- ${PARQUET_DATA_PATH:-./data/parquet}:/app/data/parquet
environment:
- NODE_ENV=${NODE_ENV:-production}
- LOG_LEVEL=${CORE_LOG_LEVEL:-info}
@@ -137,6 +138,30 @@ services:
volumes:
- ${CLICKHOUSE_DATA_PATH:-./data/clickhouse}:/var/lib/clickhouse
- ${CLICKHOUSE_LOGS_PATH:-./logs/clickhouse}:/var/log/clickhouse-server
networks:
- ar-io-network

clickhouse-auto-import:
image: clickhouse-auto-import:latest
profiles:
- clickhouse
build:
context: .
dockerfile: Dockerfile.clickhouse-auto-import
restart: on-failure
volumes:
- ${PARQUET_DATA_PATH:-./data/parquet}:/app/data/parquet
environment:
- PARQUET_DATA_PATH=${PARQUET_DATA_PATH:-./data/parquet}
- CLICKHOUSE_HOST=clickhouse
- AR_IO_HOST=core
- AR_IO_PORT=4000
- ADMIN_API_KEY=${ADMIN_API_KEY:-}
networks:
- ar-io-network
depends_on:
- core
- clickhouse

observer:
image: ghcr.io/ar-io/ar-io-observer:${OBSERVER_IMAGE_TAG:-latest}
94 changes: 94 additions & 0 deletions scripts/clickhouse-auto-import
@@ -0,0 +1,94 @@
#!/usr/bin/env bash

set -euo pipefail

# Load environment variables from .env if it exists
if [ -f .env ]; then
set -a
source .env
set +a
fi

# Set local variables with defaults
ar_io_host=${AR_IO_HOST:-localhost}
ar_io_port=${AR_IO_PORT:-4000}
parquet_dir=${PARQUET_DATA_PATH:-./data/parquet}
sleep_interval=${CLICKHOUSE_AUTO_IMPORT_SLEEP_INTERVAL:-60} # Seconds to sleep between export cycles (default: 60)

if [ -z "${ADMIN_API_KEY:-}" ]; then
echo "Error: ADMIN_API_KEY environment variable is not set in .env"
exit 1
fi

imported_dir="$parquet_dir/imported"
height_interval=10000
max_rows_per_file=1000000

mkdir -p "$parquet_dir" "$imported_dir"

while true; do
# Get stable height range from admin debug endpoint
debug_info=$(curl -s -H "Authorization: Bearer $ADMIN_API_KEY" "http://${ar_io_host}:${ar_io_port}/ar-io/admin/debug")
min_height=$(echo "$debug_info" | jq -r '.db.heights.minStableDataItem')
max_height=$(echo "$debug_info" | jq -r '.db.heights.maxStableDataItem')
max_indexed_at=$(echo "$debug_info" | jq -r '.db.timestamps.maxStableDataItemIndexedAt')

# Align the start height to intervals of 10,000
current_height=$(((min_height / height_interval) * height_interval))

while [ "$current_height" -le "$max_height" ]; do
end_height=$((current_height + height_interval))

echo "Processing heights $current_height to $end_height..."

# Export to Parquet files using API
curl -X POST "http://${ar_io_host}:${ar_io_port}/ar-io/admin/export-parquet" \
-H "Authorization: Bearer $ADMIN_API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"outputDir\": \"$parquet_dir\",
\"startHeight\": $current_height,
\"endHeight\": $end_height,
\"maxFileRows\": $max_rows_per_file
}"

# Wait for the export to complete
while true; do
if ! status=$(curl -s -f -H "Authorization: Bearer $ADMIN_API_KEY" "http://${ar_io_host}:${ar_io_port}/ar-io/admin/export-parquet/status"); then
echo "Failed to get export status"
rm -f "$parquet_dir"/*.parquet
exit 1
fi

export_status=$(echo "$status" | jq -r '.status')
if [ "$export_status" = "completed" ]; then
break
elif [ "$export_status" = "errored" ]; then
error=$(echo "$status" | jq -r '.error')
echo "Export failed: $error"
rm -f "$parquet_dir"/*.parquet
exit 1
fi

echo "Waiting for export to complete..."
sleep 10
done

# Import Parquet files
./scripts/clickhouse-import

# Move processed files to imported directory
mv "$parquet_dir"/*.parquet "$imported_dir/"

# Prune stable data items
curl -X POST "http://${ar_io_host}:${ar_io_port}/ar-io/admin/prune-stable-data-items" \
-H "Authorization: Bearer $ADMIN_API_KEY" \
-H "Content-Type: application/json" \
-d "{\"indexedAtThreshold\": $max_indexed_at}"

current_height=$end_height
done

echo "Sleeping for $sleep_interval seconds..."
sleep "$sleep_interval"
done
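
For reference, the jq expressions in the loop above imply that the admin debug endpoint returns JSON nested as sketched below; the three field paths come from the script, while the values are purely illustrative placeholders:

# Shape of GET /ar-io/admin/debug as consumed above (values are placeholders):
# {
#   "db": {
#     "heights":    { "minStableDataItem": 1230000, "maxStableDataItem": 1301500 },
#     "timestamps": { "maxStableDataItemIndexedAt": 1732000000 }
#   }
# }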
15 changes: 8 additions & 7 deletions scripts/clickhouse-import
@@ -1,5 +1,6 @@
#!/usr/bin/env bash

clickhouse_host=${CLICKHOUSE_HOST:-localhost}
parquet_path=${PARQUET_PATH:-data/parquet}

for blocks_parquet in "$parquet_path/blocks"*.parquet; do
@@ -8,16 +9,16 @@ for blocks_parquet in "$parquet_path/blocks"*.parquet; do
tags_parquet=$(ls data/parquet/tags-"$height_range"-*.parquet)

echo "Importing $blocks_parquet..."
clickhouse client --query="INSERT INTO staging_blocks FROM INFILE '$blocks_parquet' FORMAT Parquet;"
clickhouse client --host "$clickhouse_host" --query="INSERT INTO staging_blocks FROM INFILE '$blocks_parquet' FORMAT Parquet;"

echo "Importing $txs_parquet..."
clickhouse client --query="INSERT INTO staging_transactions FROM INFILE '$txs_parquet' FORMAT Parquet;"
clickhouse client --host "$clickhouse_host" --query="INSERT INTO staging_transactions FROM INFILE '$txs_parquet' FORMAT Parquet;"

echo "Importing $tags_parquet"
clickhouse client --query="INSERT INTO staging_tags FROM INFILE '$tags_parquet' FORMAT Parquet;"
clickhouse client --host "$clickhouse_host" --query="INSERT INTO staging_tags FROM INFILE '$tags_parquet' FORMAT Parquet;"

for prefix in "" "owner_" "target_"; do
cat <<EOF | clickhouse client
cat <<EOF | clickhouse client --host "$clickhouse_host"
INSERT INTO ${prefix}transactions
SELECT
txs.height,
@@ -54,7 +55,7 @@ GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, tags.id
EOF
done

clickhouse client --query="TRUNCATE TABLE staging_blocks"
clickhouse client --query="TRUNCATE TABLE staging_transactions"
clickhouse client --query="TRUNCATE TABLE staging_tags"
clickhouse client --host "$clickhouse_host" --query="TRUNCATE TABLE staging_blocks"
clickhouse client --host "$clickhouse_host" --query="TRUNCATE TABLE staging_transactions"
clickhouse client --host "$clickhouse_host" --query="TRUNCATE TABLE staging_tags"
done
2 changes: 1 addition & 1 deletion src/database/sql/bundles/cleanup.sql
@@ -13,5 +13,5 @@ WHERE height < @height_threshold OR (
)

-- deleteStableDataItemsLessThanIndexedAt
DELETE FROM stable_data_items
DELETE FROM stable_data_items
WHERE indexed_at < @indexed_at_threshold
