Официальный Rust-клиент для подключения к ClickHouse, изначально разработанный Paul Loyd. Исходный код клиента доступен в репозитории на GitHub.
- Использует
serde для кодирования и декодирования строк.
- Поддерживает атрибуты
serde: skip_serializing, skip_deserializing, rename.
- Использует формат
RowBinary по HTTP.
- В будущем планируется перейти на
Native по TCP.
- Поддерживает TLS (через возможности
native-tls и rustls-tls).
- Поддерживает сжатие и декомпрессию (LZ4).
- Предоставляет API для выборки и вставки данных, выполнения DDL-запросов и клиентского батчинга.
- Предоставляет удобные моки для модульного тестирования.
Чтобы использовать крейт, добавьте следующее в Cargo.toml:
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
См. также: страница crates.io.
lz4 (включена по умолчанию) — включает варианты Compression::Lz4 и Compression::Lz4Hc(_). Если эта возможность включена, то по умолчанию для всех запросов, кроме WATCH, используется Compression::Lz4.
native-tls — поддерживает URL-адреса со схемой HTTPS через hyper-tls, который линкуется с OpenSSL.
rustls-tls — поддерживает URL-адреса со схемой HTTPS через hyper-rustls, который не линкуется с OpenSSL.
inserter — включает client.inserter().
test-util — добавляет моки. См. пример. Используйте только в dev-dependencies.
watch — включает функциональность client.watch. Подробности см. в соответствующем разделе.
uuid — добавляет serde::uuid для работы с крейтом uuid.
time — добавляет serde::time для работы с крейтом time.
При подключении к ClickHouse через URL-адрес со схемой HTTPS должна быть включена возможность native-tls или rustls-tls.
Если включены обе, приоритет будет у rustls-tls.
Совместимость версий ClickHouse
Клиент совместим с LTS-версиями ClickHouse и более новыми версиями, а также с ClickHouse Cloud.
ClickHouse server версий ниже v22.6 в некоторых редких случаях некорректно обрабатывает RowBinary.
Чтобы решить эту проблему, можно использовать v0.11+ и включить возможность wa-37420. Примечание: эту возможность не следует использовать с более новыми версиями ClickHouse.
Мы стараемся охватить различные сценарии использования клиента в примерах из репозитория клиента. Обзор доступен в README примеров.
Если в примерах или в приведённой ниже документации что-то непонятно или чего-то не хватает, смело свяжитесь с нами.
Крейт ch2rs позволяет генерировать тип строки из ClickHouse.
Создание экземпляра клиента
Повторно используйте созданные клиенты или клонируйте их, чтобы переиспользовать базовый пул соединений hyper.
use clickhouse::Client;
let client = Client::default()
// должен содержать протокол и порт
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
HTTPS или подключение к ClickHouse Cloud
HTTPS работает с cargo-возможностями rustls-tls и native-tls.
Затем создайте клиент как обычно. В этом примере переменные окружения используются для хранения сведений о подключении:
URL должен включать и протокол, и порт, например https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
См. также:
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- Плейсхолдер
?fields заменяется на no, name (поля Row).
- Плейсхолдер
? заменяется значениями в последующих вызовах bind().
- Для получения первой строки или всех строк соответственно можно использовать удобные методы
fetch_one::<Row>() и fetch_all::<Row>().
sql::Identifier можно использовать для привязки имён таблиц.
Примечание: поскольку весь ответ передаётся потоком, курсоры могут возвращать ошибку даже после того, как уже было получено несколько строк. Если это происходит в вашем случае, попробуйте query(...).with_option("wait_end_of_query", "1"), чтобы включить буферизацию ответа на стороне сервера. Подробнее. Параметр buffer_size тоже может быть полезен.
Используйте wait_end_of_query с осторожностью при выборке строк, так как это может привести к повышенному потреблению памяти на стороне сервера и, вероятно, снизит общую производительность.
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- Если
end() не вызвать, INSERT будет прерван.
- Строки отправляются постепенно, в виде потока, чтобы распределить сетевую нагрузку.
- ClickHouse выполняет атомарную вставку батчей только в том случае, если все строки попадают в одну и ту же партицию, а их количество меньше
max_insert_block_size.
Async insert (батчинг на стороне сервера)
Вы можете использовать асинхронные вставки ClickHouse, чтобы избежать батчинга входящих данных на стороне клиента. Для этого достаточно передать опцию async_insert в метод insert (или даже самому экземпляру Client, чтобы она применялась ко всем вызовам insert).
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
См. также:
Возможность Inserter (батчинг на стороне клиента)
Требуется cargo-возможность inserter.
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// не забудьте завершить inserter при завершении работы приложения
// и выполнить коммит оставшихся строк. `.end()` также вернёт статистику.
inserter.end().await?;
Inserter завершает активную вставку в commit(), если достигнут любой из порогов (max_bytes, max_rows, period).
- Интервал между завершениями активных
INSERT можно сместить с помощью with_period_bias, чтобы избежать всплесков нагрузки при параллельной работе нескольких Inserter.
Inserter::time_left() можно использовать, чтобы определить, когда закончится текущий период. Если ваш поток редко выдает элементы, снова вызовите Inserter::commit(), чтобы проверить ограничения.
- Пороги по времени реализованы с использованием крейта quanta для ускорения
inserter. Не используется, если включен test-util (поэтому в пользовательских тестах временем можно управлять через tokio::time::advance()).
- Все строки между вызовами
commit() вставляются в рамках одного оператора INSERT.
Не забудьте выполнить flush, если хотите завершить/финализировать вставку:
При одноузловом развертывании достаточно выполнить DDL-запросы следующим образом:
client.query("DROP TABLE IF EXISTS some").execute().await?;
Однако при кластерных развертываниях с балансировщиком нагрузки или в ClickHouse Cloud рекомендуется дождаться, пока DDL будет применён на всех репликах, используя параметр wait_end_of_query. Это можно сделать так:
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
Вы можете использовать различные настройки ClickHouse с помощью метода with_option. Например:
let numbers = client
.query("SELECT number FROM system.numbers")
// Этот параметр будет применён только к данному запросу;
// он переопределит глобальный параметр клиента.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
Помимо query, это также работает с методами insert и inserter; кроме того, тот же метод можно вызвать у экземпляра Client, чтобы задать глобальные настройки для всех запросов.
С помощью .with_option можно задать параметр query_id для идентификации запросов в журнале запросов ClickHouse.
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
Помимо query, это аналогично работает и для методов insert и inserter.
Если вы задаёте query_id вручную, убедитесь, что он уникален. Для этого хорошо подходят UUID.
См. также: пример с query_id в репозитории клиента.
Как и в случае с query_id, вы можете задать session_id, чтобы выполнять команды в рамках одного сеанса. session_id можно задать либо глобально на уровне клиента, либо для каждого вызова query, insert или inserter.
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
В кластерных развертываниях из-за отсутствия “липких сеансов” для корректного использования этой возможности нужно подключаться к определённому узлу кластера, поскольку, например, балансировщик нагрузки round-robin не гарантирует, что последующие запросы будут обрабатываться одним и тем же узлом ClickHouse.
См. также: пример session_id в репозитории клиента.
Если вы используете аутентификацию через прокси или вам нужно передать произвольные заголовки, это можно сделать так:
let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");
См. также: пример пользовательских HTTP-заголовков в репозитории клиента.
Пользовательский HTTP-клиент
Это может быть полезно для тонкой настройки параметров базового пула HTTP-соединений.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // или HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// Как долго держать конкретный бездействующий сокет открытым на стороне клиента (в миллисекундах).
// Это значение должно быть заметно меньше тайм-аута KeepAlive сервера ClickHouse,
// который по умолчанию составлял 3 секунды для версий до 23.11 и 10 секунд после.
.pool_idle_timeout(Duration::from_millis(2_500))
// Задаёт максимальное количество бездействующих Keep-Alive соединений, допустимых в пуле.
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Этот пример использует устаревший API Hyper и в будущем может измениться.
См. также: пример пользовательского HTTP-клиента в репозитории клиента.
См. также дополнительные примеры:
(U)Int(8|16|32|64|128) сопоставляется с соответствующими типами (u|i)(8|16|32|64|128) или newtype-обёртками на их основе, и наоборот.
(U)Int256 напрямую не поддерживается, но для него есть обходное решение.
Float(32|64) сопоставляется с соответствующими f(32|64) или newtype-обёртками на их основе, и наоборот.
Decimal(32|64|128) сопоставляется с соответствующими i(32|64|128) или newtype-обёртками на их основе, и наоборот. Удобнее использовать fixnum или другую реализацию знаковых чисел с фиксированной точкой.
Boolean сопоставляется с bool или newtype-обёртками на его основе, и наоборот.
String сопоставляется с любыми строковыми или байтовыми типами, и наоборот, например &str, &[u8], String, Vec<u8> или SmartString. Пользовательские типы также поддерживаются. Для хранения байтов рекомендуется использовать serde_bytes, поскольку это эффективнее.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N) поддерживается в виде массива байтов, например [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
UUID преобразуется в uuid::Uuid и обратно с помощью serde::uuid. Требуется возможность uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
Date преобразуется в/из u16 или newtype-обёртки вокруг него и представляет количество дней, прошедших с 1970-01-01. Также поддерживается time::Date при использовании serde::time::date; для этого требуется возможность time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
Date32 преобразуется в/из i32 или нового типа-обёртки на его основе и представляет количество дней, прошедших с 1970-01-01. Также поддерживается time::Date при использовании serde::time::date32; для этого требуется возможность time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
DateTime преобразуется в/из u32 или нового типа-обёртки на его основе и представляет собой количество секунд, прошедших с эпохи UNIX. Также поддерживается time::OffsetDateTime при использовании serde::time::datetime; для этого требуется возможность time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
DateTime64(_) преобразуется в/из i32 или newtype-обёртки вокруг него и представляет время, прошедшее с эпохи UNIX. Также поддерживается time::OffsetDateTime при использовании serde::time::datetime64::*; для этого требуется возможность time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // с/мкс/мс/нс с начала эпохи UNIX в зависимости от `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
Tuple(A, B, ...) преобразуется в/из (A, B, ...) или в/из newtype-обёртки над ним.
Array(_) преобразуется в/из любого среза, например Vec<_>, &[_]. New types тоже поддерживаются.
Map(K, V) ведёт себя как Array((K, V)).
LowCardinality(_) поддерживается без каких-либо сложностей.
Nullable(_) преобразуется в/из Option<_>. Для вспомогательных средств clickhouse::serde::* добавьте ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
Nested поддерживается через указание нескольких массивов с переименованием.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
- Поддерживаются типы
Geo. Point представляет собой кортеж (f64, f64), а все остальные типы — это просто срезы точек.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
- Типы данных
Variant, Dynamic и (новый) JSON пока не поддерживаются.
Крейт предоставляет утилиты для эмуляции сервера CH и тестирования DDL, а также запросов SELECT, INSERT и WATCH. Эта функциональность включается с помощью возможности test-util. Используйте её только как зависимость для разработки.
См. пример.
Наиболее частая причина ошибки CANNOT_READ_ALL_DATA заключается в том, что описание строки на стороне приложения не совпадает с описанием в ClickHouse.
Рассмотрим следующую таблицу:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
Затем, если EventLog определён на стороне приложения с типами, которые не совпадают, например:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- должно быть u32!
}
При вставке данных может возникнуть следующая ошибка:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
В этом примере проблема решается правильным определением структуры EventLog:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
- Типы данных
Variant, Dynamic, (new) JSON пока не поддерживаются.
- Привязка параметров на стороне сервера пока не поддерживается; статус можно отслеживать в этой задаче.
Если у вас есть вопросы или вам нужна помощь, обращайтесь к нам в Community Slack или через GitHub issues. Последнее изменение 10 июня 2026 г.