標準 API の完全なコード例は こちら を参照してください。
接続設定については、Configuration を参照してください。
サポートされているデータ型と Go の型マッピングについては、Data Types を参照してください。
database/sql (または「標準」API) を使用すると、標準インターフェイスに準拠することで、アプリケーションコードを基盤となるデータベースに依存させたくないケースでもこのクライアントを利用できます。その代わりに、抽象化や間接化のレイヤーが増え、ClickHouse とは必ずしも整合しないプリミティブも含まれます。ただし、ツールが複数のデータベースに接続する必要がある場面では、こうしたコストは通常許容できます。
また、このクライアントはトランスポート層として HTTP もサポートしています。この場合でも、最適なパフォーマンスを得るためにデータはネイティブフォーマットでエンコードされます。
接続は、clickhouse://<host>:<port>?<query_option>=<value> 形式の DSN 文字列と Open メソッドを使用する方法、または clickhouse.OpenDB メソッドを使用する方法のいずれかで確立できます。後者は database/sql の仕様には含まれていませんが、sql.DB インスタンスを返します。このメソッドでは profiling などの機能を利用できますが、これらを database/sql の仕様を通じて公開する明確な手段はありません。
func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完全な例
以降のすべての例では、特に明記がない限り、ClickHouse の conn 変数は作成済みで使用可能であるものとします。
ほとんどの設定オプションは ClickHouse API と共通です。共通の設定については Configuration を参照してください。以下の SQL 固有の DSN パラメータを利用できます。
hosts - 負荷分散とフェイルオーバーのための、単一アドレスのホストをカンマ区切りで指定したリスト - 複数のノードへの接続 を参照してください。
username/password - 認証情報 - Authentication を参照してください
database - 現在のデフォルト database を選択します
dial_timeout - duration 文字列は、符号付きの場合もある 10 進数の数値を並べたもので、各値には必要に応じて小数部と、300ms、1s のような単位の接尾辞を付けられます。有効な時間単位は ms、s、m です。
connection_open_strategy - random/in_order (デフォルトは random) - 複数のノードへの接続 を参照してください
round_robin - 一連の server からラウンドロビンで選択します
in_order - 指定された順序で、最初に稼働中の server が選択されます
debug - デバッグ出力を有効にします (ブール値)
compress - 圧縮アルゴリズムを指定します - none (デフォルト) 、zstd、lz4、gzip、deflate、br。true に設定すると lz4 が使用されます。ネイティブ通信でサポートされるのは lz4 と zstd のみです。
compress_level - 圧縮レベル (デフォルトは 0) です。圧縮 を参照してください。これはアルゴリズムごとに異なります。
gzip - -2 (最高速) から 9 (最高圧縮)
deflate - -2 (最高速) から 9 (最高圧縮)
br - 0 (最高速) から 11 (最高圧縮)
zstd、lz4 - 無視されます
secure - セキュアな SSL 接続を確立します (デフォルトは false)
skip_verify - 証明書の検証をスキップします (デフォルトは false)
block_buffer_size - block バッファサイズを制御できます。BlockBufferSize を参照してください。 (デフォルトは 2)
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完全なサンプル
デフォルトでは、接続はネイティブプロトコルで確立されます。HTTP が必要な場合は、DSN を変更して HTTP プロトコルを含めるか、接続オプションで Protocol を指定することで有効にできます。
func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}
func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完全なサンプル
HTTP のみセッションが必要なのは、HTTP トランスポートを使用する場合のみです。native TCP connection には、セッションが自動的に組み込まれています。
HTTP を使用する場合は、session_id を設定として渡すことで、一時テーブルなどのセッションに紐づく機能を有効にできます。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}
// 注意: rows.Err() のチェックを省略しないこと
if err := rows.Err(); err != nil {
return err
}
完全な例
接続を確立したら、Execメソッドを使って sql ステートメントを実行できます。
conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")
完全な例
このメソッドはコンテキストを受け取れません。デフォルトではバックグラウンドコンテキストで実行されます。必要に応じて ExecContext を使用してください。詳しくは コンテキストの使用 を参照してください。
バッチ処理のセマンティクスは、Being メソッドで sql.Tx を作成することで実現できます。これを基に、INSERT ステートメントを指定して Prepare メソッドを使うと、バッチを取得できます。これにより sql.Stmt が返され、Exec メソッドを使ってそこに行を追加できます。元の sql.Tx に対して Commit が実行されるまで、バッチはメモリ上に蓄積されます。
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()
完全な例
単一の行をクエリするには、QueryRow メソッドを使用します。これは *sql.Row を返し、このオブジェクトに対して Scan を呼び出すことで、カラムの値を格納する変数へのポインタを渡せます。QueryRowContext バリアントを使うと、background 以外のコンテキストを渡せます。詳しくは コンテキストの使用 を参照してください。
row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
完全な例
複数の行を反復処理するには、Query メソッドを使用します。これは *sql.Rows 構造体を返し、Next を呼び出して各行を順に処理できます。これに相当する QueryContext では、コンテキストを渡せます。
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
// 注意: rows.Err() のチェックを省略しないこと
if err := rows.Err(); err != nil {
return err
}
完全なサンプル
非同期挿入は、ExecContext メソッドで INSERT を実行することで行えます。以下のように、非同期モードを有効にしたコンテキストを渡す必要があります。これにより、クライアントがサーバーでの INSERT 完了まで待機するか、データを受信した時点で応答するかをユーザーが指定できます。これは実質的に、パラメーター wait_for_async_insert を制御します。
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}
サンプル全体
標準 API は、ClickHouse API と同様のパラメータバインディング機能をサポートしており、Exec、Query、QueryRow メソッド (およびそれらに対応する Context バリアント) にパラメータを渡せます。位置指定、名前付き、番号付きのパラメータをサポートしています。
var count uint64
// 位置バインド
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// 数値バインド
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// 名前付きバインド
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
完全な例
特別なケースに関する注意事項も引き続き適用されます。
標準 API では、ClickHouse API と同様に、コンテキストを通じてデッドライン、キャンセルシグナル、そのほかのリクエストスコープの値を渡せます。ClickHouse API と異なり、これはメソッドの Context バリアントを使って実現します。つまり、デフォルトでバックグラウンドコンテキストを使用する Exec のようなメソッドには、先頭のパラメーターとしてコンテキストを渡せる ExecContext というバリアントがあります。これにより、アプリケーションフローのどの段階でもコンテキストを渡せます。たとえば、ConnContext を使って接続を確立するときや、QueryRowContext を使ってクエリの行を取得するときにコンテキストを渡せます。利用可能なすべてのメソッドの例を以下に示します。
コンテキストを使ってデッドライン、キャンセルシグナル、クエリ ID、クォータキー、接続設定を渡す方法の詳細については、ClickHouse API の コンテキストの使用 を参照してください。
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"async_insert": "1",
}))
// コンテキストを使用してクエリをキャンセルできる
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// クエリにデッドラインを設定する - 絶対時刻に達するとクエリがキャンセルされる。この場合もコネクションのみが終了し、
// クエリはClickHouse内で完了まで実行され続ける
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// ログ内のクエリトレースを補助するためにクエリIDを設定する(例: system.query_log を参照)
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// クォータキーを設定する - まずクォータを作成する
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
// コンテキストを使用してクエリをキャンセルできる
ctx, cancel = context.WithCancel(context.Background())
// キャンセル前にいくつかの結果を取得する
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2 uint8
)
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}
// 注意: rows.Err() のチェックを省略しないこと
if err := rows.Err(); err != nil {
return err
}
完全なサンプル
ClickHouse API と同様に、カラムの型情報を利用して、正しい型の変数の実行時インスタンスを作成し、それを Scan に渡すことができます。これにより、型が不明な場合でもカラムを読み取ることができます。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// 注意: rows.Err() のチェックを省略しないでください
if err := rows.Err(); err != nil {
return err
}
完全なサンプル
外部テーブル を使用すると、クライアントは SELECT クエリとともに ClickHouse にデータを送信できます。このデータは一時テーブルに格納され、評価のためにクエリ内で使用できます。
クエリとともに外部データを送信するには、ユーザーは ext.NewTable を使用して外部テーブルを作成し、それを Context 経由で渡す前に構築する必要があります。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// 注意: rows.Err() のチェックを省略しないこと
if err := rows.Err(); err != nil {
return err
}
var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
完全なサンプル
ClickHouse は、TCP と HTTP の両方のトランスポートで trace context の伝播 をサポートしています。clickhouse.WithSpan を使用すると、context 経由でクエリに span を関連付けることができます。
HTTP トランスポートの制限ClickHouse server は標準の traceparent / tracestate HTTP ヘッダーを受け付けますが、clickhouse-go の HTTP トランスポートは現在これらを送信しないため、HTTP では WithSpan は効果がありません。回避策として、接続オプションの HttpHeaders でヘッダーを手動で設定できます。
var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// 注意: rows.Err() のチェックを省略しないこと
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
完全なサンプル
標準 API は、ネイティブの ClickHouse API と同じ圧縮アルゴリズム、つまりブロックレベルでの lz4 および zstd による圧縮をサポートしています。さらに、HTTP 接続では gzip、deflate、br による圧縮もサポートされています。これらのいずれかを有効にすると、挿入時およびクエリ応答時にブロック単位で圧縮が行われます。その他のリクエスト、たとえば ping やクエリリクエストは、非圧縮のままです。これは lz4 および zstd のオプションと同様の動作です。
接続の確立に OpenDB メソッドを使用する場合は、Compression 設定を渡せます。これには、圧縮レベルを指定する機能も含まれます (以下を参照) 。DSN を使用して sql.Open で接続する場合は、compress パラメータを使用します。これは、gzip、deflate、br、zstd、lz4 のいずれかの圧縮アルゴリズム、またはブールフラグを指定できます。true に設定した場合は、lz4 が使用されます。デフォルトは none、つまり圧縮は無効です。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})
完全な例
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))
完全な例
適用する圧縮レベルは、DSN パラメータ compress_level または Compression オプションの Level フィールドで指定できます。既定値は 0 ですが、アルゴリズムによって異なります。
gzip - -2 (最速) ~9 (最高圧縮)
deflate - -2 (最速) ~9 (最高圧縮)
br - 0 (最速) ~11 (最高圧縮)
zstd, lz4 - 無視されます