메인 콘텐츠로 건너뛰기
ClickHouse에 연결하는 공식 Rust 클라이언트로, Paul Loyd가 처음 개발했습니다. 클라이언트의 소스 코드는 GitHub 리포지토리에서 확인할 수 있습니다.

개요

  • 행 인코딩/디코딩에 serde를 사용합니다.
  • serde 속성인 skip_serializing, skip_deserializing, rename을 지원합니다.
  • HTTP 전송을 통해 RowBinary 포맷을 사용합니다.
    • 향후 TCP 기반 Native로 전환할 계획입니다.
  • TLS(native-tlsrustls-tls 기능을 통해)를 지원합니다.
  • 압축 및 압축 해제(LZ4)를 지원합니다.
  • 데이터 조회 및 삽입, DDLs 실행, 클라이언트 측 배칭을 위한 API를 제공합니다.
  • 단위 테스트에 편리한 mock을 제공합니다.

설치

크레이트를 사용하려면 Cargo.toml에 다음을 추가하십시오:
[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
관련 항목: crates.io 페이지.

Cargo 기능

  • lz4 (기본적으로 활성화됨) — Compression::Lz4Compression::Lz4Hc(_) variant를 활성화합니다. 활성화되면 WATCH를 제외한 모든 쿼리에 기본적으로 Compression::Lz4가 사용됩니다.
  • native-tls — OpenSSL에 링크되는 hyper-tls를 통해 HTTPS 스킴 URL을 지원합니다.
  • rustls-tls — OpenSSL에 링크되지 않는 hyper-rustls를 통해 HTTPS 스킴 URL을 지원합니다.
  • inserterclient.inserter()를 활성화합니다.
  • test-util — mock 객체를 추가합니다. 예시를 참조하십시오. dev-dependencies에서만 사용하십시오.
  • watchclient.watch 기능을 활성화합니다. 자세한 내용은 해당 섹션을 참조하십시오.
  • uuiduuid 크레이트와 함께 사용할 수 있도록 serde::uuid를 추가합니다.
  • timetime 크레이트와 함께 사용할 수 있도록 serde::time을 추가합니다.
HTTPS URL로 ClickHouse에 연결할 때는 native-tls 또는 rustls-tls 기능 중 하나를 활성화해야 합니다. 둘 다 활성화된 경우 rustls-tls 기능이 우선 적용됩니다.

ClickHouse 버전 호환성

이 클라이언트는 ClickHouse의 LTS 버전 이상과 ClickHouse Cloud를 지원합니다. v22.6 이전의 ClickHouse 서버는 드물게 RowBinary를 잘못 처리하는 경우가 있습니다. 이 문제를 해결하려면 v0.11+를 사용하고 wa-37420 기능을 활성화할 수 있습니다. 참고: 이 기능은 더 최신 버전의 ClickHouse에서는 사용하지 않아야 합니다.

예시

클라이언트 리포지토리의 examples에서는 클라이언트 사용의 다양한 시나리오를 다룹니다. 개요는 examples README에서 확인할 수 있습니다. 예시 또는 아래 문서에서 불분명하거나 누락된 내용이 있으면 문의해 주세요.

사용

ch2rs 크레이트는 ClickHouse에서 행 유형을 생성할 때 유용합니다.

클라이언트 인스턴스 생성

생성한 클라이언트를 재사용하거나 복제하여 기본 hyper 연결 풀을 재사용하세요.
use clickhouse::Client;

let client = Client::default()
    // 프로토콜과 포트를 모두 포함해야 합니다
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPS 또는 ClickHouse Cloud 연결

HTTPS는 rustls-tls 또는 native-tls cargo 기능과 함께 사용할 수 있습니다. 그런 다음 평소처럼 클라이언트를 생성합니다. 이 예시에서는 환경 변수를 사용해 연결 정보를 저장합니다:
URL에는 protocol과 포트가 모두 포함되어야 합니다. 예: https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));
관련 항목:
  • 클라이언트 리포지토리의 ClickHouse Cloud HTTPS 예시를 참조하십시오. 이는 온프레미스 HTTPS 연결에도 적용됩니다.

행 조회

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • 플레이스홀더 ?fieldsno, name(Row의 필드)로 대체됩니다.
  • 플레이스홀더 ?는 뒤이은 bind() 호출에서 지정한 값으로 대체됩니다.
  • 첫 번째 행 또는 모든 행을 가져오려면 각각 편리한 fetch_one::<Row>()fetch_all::<Row>() 메서드를 사용할 수 있습니다.
  • 테이블 이름을 바인딩하는 데 sql::Identifier를 사용할 수 있습니다.
참고: 전체 응답이 스트리밍되므로 커서는 일부 행을 반환한 후에도 오류를 반환할 수 있습니다. 사용 환경에서 이런 문제가 발생하면 서버 측 응답 버퍼링을 활성화하기 위해 query(...).with_option("wait_end_of_query", "1")를 시도해 볼 수 있습니다. 자세한 내용. buffer_size 옵션도 유용할 수 있습니다.
행을 조회할 때 wait_end_of_query는 주의해서 사용하세요. 서버 측 메모리 사용량이 늘어날 수 있으며, 전반적인 성능도 저하될 가능성이 높습니다.

행 삽입하기

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end()가 호출되지 않으면 INSERT가 취소됩니다.
  • 네트워크 부하를 분산하기 위해 행은 스트림 형태로 순차적으로 전송됩니다.
  • ClickHouse는 모든 행이 동일한 파티션에 속하고 그 수가 max_insert_block_size보다 적은 경우에만 배치를 원자적으로 삽입합니다.

Async insert (서버 측 배칭)

들어오는 데이터를 클라이언트 측에서 배칭하지 않으려면 ClickHouse asynchronous inserts를 사용할 수 있습니다. 이렇게 하려면 insert 메서드에 async_insert 옵션을 지정하면 됩니다. 또는 Client 인스턴스 자체에 지정하여 모든 insert 호출에 적용되도록 할 수도 있습니다.
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");
관련 항목:

Inserter 기능 (클라이언트 측 배칭)

inserter cargo 기능이 필요합니다.
let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// 애플리케이션 종료 시 inserter를 반드시 종료하고
// 남은 행을 커밋하십시오. `.end()`도 통계를 제공합니다.
inserter.end().await?;
  • Inserter는 임계값(max_bytes, max_rows, period) 중 하나에 도달하면 commit()에서 진행 중인 삽입을 종료합니다.
  • 병렬 inserter로 인한 부하 급증을 피하려면 with_period_bias를 사용해 활성 INSERT 종료 사이의 인터벌에 편향을 줄 수 있습니다.
  • Inserter::time_left()를 사용하면 현재 주기가 언제 끝나는지 확인할 수 있습니다. 스트림에서 항목이 드물게 생성된다면 한도를 확인하기 위해 Inserter::commit()을 다시 호출하십시오.
  • 시간 임계값은 inserter의 성능을 높이기 위해 quanta 크레이트를 사용해 구현됩니다. test-util이 활성화된 경우에는 사용되지 않으므로, 사용자 정의 테스트에서는 tokio::time::advance()로 시간을 제어할 수 있습니다.
  • commit() 호출 사이의 모든 행은 동일한 INSERT statement에 삽입됩니다.
삽입을 종료하거나 마무리하려면 반드시 플러시하십시오:
inserter.end().await?;

DDL 실행

단일 노드 배포에서는 다음과 같이 DDL을 실행하면 됩니다:
client.query("DROP TABLE IF EXISTS some").execute().await?;
하지만 로드 밸런서 또는 ClickHouse Cloud를 사용하는 클러스터형 배포 환경에서는 wait_end_of_query 옵션을 사용해 DDL이 모든 레플리카에 적용될 때까지 기다리는 것이 좋습니다. 다음과 같이 수행할 수 있습니다:
client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse 설정

with_option 메서드를 사용해 다양한 ClickHouse 설정을 적용할 수 있습니다. 예를 들어 다음과 같습니다:
let numbers = client
    .query("SELECT number FROM system.numbers")
    // 이 설정은 해당 쿼리에만 적용됩니다;
    // 전역 클라이언트 설정을 재정의합니다.
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;
query뿐만 아니라 insertinserter 메서드에서도 비슷하게 작동합니다. 또한 모든 쿼리에 대한 전역 설정을 지정하려면 Client 인스턴스에서 동일한 메서드를 호출할 수 있습니다.

쿼리 ID

.with_option을 사용하면 ClickHouse 쿼리 로그에서 쿼리를 식별할 수 있도록 query_id 옵션을 설정할 수 있습니다.
let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;
query뿐만 아니라 insertinserter 메서드도 비슷하게 동작합니다.
query_id를 수동으로 설정하는 경우 고유한 값인지 확인하십시오. 이때는 UUID를 사용하는 것이 좋습니다.
관련 항목: 클라이언트 리포지토리의 query_id 예시

세션 ID

query_id와 마찬가지로 session_id를 설정하면 동일한 세션에서 SQL 문을 실행할 수 있습니다. session_id는 클라이언트 수준에서 전역으로 설정하거나, 각 query, insert, 또는 inserter 호출별로 설정할 수 있습니다.
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
클러스터 배포에서는 “sticky sessions”를 지원하지 않으므로, 이 기능을 제대로 활용하려면 특정 클러스터 노드에 연결해야 합니다. 예를 들어 라운드 로빈 방식의 로드 밸런서는 후속 요청이 동일한 ClickHouse 노드에서 처리된다고 보장하지 않습니다.
관련 항목: 클라이언트 리포지토리의 session_id 예시.

사용자 지정 HTTP 헤더

프록시 authentication을 사용하거나 사용자 지정 헤더를 전달해야 하는 경우 다음과 같이 설정할 수 있습니다.
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");
관련 항목: 클라이언트 리포지토리의 사용자 지정 HTTP 헤더 예시를 확인하십시오.

사용자 지정 HTTP 클라이언트

내부 HTTP 연결 풀 설정을 조정할 때 유용할 수 있습니다.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // 또는 HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // 클라이언트 측에서 특정 유휴 소켓을 유지할 시간 (밀리초 단위).
    // ClickHouse 서버의 KeepAlive 타임아웃보다 충분히 짧게 설정해야 합니다.
    // 기본값은 23.11 이전 버전에서 3초, 이후 버전에서 10초입니다.
    .pool_idle_timeout(Duration::from_millis(2_500))
    // 풀에서 허용되는 최대 유휴 Keep-Alive 연결 수를 설정합니다.
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
이 예시는 레거시 Hyper API를 사용하므로, 향후 변경될 수 있습니다.
관련 항목: 클라이언트 리포지토리의 사용자 지정 HTTP 클라이언트 예시.

데이터 타입

  • (U)Int(8|16|32|64|128)는 해당하는 (u|i)(8|16|32|64|128) 타입 또는 이를 감싼 newtype과 매핑됩니다.
  • (U)Int256은 직접 지원되지는 않지만, 우회 방법이 있습니다.
  • Float(32|64)는 해당하는 f(32|64) 또는 이를 감싼 newtype과 매핑됩니다.
  • Decimal(32|64|128)은 해당하는 i(32|64|128) 또는 이를 감싼 newtype과 매핑됩니다. fixnum이나 다른 부호 있는 고정소수점 수 구현을 사용하는 편이 더 편리합니다.
  • Booleanbool 또는 이를 감싼 newtype과 매핑됩니다.
  • String은 모든 문자열 또는 바이트 타입(예: &str, &[u8], String, Vec<u8> 또는 SmartString)과 매핑됩니다. 새 타입도 지원됩니다. 바이트를 저장할 때는 더 효율적인 serde_bytes 사용을 고려하십시오.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N)은 바이트의 배열로 지원됩니다. 예를 들면 [u8; N]입니다.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16)serde_repr를 통해 지원됩니다.
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUIDserde::uuid를 사용해 uuid::Uuid와 상호 변환됩니다. uuid 기능이 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Dateu16 또는 이를 감싼 newtype에 매핑되며, 1970-01-01 이후 경과한 일수를 나타냅니다. 또한 time::Dateserde::time::date를 사용해 지원되며, 이 경우 time 기능이 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32i32 또는 이를 감싼 newtype과 상호 변환되며, 1970-01-01 이후 경과한 일수를 나타냅니다. 또한 time::Dateserde::time::date32를 통해 지원되며, 이 경우 time 기능이 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTimeu32 또는 이를 감싼 newtype에 매핑되거나 그로부터 매핑될 수 있으며, UNIX epoch 이후 경과한 초 수를 나타냅니다. 또한 time::OffsetDateTimeserde::time::datetime을 사용하면 지원되며, 이 경우 time 기능이 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_)i32 또는 이를 감싼 새 타입(newtype)으로/에서 매핑되며, UNIX epoch 이후의 경과 시간을 나타냅니다. 또한 time::OffsetDateTimeserde::time::datetime64::*를 통해 지원되며, 이 경우 time 기능이 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // `DateTime64(X)`에 따라 경과 시간을 s/us/ms/ns 단위로 표현
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)(A, B, ...) 또는 이를 감싼 newtype과 상호 매핑됩니다.
  • Array(_)는 모든 슬라이스와 상호 매핑됩니다. 예: Vec<_>, &[_]. 새 타입도 지원됩니다.
  • Map(K, V)Array((K, V))처럼 동작합니다.
  • LowCardinality(_)는 별도 처리 없이 자연스럽게 지원됩니다.
  • Nullable(_)Option<_>와 상호 매핑됩니다. clickhouse::serde::* 도우미를 사용할 경우 ::option을 추가하십시오.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nested는 이름을 변경한 여러 배열을 제공하는 방식으로 지원됩니다.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Geo 타입이 지원됩니다. Point(f64, f64) 형태의 튜플처럼 동작하며, 나머지 타입은 모두 포인트의 슬라이스일 뿐입니다.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • Variant, Dynamic, (신규) JSON 데이터 타입은 아직 지원되지 않습니다.

모킹

이 크레이트는 CH 서버를 모킹하고 DDL, SELECT, INSERT, WATCH 쿼리를 테스트하기 위한 유틸리티를 제공합니다. 이 기능은 test-util 기능을 활성화해 사용할 수 있습니다. 반드시 개발 의존성으로만 사용하십시오. 예시를 참조하십시오.

문제 해결

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATA 오류의 가장 일반적인 원인은 애플리케이션 측의 행 정의가 ClickHouse의 행 정의와 일치하지 않기 때문입니다. 다음 테이블을 살펴보겠습니다:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
그런 다음, 애플리케이션 측에서 EventLog가 서로 맞지 않는 타입으로 정의된 경우(예:):
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- u32이어야 합니다!
}
데이터 삽입 시 다음과 같은 오류가 발생할 수 있습니다:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
이 예시에서는 EventLog struct를 올바르게 정의하여 이 문제를 해결할 수 있습니다:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

알려진 제한 사항

  • Variant, Dynamic, (new) JSON 데이터 타입은 아직 지원되지 않습니다.
  • 서버 측 매개변수 바인딩은 아직 지원되지 않습니다. 관련 진행 상황은 이 이슈에서 확인하십시오.

문의하기

질문이 있거나 도움이 필요하시면 Community Slack 또는 GitHub issues를 통해 문의해 주세요.
마지막 수정일 2026년 6월 10일