From d5e44528717eee7e8f2348a1371cd4a53c3aa532 Mon Sep 17 00:00:00 2001 From: hongker Date: Thu, 1 Feb 2024 19:40:49 +0800 Subject: [PATCH] Add WithAllocBufferColStrProvider string column allocator for batch insert performance boost (#1181) * add ColStrProvider for column_gen * add test for WithAllocBufferColStrProvider --- lib/column/column_gen.go | 2 +- lib/column/column_gen_option.go | 45 ++++++++ tests/issues/1164_test.go | 194 ++++++++++++++++++++++++++++++++ 3 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 lib/column/column_gen_option.go create mode 100644 tests/issues/1164_test.go diff --git a/lib/column/column_gen.go b/lib/column/column_gen.go index d13781af91..455be4c0e1 100644 --- a/lib/column/column_gen.go +++ b/lib/column/column_gen.go @@ -136,7 +136,7 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) { case "Point": return &Point{name: name}, nil case "String": - return &String{name: name}, nil + return &String{name: name, col: colStrProvider()}, nil case "Object('json')": return &JSONObject{name: name, root: true, tz: tz}, nil } diff --git a/lib/column/column_gen_option.go b/lib/column/column_gen_option.go new file mode 100644 index 0000000000..ea307612d3 --- /dev/null +++ b/lib/column/column_gen_option.go @@ -0,0 +1,45 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package column + +import "github.com/ClickHouse/ch-go/proto" + +// ColStrProvider defines provider of proto.ColStr +type ColStrProvider func() proto.ColStr + +// colStrProvider provide proto.ColStr for Column() when type is String +var colStrProvider ColStrProvider = defaultColStrProvider + +// defaultColStrProvider defines sample provider for proto.ColStr +func defaultColStrProvider() proto.ColStr { + return proto.ColStr{} +} + +// issue: https://github.com/ClickHouse/clickhouse-go/issues/1164 +// WithAllocBufferColStrProvider allow pre alloc buffer cap for proto.ColStr +// It is more suitable for scenarios where a lot of data is written in batches +func WithAllocBufferColStrProvider(cap int) { + colStrProvider = func() proto.ColStr { + return proto.ColStr{Buf: make([]byte, 0, cap)} + } +} + +// WithColStrProvider more flexible than WithAllocBufferColStrProvider, such as use sync.Pool +func WithColStrProvider(provider ColStrProvider) { + colStrProvider = provider +} diff --git a/tests/issues/1164_test.go b/tests/issues/1164_test.go new file mode 100644 index 0000000000..f044dff284 --- /dev/null +++ b/tests/issues/1164_test.go @@ -0,0 +1,194 @@ +package issues + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/column" + clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" + "github.com/stretchr/testify/require" + "testing" +) + +func TestIssue1164(t *testing.T) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + ctx := context.Background() + require.NoError(t, err) + const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()" + err = conn.Exec(ctx, ddl) + require.NoError(t, err) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164") + }() + + column.WithAllocBufferColStrProvider(4096) + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164") + require.NoError(t, err) + + for i := 0; i < 10000; i++ { + appendErr := batch.Append(fmt.Sprintf("some_text_%d", i)) + require.NoError(t, appendErr) + } + + err = batch.Send() + require.NoError(t, err) +} + +func BenchmarkIssue1164(b *testing.B) { + // result: + //cpu: Intel(R) Xeon(R) CPU E5-26xx v4 + //BenchmarkIssue1164 + //BenchmarkIssue1164/default-10000 + //BenchmarkIssue1164/default-10000-8 100 11533744 ns/op 1992731 B/op 40129 allocs/op + //BenchmarkIssue1164/preAlloc-10000 + //BenchmarkIssue1164/preAlloc-10000-8 104 11136623 ns/op 1991154 B/op 40110 allocs/op + //BenchmarkIssue1164/default-50000 + //BenchmarkIssue1164/default-50000-8 22 49932579 ns/op 11592053 B/op 200150 allocs/op + //BenchmarkIssue1164/preAlloc-50000 + //BenchmarkIssue1164/preAlloc-50000-8 24 49687163 ns/op 11573934 B/op 200148 allocs/op + b.Run("default-10000", func(b *testing.B) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + ctx := context.Background() + require.NoError(b, err) + const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()" + err = conn.Exec(ctx, ddl) + require.NoError(b, err) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164") + }() + + b.ReportAllocs() + for k := 0; k < b.N; k++ { + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164") + require.NoError(b, err) + + for i := 0; i < 10000; i++ { + appendErr := batch.Append(fmt.Sprintf("some_text_%d", i)) + require.NoError(b, appendErr) + } + + err = batch.Send() + require.NoError(b, err) + } + + }) + b.Run("preAlloc-10000", func(b *testing.B) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + ctx := context.Background() + require.NoError(b, err) + const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()" + err = conn.Exec(ctx, ddl) + require.NoError(b, err) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164") + }() + + column.WithAllocBufferColStrProvider(4096) + + b.ReportAllocs() + for k := 0; k < b.N; k++ { + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164") + require.NoError(b, err) + + for i := 0; i < 10000; i++ { + appendErr := batch.Append(fmt.Sprintf("some_text_%d", i)) + require.NoError(b, appendErr) + } + + err = batch.Send() + require.NoError(b, err) + } + + }) + b.Run("default-50000", func(b *testing.B) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + ctx := context.Background() + require.NoError(b, err) + const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()" + err = conn.Exec(ctx, ddl) + require.NoError(b, err) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164") + }() + + b.ReportAllocs() + for k := 0; k < b.N; k++ { + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164") + require.NoError(b, err) + + for i := 0; i < 50000; i++ { + appendErr := batch.Append(fmt.Sprintf("some_text_%d", i)) + require.NoError(b, appendErr) + } + + err = batch.Send() + require.NoError(b, err) + } + + }) + b.Run("preAlloc-50000", func(b *testing.B) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + ctx := context.Background() + require.NoError(b, err) + const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()" + err = conn.Exec(ctx, ddl) + require.NoError(b, err) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164") + }() + + column.WithAllocBufferColStrProvider(4096) + + b.ReportAllocs() + for k := 0; k < b.N; k++ { + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164") + require.NoError(b, err) + + for i := 0; i < 50000; i++ { + appendErr := batch.Append(fmt.Sprintf("some_text_%d", i)) + require.NoError(b, appendErr) + } + + err = batch.Send() + require.NoError(b, err) + } + + }) + +}