メインコンテンツへスキップ
ClickHouse に接続するための公式 Rust クライアントです。もともとは Paul Loyd によって開発されました。クライアントのソースコードは GitHub リポジトリ で公開されています。

概要

  • 行のエンコード/デコードに serde を使用します。
  • serde の属性 skip_serializingskip_deserializingrename をサポートします。
  • HTTP トランスポートで RowBinary フォーマットを使用します。
    • 将来的には TCP 上の Native へ切り替える予定です。
  • TLS (native-tls および rustls-tls features 経由) をサポートします。
  • 圧縮および展開 (LZ4) をサポートします。
  • データの選択や挿入、DDL の実行、クライアント側でのバッチ処理のための API を提供します。
  • 単体テストに便利なモックを提供します。

インストール

このクレートを使用するには、以下を Cargo.toml に追加します。
[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
関連項目: crates.io のページ

Cargo フィーチャー

  • lz4 (デフォルトで有効) — Compression::Lz4 および Compression::Lz4Hc(_) のバリアントを有効にします。有効な場合、WATCH を除くすべてのクエリで、デフォルトで Compression::Lz4 が使用されます。
  • native-tls — OpenSSL にリンクする hyper-tls 経由で、HTTPS スキームの URL をサポートします。
  • rustls-tls — OpenSSL にリンクしない hyper-rustls 経由で、HTTPS スキームの URL をサポートします。
  • inserterclient.inserter() を有効にします。
  • test-util — モックを追加します。サンプル を参照してください。使用するのは dev-dependencies のみとしてください。
  • watchclient.watch 機能を有効にします。詳細は該当するセクションを参照してください。
  • uuiduuid クレート と連携するために serde::uuid を追加します。
  • timetime クレート と連携するために serde::time を追加します。
HTTPS URL 経由で ClickHouse に接続する場合は、native-tls または rustls-tls のいずれかのフィーチャーを有効にする必要があります。 両方が有効な場合は、rustls-tls が優先されます。

ClickHouse バージョンの互換性

このクライアントは、ClickHouse の LTS およびそれ以降のバージョン、ならびに ClickHouse Cloud と互換性があります。 v22.6 より前の ClickHouse サーバー では、ごくまれなケースで RowBinary が正しく処理されません。 この問題を回避するには、v0.11 以降を使用し、wa-37420 フィーチャーを有効にできます。注: この機能は、より新しい ClickHouse バージョンでは使用しないでください。

クライアントのさまざまな利用シナリオを、クライアントリポジトリ内の examples で網羅することを目指しています。概要は examples README で確認できます。 examples や以下のドキュメントに不明な点や不足している内容があれば、お気軽に お問い合わせください

使い方

ch2rs クレートを使うと、ClickHouse から行型を生成できます。

クライアントインスタンスの作成

作成済みのクライアントは再利用するか、クローンして、基盤となる hyper の接続プールを再利用してください。
use clickhouse::Client;

let client = Client::default()
    // プロトコルとポートの両方を含める必要があります
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPS または ClickHouse Cloud への接続

HTTPS は、rustls-tls または native-tls のいずれの Cargo フィーチャーでも利用できます。 次に、通常どおりクライアントを作成します。この例では、接続情報を保存するために環境変数を使用します。
URL にはプロトコルとポートの両方を含める必要があります。たとえば https://instance.clickhouse.cloud:8443 のように指定します。
fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));
関連項目:

行の選択

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • プレースホルダー ?fieldsno, name (Row のフィールド) に置き換えられます。
  • プレースホルダー ? は、後続の bind() 呼び出しで指定された値に置き換えられます。
  • 先頭の1行またはすべての行を取得するには、便利な fetch_one::<Row>() メソッドと fetch_all::<Row>() メソッドをそれぞれ使用できます。
  • sql::Identifier を使ってテーブル名をバインドできます。
注: レスポンス全体がストリーミングされるため、カーソルは一部の行を返した後でもエラーを返すことがあります。ユースケース上これが問題になる場合は、query(...).with_option("wait_end_of_query", "1") を試して、サーバー側でレスポンスのバッファリングを有効にしてください。詳細はこちらbuffer_size オプションも役立つ場合があります。
行を選択する際に wait_end_of_query を使用する場合は注意してください。サーバー側のメモリ消費が増え、全体的なパフォーマンスが低下する可能性があります。

行を挿入する

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end() が呼び出されない場合、INSERT は中断されます。
  • ネットワーク負荷を分散するため、行はストリームとして順次送信されます。
  • ClickHouse でバッチの insert がアトミックに行われるのは、すべての行が同じパーティションに収まり、かつその数が max_insert_block_size 未満の場合に限られます。

非同期 INSERT (サーバー側バッチ処理)

受信データのクライアント側でのバッチ処理を避けるには、ClickHouse の非同期挿入 を使用できます。これを行うには、insert メソッドに async_insert オプションを指定するだけです (または Client インスタンス自体に指定すれば、すべての insert 呼び出しに適用されます) 。
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");
関連項目:

Inserter 機能 (クライアント側バッチ処理)

inserter Cargo feature が必要です。
let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// アプリケーションのシャットダウン時には、inserterのファイナライズと
// 残りの行のコミットを忘れずに行ってください。`.end()` も統計情報を返します。
inserter.end().await?;
  • いずれかのしきい値 (max_bytesmax_rowsperiod) に達すると、Insertercommit() で進行中の insert を終了します。
  • 並列 inserter による負荷スパイクを避けるため、with_period_bias を使って、進行中の INSERT を終了する間隔に偏りを持たせることができます。
  • Inserter::time_left() は、現在の period がいつ終了するかを判定するために使用できます。ストリームでアイテムがまれにしか生成されない場合は、制限を確認するために Inserter::commit() を再度呼び出してください。
  • 時間しきい値は、inserter を高速化するために quanta クレート を使用して実装されています。test-util が有効な場合は使用されません (そのため、カスタムテストでは tokio::time::advance() で時間を制御できます) 。
  • commit() 呼び出しの間にあるすべての行は、同じ INSERT ステートメントで挿入されます。
挿入を終了して確定したい場合は、flush を忘れないでください。
inserter.end().await?;

DDLの実行

単一ノード構成では、以下のようにDDLを実行すれば十分です。
client.query("DROP TABLE IF EXISTS some").execute().await?;
ただし、ロードバランサーを使用するクラスター構成の環境や ClickHouse Cloud では、wait_end_of_query オプションを使って、DDL がすべてのレプリカに適用されるのを待つことを推奨します。これは次のように実行できます。
client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse設定

with_option メソッドを使用すると、さまざまな ClickHouse設定を適用できます。例:
let numbers = client
    .query("SELECT number FROM system.numbers")
    // この設定は該当のクエリにのみ適用されます。
    // グローバルなクライアント設定より優先されます。
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;
queryに加え、insertメソッドおよびinserterメソッドでも同様に機能します。さらに、すべてのクエリに共通の設定を行うために、同じメソッドをClientインスタンスに対して呼び出すこともできます。

クエリ ID

.with_option を使うと、query_id オプションを設定して、ClickHouse のクエリログでクエリを識別できます。
let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;
query に加え、insert および inserter メソッドでも同様に使用できます。
query_id を手動で設定する場合は、一意になるようにしてください。そのためには UUID が適しています。
関連項目: クライアントリポジトリの query_id の例

セッションID

query_id と同様に、session_id を設定すると、同じセッションでステートメントを実行できます。session_id は、クライアントレベルでグローバルに設定することも、queryinsert、または inserter の呼び出しごとに設定することもできます。
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
クラスター化されたデプロイメントでは、“スティッキーセッション” がないため、このフィーチャーを適切に利用するには、特定のクラスター ノード に接続する必要があります。たとえば、ラウンドロビン方式のロードバランサーでは、後続のリクエストが同じ ClickHouse ノードで処理されるとは限りません。
関連項目: クライアントリポジトリ内の session_id の例

カスタムHTTPヘッダー

プロキシ認証を使用している場合や、カスタムヘッダーを指定する必要がある場合は、次のように設定できます。
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");
関連項目: クライアントリポジトリ内のカスタム HTTP ヘッダーの例を参照してください。

カスタムHTTPクライアント

これは、基盤となるHTTP接続プールの設定を調整するのに役立ちます。
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // または HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // クライアント側で特定のアイドル状態のソケットを保持する時間(ミリ秒単位)。
    // ClickHouse サーバーの KeepAlive タイムアウトよりも十分に短い値にすることが推奨される。
    // デフォルト値は 23.11 より前のバージョンでは 3 秒、それ以降は 10 秒。
    .pool_idle_timeout(Duration::from_millis(2_500))
    // プール内で許可されるアイドル状態の Keep-Alive 接続の最大数を設定する。
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
この例はレガシーな Hyper API に依存しており、今後変更される可能性があります。
関連項目: クライアントリポジトリにあるカスタム HTTP クライアントの例

データ型

  • (U)Int(8|16|32|64|128) は、対応する (u|i)(8|16|32|64|128) 型、またはそれらをラップした newtype と相互変換できます。
  • (U)Int256 は直接サポートされていませんが、回避策があります。
  • Float(32|64) は、対応する f(32|64)、またはそれらをラップした newtype と相互変換できます。
  • Decimal(32|64|128) は、対応する i(32|64|128)、またはそれらをラップした newtype と相互変換できます。符号付き固定小数点数には、fixnum などの実装を使うと便利です。
  • Booleanbool、またはそれをラップした newtype と相互変換できます。
  • String は、任意の文字列型またはバイト列型と相互変換できます。たとえば &str&[u8]StringVec<u8>SmartString です。newtype もサポートされています。バイト列を保存する場合は、より効率的な serde_bytes の使用を検討してください。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N) は、たとえば [u8; N] のようなバイトの配列としてサポートされています。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16)serde_repr を使ってサポートされています。
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUIDserde::uuid を使用することで uuid::Uuid と相互変換できます。uuid フィーチャーが必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
  • IPv6std::net::Ipv6Addr との間で相互に変換されます。
  • IPv4serde::ipv4 を使用することで、std::net::Ipv4Addr との間で相互に変換されます。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Dateu16 またはそれをラップした newtype と相互変換でき、1970-01-01 からの経過日数を表します。また、time::Dateserde::time::date を使用することでサポートされますが、これには time 機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32i32 またはそれをラップした newtype と相互変換でき、1970-01-01 からの経過日数を表します。また、time::Dateserde::time::date32 を使用することでサポートされますが、これには time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTimeu32、またはそれをラップした newtype と相互に対応し、UNIX epoch からの経過秒数を表します。また、time::OffsetDateTimeserde::time::datetime を使うことでサポートされますが、その場合は time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_)i32 またはそれをラップした newtype と相互変換でき、UNIX epoch からの経過時間を表します。また、time::OffsetDateTimeserde::time::datetime64::* を使うことでサポートされますが、その場合は time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // `DateTime64(X)` に応じて s/us/ms/ns 単位の経過時間
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)(A, B, ...) またはそれをラップした newtype と相互に対応します。
  • Array(_) は任意のスライス (例: Vec<_>, &[_]) と相互に対応します。独自型もサポートされます。
  • Map(K, V)Array((K, V)) と同様に振る舞います。
  • LowCardinality(_) は透過的にサポートされます。
  • Nullable(_)Option<_> と相互に対応します。clickhouse::serde::* ヘルパーを使う場合は ::option を追加してください。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • 複数の配列をリネームして指定することで、Nested をサポートできます。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Geo 型がサポートされています。Point(f64, f64) のタプルとして扱われ、その他の型はいずれもポイントのスライスです。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • VariantDynamic、(新しい) JSON データ型はまだサポートされていません。

モック

このクレート は、CH server をモックし、DDL、SELECTINSERTWATCH クエリをテストするためのユーティリティを提供します。この機能は test-util フィーチャー を有効にすると利用できます。必ず 開発用依存関係としてのみ使用してください。 Exampleを参照してください。

トラブルシューティング

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATA エラーの最も一般的な原因は、アプリケーション側の行定義が ClickHouse 側の行定義と一致していないことです。 次のテーブルを考えてみましょう。
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
次に、たとえばアプリケーション側で EventLog が異なる型で定義されている場合:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- u32 にすべきです!
}
データの挿入時に、次のエラーが発生することがあります。
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
この例では、EventLog 構造体を正しく定義することで修正できます。
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

既知の制限事項

  • VariantDynamic、 (新しい) JSON データ型は、まだサポートされていません。
  • サーバー側のパラメータバインディングは、まだサポートされていません。進捗状況については、this issue を参照してください。

お問い合わせ

ご不明な点やサポートが必要な場合は、Community Slack または GitHub issues でお気軽にご連絡ください。
最終更新日 2026年6月10日