Skip to content

Commit

Permalink
feat(clickhouse): add ClickHouse import script PE-6848
Browse files Browse the repository at this point in the history
Adds a bash script, scripts/clickhouse-import, that imports exported
Parquet into ClickHouse. It assumes ClickHouse is running on its default
ports and uses the 'clickhouse client' CLI command to perform the
import.

To test, start the ClickHouse service and load the schema:

docker-compose up clickhouse
clickhouse client --multiquery < src/database/clickhouse/schema.sql

Export Parquet (assuming core service running):

mkdir -p data/parquet
curl -X POST \
  -H "Authorization: Bearer access" \
  -H "Content-Type: application/json" \
  -d '{ "outputDir": "data/parquet", "startHeight": 800000, "endHeight": 800050, "maxFileRows": 1000000 }' \
  "http://localhost:4000/ar-io/admin/export-parquet"

Then run the import script:

./scripts/clickhouse-import

You can then query the transaction tables in ClickHouse:

echo "SELECT COUNT(*) FROM transactions" | clickhouse client
  • Loading branch information
djwhitt committed Nov 20, 2024
1 parent 5c98e0a commit c40ed8e
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 11 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ services:
image: clickhouse/clickhouse-server:${CLICKHOUSE_IMAGE_TAG:-24}
profiles:
- clickhouse
ports:
- 8123:8123
- 8443:8443
- 9000:9000
ulimits:
nofile:
soft: '262144'
Expand Down
30 changes: 20 additions & 10 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,38 @@
flake-utils.url = "github:numtide/flake-utils";
};

outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let pkgs = import nixpkgs { inherit system; };
in {
outputs =
{
self,
nixpkgs,
flake-utils,
}:
flake-utils.lib.eachDefaultSystem (
system:
let
pkgs = import nixpkgs { inherit system; };
in
{
devShells = {
default = pkgs.mkShell {
name = "ar-io-node-shell";
buildInputs = with pkgs; [
clickhouse
duckdb
gnumake
graphviz
nodejs_20
yarn
nodePackages.typescript-language-server
yaml-language-server
nodejs_20
openjdk
sqlite-interactive
python311
duckdb
sqlite-interactive
yaml-language-server
yarn
];
};
};
});
}
);

nixConfig.bash-prompt = "\\e[32m[ar-io-node-shell]$\\e[0m ";
}
60 changes: 60 additions & 0 deletions scripts/clickhouse-import
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env bash

# Import exported Parquet files (blocks, transactions, tags) into ClickHouse.
#
# For each blocks-<range>-*.parquet file found under $PARQUET_PATH, the
# matching transactions and tags Parquet files for the same height range are
# loaded into the staging tables, flattened into the transactions,
# owner_transactions, and target_transactions tables, and the staging tables
# are truncated before the next batch.
#
# Requirements: a local ClickHouse server with the schema already loaded
# (see src/database/clickhouse/schema.sql) and the 'clickhouse' CLI on PATH.
#
# Environment:
#   PARQUET_PATH - directory containing the exported Parquet files
#                  (default: data/parquet)

set -euo pipefail

parquet_path=${PARQUET_PATH:-data/parquet}

# Unmatched globs expand to nothing instead of the literal pattern, so an
# empty export directory results in zero loop iterations rather than errors.
shopt -s nullglob

for blocks_parquet in "$parquet_path"/blocks*.parquet; do
  # Extract the "<start>-<end>" height range encoded in the filename, e.g.
  # blocks-800000-800050-rowCount:51.parquet -> 800000-800050
  height_range=$(basename "$blocks_parquet" | sed 's/blocks-//;s/-rowCount:[0-9]*\.parquet//')

  # Bug fix: these lookups previously hardcoded data/parquet instead of
  # honoring $parquet_path. Globbing into arrays (instead of parsing `ls`)
  # is also safe for unusual paths and for multiple files per range.
  txs_parquets=("$parquet_path"/transactions-"$height_range"-*.parquet)
  tags_parquets=("$parquet_path"/tags-"$height_range"-*.parquet)

  if (( ${#txs_parquets[@]} == 0 || ${#tags_parquets[@]} == 0 )); then
    echo "WARNING: missing transactions or tags Parquet for range $height_range; skipping" >&2
    continue
  fi

  echo "Importing $blocks_parquet..."
  clickhouse client --query="INSERT INTO staging_blocks FROM INFILE '$blocks_parquet' FORMAT Parquet;"

  for txs_parquet in "${txs_parquets[@]}"; do
    echo "Importing $txs_parquet..."
    clickhouse client --query="INSERT INTO staging_transactions FROM INFILE '$txs_parquet' FORMAT Parquet;"
  done

  for tags_parquet in "${tags_parquets[@]}"; do
    echo "Importing $tags_parquet..."
    clickhouse client --query="INSERT INTO staging_tags FROM INFILE '$tags_parquet' FORMAT Parquet;"
  done

  # Flatten staging data into the final tables, once per sort-order variant
  # (transactions, owner_transactions, target_transactions share columns but
  # differ in ORDER BY for different lookup patterns).
  #
  # NOTE(review): the SELECT emits tags_count in the position of the
  # updated_at column of the destination tables — confirm this is intentional.
  for prefix in "" "owner_" "target_"; do
    cat <<EOF | clickhouse client
INSERT INTO ${prefix}transactions
SELECT
  txs.height,
  txs.block_transaction_index,
  txs.is_data_item,
  txs.id,
  txs.anchor,
  txs.owner_address,
  txs.target,
  txs.quantity,
  txs.reward,
  txs.data_size,
  txs.content_type,
  txs.format,
  txs.data_root,
  txs.parent AS parent_id,
  blocks.indep_hash AS block_indep_hash,
  blocks.block_timestamp,
  blocks.previous_block AS block_previous_block,
  txs.indexed_at,
  CASE
    WHEN tags.id IS NOT NULL THEN COUNT(*)
    ELSE 0
  END AS tags_count,
  CASE
    WHEN tags.id IS NOT NULL THEN
      arrayMap((x) -> (x.2, x.3), arraySort((x) -> x.1, groupArray((tag_index, tag_name, tag_value))))
    ELSE []
  END AS tags
FROM staging_transactions txs
LEFT JOIN staging_tags tags ON txs.height = tags.height AND txs.id = tags.id
JOIN staging_blocks blocks ON txs.height = blocks.height
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, tags.id
EOF
  done

  # Reset staging tables so the next height range starts from a clean slate.
  clickhouse client --query="TRUNCATE TABLE staging_blocks"
  clickhouse client --query="TRUNCATE TABLE staging_transactions"
  clickhouse client --query="TRUNCATE TABLE staging_tags"
done
128 changes: 128 additions & 0 deletions src/database/clickhouse/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
-- Staging table for block rows imported from Parquet exports; truncated
-- after each import batch by scripts/clickhouse-import.
-- NOTE(review): BLOB, INTEGER, and NOT NULL are ClickHouse compatibility
-- aliases (BLOB maps to String); MergeTree does not enforce nullability.
CREATE TABLE IF NOT EXISTS staging_blocks (
indep_hash BLOB,
height UInt64 NOT NULL,
previous_block BLOB,
nonce BLOB NOT NULL,
hash BLOB NOT NULL,
block_timestamp INTEGER NOT NULL,
tx_count UInt32 NOT NULL,
block_size UInt64,
PRIMARY KEY (height)
) Engine = MergeTree()
ORDER BY (height);

-- Staging table for transaction/data-item rows imported from Parquet
-- exports; truncated after each import batch by scripts/clickhouse-import.
-- Rows are joined with staging_blocks and staging_tags when flattening
-- into the final transactions tables.
CREATE TABLE IF NOT EXISTS staging_transactions (
id BLOB NOT NULL,
indexed_at UInt32,
block_transaction_index UInt16,
is_data_item Boolean, -- true for bundle data items, false for L1 transactions
target BLOB,
quantity Decimal(20,0),
reward Decimal(20,0),
anchor BLOB NOT NULL,
data_size UInt64,
content_type String,
format UInt8,
height UInt64, -- NOTE(review): UInt64 here vs UInt32 elsewhere; confirm intended
owner_address BLOB,
data_root BLOB,
parent BLOB, -- parent bundle id for data items
"offset" UInt64,
"size" UInt64,
data_offset UInt64,
owner_offset UInt64,
owner_size UInt64,
owner BLOB,
signature_offset UInt64,
signature_size UInt64,
signature_type UInt64,
PRIMARY KEY (height, block_transaction_index, is_data_item, id)
) Engine = MergeTree()
ORDER BY (height, block_transaction_index, is_data_item, id);

-- Staging table for per-transaction tag rows imported from Parquet exports;
-- truncated after each import batch by scripts/clickhouse-import. One row
-- per (transaction id, tag_index); aggregated into the tags array column of
-- the final transactions tables.
CREATE TABLE IF NOT EXISTS staging_tags (
height UInt32 NOT NULL,
id BLOB NOT NULL, -- owning transaction or data item id
tag_index UInt16 NOT NULL, -- position of the tag within the transaction
indexed_at UInt64,
tag_name BLOB NOT NULL,
tag_value BLOB NOT NULL,
is_data_item BOOLEAN NOT NULL,
PRIMARY KEY (height, id)
) Engine = MergeTree()
ORDER BY (height, id);

-- Final denormalized transactions table, populated from the staging tables
-- by scripts/clickhouse-import. Includes block metadata and an aggregated
-- tags array per row; ordered by height for height-range queries.
CREATE TABLE IF NOT EXISTS transactions (
height UInt32 NOT NULL,
block_transaction_index UInt16,
is_data_item Boolean,
id BLOB NOT NULL,
anchor BLOB NOT NULL,
owner_address BLOB,
target BLOB,
quantity Decimal(20,0) NOT NULL,
reward Decimal(20,0) NOT NULL,
data_size UInt64,
content_type String,
format UInt8 NOT NULL,
data_root BLOB,
parent_id BLOB, -- parent bundle id for data items
block_indep_hash BLOB,
block_timestamp UInt32,
block_previous_block BLOB,
indexed_at UInt64,
updated_at UInt64,
tags Array(Tuple(BLOB, BLOB)), -- (tag_name, tag_value) pairs in tag_index order
PRIMARY KEY (height, block_transaction_index, is_data_item, id)
) Engine = MergeTree()
ORDER BY (height, block_transaction_index, is_data_item, id);

-- Copy of the transactions table ordered by owner_address first, to serve
-- owner-based lookups efficiently. Populated alongside transactions by
-- scripts/clickhouse-import; columns are identical to transactions.
CREATE TABLE IF NOT EXISTS owner_transactions (
height UInt32 NOT NULL,
block_transaction_index UInt16,
is_data_item Boolean,
id BLOB NOT NULL,
anchor BLOB NOT NULL,
owner_address BLOB,
target BLOB,
quantity Decimal(20,0) NOT NULL,
reward Decimal(20,0) NOT NULL,
data_size UInt64,
content_type String,
format UInt8 NOT NULL,
data_root BLOB,
parent_id BLOB,
block_indep_hash BLOB,
block_timestamp UInt32,
block_previous_block BLOB,
indexed_at UInt64,
updated_at UInt64,
tags Array(Tuple(BLOB, BLOB)), -- (tag_name, tag_value) pairs in tag_index order
PRIMARY KEY (owner_address, height, block_transaction_index, is_data_item, id)
) Engine = MergeTree()
ORDER BY (owner_address, height, block_transaction_index, is_data_item, id);

-- Copy of the transactions table ordered by target first, to serve
-- target-based lookups efficiently. Populated alongside transactions by
-- scripts/clickhouse-import; columns are identical to transactions.
CREATE TABLE IF NOT EXISTS target_transactions (
height UInt32 NOT NULL,
block_transaction_index UInt16,
is_data_item Boolean,
id BLOB NOT NULL,
anchor BLOB NOT NULL,
owner_address BLOB,
target BLOB,
quantity Decimal(20,0) NOT NULL,
reward Decimal(20,0) NOT NULL,
data_size UInt64,
content_type String,
format UInt8 NOT NULL,
data_root BLOB,
parent_id BLOB,
block_indep_hash BLOB,
block_timestamp UInt32,
block_previous_block BLOB,
indexed_at UInt64,
updated_at UInt64,
tags Array(Tuple(BLOB, BLOB)), -- (tag_name, tag_value) pairs in tag_index order
PRIMARY KEY (target, height, block_transaction_index, is_data_item, id)
) Engine = MergeTree()
ORDER BY (target, height, block_transaction_index, is_data_item, id);
2 changes: 1 addition & 1 deletion src/database/duckdb/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ CREATE TABLE IF NOT EXISTS blocks (
block_timestamp INTEGER NOT NULL,
tx_count INTEGER NOT NULL,
block_size UINTEGER
);
);

0 comments on commit c40ed8e

Please sign in to comment.