ClickHouse に接続するための公式 Rust クライアントです。もともとは Paul Loyd によって開発されました。クライアントのソースコードは GitHub リポジトリ で公開されています。
- 行のエンコード/デコードに
serde を使用します。
serde の属性 skip_serializing、skip_deserializing、rename をサポートします。
- HTTP トランスポートで
RowBinary フォーマットを使用します。
- 将来的には TCP 上の
Native へ切り替える予定です。
- TLS (
native-tls および rustls-tls features 経由) をサポートします。
- 圧縮および展開 (LZ4) をサポートします。
- データの選択や挿入、DDL の実行、クライアント側でのバッチ処理のための API を提供します。
- 単体テストに便利なモックを提供します。
このクレートを使用するには、以下を Cargo.toml に追加します。
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
関連項目: crates.io のページ。
lz4 (デフォルトで有効) — Compression::Lz4 および Compression::Lz4Hc(_) のバリアントを有効にします。有効な場合、WATCH を除くすべてのクエリで、デフォルトで Compression::Lz4 が使用されます。
native-tls — OpenSSL にリンクする hyper-tls 経由で、HTTPS スキームの URL をサポートします。
rustls-tls — OpenSSL にリンクしない hyper-rustls 経由で、HTTPS スキームの URL をサポートします。
inserter — client.inserter() を有効にします。
test-util — モックを追加します。サンプル を参照してください。使用するのは dev-dependencies のみとしてください。
watch — client.watch 機能を有効にします。詳細は該当するセクションを参照してください。
uuid — uuid クレート と連携するために serde::uuid を追加します。
time — time クレート と連携するために serde::time を追加します。
HTTPS URL 経由で ClickHouse に接続する場合は、native-tls または rustls-tls のいずれかのフィーチャーを有効にする必要があります。
両方が有効な場合は、rustls-tls が優先されます。
このクライアントは、ClickHouse の LTS およびそれ以降のバージョン、ならびに ClickHouse Cloud と互換性があります。
v22.6 より前の ClickHouse サーバー では、ごくまれなケースで RowBinary が正しく処理されません。
この問題を回避するには、v0.11 以降を使用し、wa-37420 フィーチャーを有効にできます。注: この機能は、より新しい ClickHouse バージョンでは使用しないでください。
クライアントのさまざまな利用シナリオを、クライアントリポジトリ内の examples で網羅することを目指しています。概要は examples README で確認できます。
examples や以下のドキュメントに不明な点や不足している内容があれば、お気軽に お問い合わせください。
ch2rs クレートを使うと、ClickHouse から行型を生成できます。
作成済みのクライアントは再利用するか、クローンして、基盤となる hyper の接続プールを再利用してください。
use clickhouse::Client;
let client = Client::default()
// プロトコルとポートの両方を含める必要があります
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
HTTPS または ClickHouse Cloud への接続
HTTPS は、rustls-tls または native-tls のいずれの Cargo フィーチャーでも利用できます。
次に、通常どおりクライアントを作成します。この例では、接続情報を保存するために環境変数を使用します。
URL にはプロトコルとポートの両方を含める必要があります。たとえば https://instance.clickhouse.cloud:8443 のように指定します。
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
関連項目:
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- プレースホルダー
?fields は no, name (Row のフィールド) に置き換えられます。
- プレースホルダー
? は、後続の bind() 呼び出しで指定された値に置き換えられます。
- 先頭の1行またはすべての行を取得するには、便利な
fetch_one::<Row>() メソッドと fetch_all::<Row>() メソッドをそれぞれ使用できます。
sql::Identifier を使ってテーブル名をバインドできます。
注: レスポンス全体がストリーミングされるため、カーソルは一部の行を返した後でもエラーを返すことがあります。ユースケース上これが問題になる場合は、query(...).with_option("wait_end_of_query", "1") を試して、サーバー側でレスポンスのバッファリングを有効にしてください。詳細はこちら。buffer_size オプションも役立つ場合があります。
行を選択する際に wait_end_of_query を使用する場合は注意してください。サーバー側のメモリ消費が増え、全体的なパフォーマンスが低下する可能性があります。
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
end() が呼び出されない場合、INSERT は中断されます。
- ネットワーク負荷を分散するため、行はストリームとして順次送信されます。
- ClickHouse でバッチの insert がアトミックに行われるのは、すべての行が同じパーティションに収まり、かつその数が
max_insert_block_size 未満の場合に限られます。
受信データのクライアント側でのバッチ処理を避けるには、ClickHouse の非同期挿入 を使用できます。これを行うには、insert メソッドに async_insert オプションを指定するだけです (または Client インスタンス自体に指定すれば、すべての insert 呼び出しに適用されます) 。
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
関連項目:
Inserter 機能 (クライアント側バッチ処理)
inserter Cargo feature が必要です。
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// アプリケーションのシャットダウン時には、inserterのファイナライズと
// 残りの行のコミットを忘れずに行ってください。`.end()` も統計情報を返します。
inserter.end().await?;
- いずれかのしきい値 (
max_bytes、max_rows、period) に達すると、Inserter は commit() で進行中の insert を終了します。
- 並列 inserter による負荷スパイクを避けるため、
with_period_bias を使って、進行中の INSERT を終了する間隔に偏りを持たせることができます。
Inserter::time_left() は、現在の period がいつ終了するかを判定するために使用できます。ストリームでアイテムがまれにしか生成されない場合は、制限を確認するために Inserter::commit() を再度呼び出してください。
- 時間しきい値は、
inserter を高速化するために quanta クレート を使用して実装されています。test-util が有効な場合は使用されません (そのため、カスタムテストでは tokio::time::advance() で時間を制御できます) 。
commit() 呼び出しの間にあるすべての行は、同じ INSERT ステートメントで挿入されます。
挿入を終了して確定したい場合は、flush を忘れないでください。
単一ノード構成では、以下のようにDDLを実行すれば十分です。
client.query("DROP TABLE IF EXISTS some").execute().await?;
ただし、ロードバランサーを使用するクラスター構成の環境や ClickHouse Cloud では、wait_end_of_query オプションを使って、DDL がすべてのレプリカに適用されるのを待つことを推奨します。これは次のように実行できます。
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
with_option メソッドを使用すると、さまざまな ClickHouse設定を適用できます。例:
let numbers = client
.query("SELECT number FROM system.numbers")
// この設定は該当のクエリにのみ適用されます。
// グローバルなクライアント設定より優先されます。
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
queryに加え、insertメソッドおよびinserterメソッドでも同様に機能します。さらに、すべてのクエリに共通の設定を行うために、同じメソッドをClientインスタンスに対して呼び出すこともできます。
.with_option を使うと、query_id オプションを設定して、ClickHouse のクエリログでクエリを識別できます。
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
query に加え、insert および inserter メソッドでも同様に使用できます。
query_id を手動で設定する場合は、一意になるようにしてください。そのためには UUID が適しています。
関連項目: クライアントリポジトリの query_id の例
query_id と同様に、session_id を設定すると、同じセッションでステートメントを実行できます。session_id は、クライアントレベルでグローバルに設定することも、query、insert、または inserter の呼び出しごとに設定することもできます。
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
クラスター化されたデプロイメントでは、“スティッキーセッション” がないため、このフィーチャーを適切に利用するには、特定のクラスター ノード に接続する必要があります。たとえば、ラウンドロビン方式のロードバランサーでは、後続のリクエストが同じ ClickHouse ノードで処理されるとは限りません。
関連項目: クライアントリポジトリ内の session_id の例。
プロキシ認証を使用している場合や、カスタムヘッダーを指定する必要がある場合は、次のように設定できます。
let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");
関連項目: クライアントリポジトリ内のカスタム HTTP ヘッダーの例を参照してください。
これは、基盤となるHTTP接続プールの設定を調整するのに役立ちます。
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // または HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// クライアント側で特定のアイドル状態のソケットを保持する時間(ミリ秒単位)。
// ClickHouse サーバーの KeepAlive タイムアウトよりも十分に短い値にすることが推奨される。
// デフォルト値は 23.11 より前のバージョンでは 3 秒、それ以降は 10 秒。
.pool_idle_timeout(Duration::from_millis(2_500))
// プール内で許可されるアイドル状態の Keep-Alive 接続の最大数を設定する。
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
この例はレガシーな Hyper API に依存しており、今後変更される可能性があります。
関連項目: クライアントリポジトリにあるカスタム HTTP クライアントの例。
(U)Int(8|16|32|64|128) は、対応する (u|i)(8|16|32|64|128) 型、またはそれらをラップした newtype と相互変換できます。
(U)Int256 は直接サポートされていませんが、回避策があります。
Float(32|64) は、対応する f(32|64)、またはそれらをラップした newtype と相互変換できます。
Decimal(32|64|128) は、対応する i(32|64|128)、またはそれらをラップした newtype と相互変換できます。符号付き固定小数点数には、fixnum などの実装を使うと便利です。
Boolean は bool、またはそれをラップした newtype と相互変換できます。
String は、任意の文字列型またはバイト列型と相互変換できます。たとえば &str、&[u8]、String、Vec<u8>、SmartString です。newtype もサポートされています。バイト列を保存する場合は、より効率的な serde_bytes の使用を検討してください。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N) は、たとえば [u8; N] のようなバイトの配列としてサポートされています。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
UUID は serde::uuid を使用することで uuid::Uuid と相互変換できます。uuid フィーチャーが必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
Date は u16 またはそれをラップした newtype と相互変換でき、1970-01-01 からの経過日数を表します。また、time::Date も serde::time::date を使用することでサポートされますが、これには time 機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
Date32 は i32 またはそれをラップした newtype と相互変換でき、1970-01-01 からの経過日数を表します。また、time::Date も serde::time::date32 を使用することでサポートされますが、これには time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
DateTime は u32、またはそれをラップした newtype と相互に対応し、UNIX epoch からの経過秒数を表します。また、time::OffsetDateTime も serde::time::datetime を使うことでサポートされますが、その場合は time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
DateTime64(_) は i32 またはそれをラップした newtype と相互変換でき、UNIX epoch からの経過時間を表します。また、time::OffsetDateTime も serde::time::datetime64::* を使うことでサポートされますが、その場合は time フィーチャー が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // `DateTime64(X)` に応じて s/us/ms/ns 単位の経過時間
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
Tuple(A, B, ...) は (A, B, ...) またはそれをラップした newtype と相互に対応します。
Array(_) は任意のスライス (例: Vec<_>, &[_]) と相互に対応します。独自型もサポートされます。
Map(K, V) は Array((K, V)) と同様に振る舞います。
LowCardinality(_) は透過的にサポートされます。
Nullable(_) は Option<_> と相互に対応します。clickhouse::serde::* ヘルパーを使う場合は ::option を追加してください。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
- 複数の配列をリネームして指定することで、
Nested をサポートできます。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
Geo 型がサポートされています。Point は (f64, f64) のタプルとして扱われ、その他の型はいずれもポイントのスライスです。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
Variant、Dynamic、(新しい) JSON データ型はまだサポートされていません。
このクレート は、CH server をモックし、DDL、SELECT、INSERT、WATCH クエリをテストするためのユーティリティを提供します。この機能は test-util フィーチャー を有効にすると利用できます。必ず 開発用依存関係としてのみ使用してください。
Exampleを参照してください。
CANNOT_READ_ALL_DATA エラーの最も一般的な原因は、アプリケーション側の行定義が ClickHouse 側の行定義と一致していないことです。
次のテーブルを考えてみましょう。
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
次に、たとえばアプリケーション側で EventLog が異なる型で定義されている場合:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- u32 にすべきです!
}
データの挿入時に、次のエラーが発生することがあります。
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
この例では、EventLog 構造体を正しく定義することで修正できます。
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
Variant、Dynamic、 (新しい) JSON データ型は、まだサポートされていません。
- サーバー側のパラメータバインディングは、まだサポートされていません。進捗状況については、this issue を参照してください。
ご不明な点やサポートが必要な場合は、Community Slack または GitHub issues でお気軽にご連絡ください。