El cliente oficial de Rust para conectarse a ClickHouse, desarrollado originalmente por Paul Loyd. El código fuente del cliente está disponible en el repositorio de GitHub.
- Usa
serde para codificar y decodificar filas.
- Admite atributos de
serde: skip_serializing, skip_deserializing, rename.
- Usa el formato
RowBinary a través del transporte HTTP.
- Está previsto cambiar a
Native sobre TCP.
- Admite TLS (mediante las features
native-tls y rustls-tls).
- Admite compresión y descompresión (LZ4).
- Proporciona APIs para consultar o insertar datos, ejecutar DDLs y realizar procesamiento por lotes en el cliente.
- Proporciona mocks útiles para pruebas unitarias.
Para usar el crate, añade lo siguiente a tu Cargo.toml:
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
Véase también: página en crates.io.
lz4 (habilitada de forma predeterminada) — habilita las variantes Compression::Lz4 y Compression::Lz4Hc(_). Si está habilitada, Compression::Lz4 se usa de forma predeterminada para todas las consultas excepto WATCH.
native-tls — admite URL con el esquema HTTPS mediante hyper-tls, que se enlaza con OpenSSL.
rustls-tls — admite URL con el esquema HTTPS mediante hyper-rustls, que no se enlaza con OpenSSL.
inserter — habilita client.inserter().
test-util — añade mocks. Consulta el ejemplo. Úsalo solo en dev-dependencies.
watch — habilita la funcionalidad client.watch. Consulta la sección correspondiente para obtener más información.
uuid — añade serde::uuid para trabajar con el crate uuid.
time — añade serde::time para trabajar con el crate time.
Al conectarte a ClickHouse mediante una URL HTTPS, debe estar habilitada la funcionalidad native-tls o rustls-tls.
Si ambas están habilitadas, rustls-tls tendrá prioridad.
Compatibilidad de versiones de ClickHouse
El cliente es compatible con las versiones LTS o posteriores de ClickHouse, así como con ClickHouse Cloud.
Las versiones del servidor de ClickHouse anteriores a la v22.6 procesan RowBinary de forma incorrecta en algunos casos poco frecuentes.
Puede usar la versión v0.11+ y habilitar la feature wa-37420 para resolver este problema. Nota: esta feature no debe usarse con versiones más recientes de ClickHouse.
Nuestro objetivo es abarcar varios escenarios de uso del cliente con los ejemplos del repositorio del cliente. La información general está disponible en el README de ejemplos.
Si algo no queda claro o falta en los ejemplos o en la documentación siguiente, no dudes en ponerte en contacto con nosotros.
El crate ch2rs es útil para generar un tipo de fila desde ClickHouse.
Creación de una instancia de cliente
Reutiliza los clientes ya creados o clónalos para aprovechar el pool de conexiones subyacente de hyper.
use clickhouse::Client;
let client = Client::default()
// debe incluir tanto el protocolo como el puerto
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
Conexión HTTPS o a ClickHouse Cloud
HTTPS funciona con las features de Cargo rustls-tls o native-tls.
A continuación, cree el cliente como de costumbre. En este ejemplo, se usan variables de entorno para almacenar los datos de conexión:
La URL debe incluir tanto el protocolo como el puerto; por ejemplo, https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
Véase también:
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- El marcador de posición
?fields se reemplaza por no, name (campos de Row).
- El marcador de posición
? se reemplaza por los valores de las siguientes llamadas a bind().
- Se pueden usar los prácticos métodos
fetch_one::<Row>() y fetch_all::<Row>() para obtener la primera fila o todas las filas, respectivamente.
sql::Identifier puede usarse para pasar nombres de tabla.
Nota: como toda la respuesta se envía por streaming, los cursores pueden devolver un error incluso después de haber producido algunas filas. Si esto ocurre en tu caso de uso, puedes probar query(...).with_option("wait_end_of_query", "1") para habilitar el búfer de respuesta en el servidor. Más detalles. La opción buffer_size también puede ser útil.
Usa wait_end_of_query con precaución al seleccionar filas, ya que puede aumentar el consumo de memoria en el servidor y probablemente reducir el rendimiento general.
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- Si no se llama a
end(), el INSERT se aborta.
- Las filas se envían progresivamente como un flujo para repartir la carga de red.
- ClickHouse inserta lotes de forma atómica solo si todas las filas caben en la misma partición y su número es menor que
max_insert_block_size.
Inserción asíncrona (agrupación en lotes en el servidor)
Puede usar las inserciones asíncronas de ClickHouse para evitar la agrupación por lotes en el cliente de los datos entrantes. Para ello, basta con proporcionar la opción async_insert al método insert (o incluso a la propia instancia de Client, para que afecte a todas las llamadas a insert).
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
Véase también:
Funcionalidad Inserter (agrupación por lotes en el cliente)
Requiere la funcionalidad inserter de Cargo.
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// no olvides finalizar el inserter durante el apagado de la aplicación
// y hacer commit de las filas restantes. `.end()` también proporcionará estadísticas.
inserter.end().await?;
Inserter finaliza la operación de insert activa en commit() si se alcanza cualquiera de los umbrales (max_bytes, max_rows, period).
- El intervalo entre el cierre de
INSERT activos puede ajustarse con with_period_bias para evitar picos de carga provocados por insertores en paralelo.
Inserter::time_left() puede usarse para detectar cuándo termina el período actual. Vuelve a llamar a Inserter::commit() para comprobar los límites si tu flujo emite elementos con poca frecuencia.
- Los umbrales de tiempo se implementan con el crate quanta para acelerar
inserter. No se usa si test-util está habilitado (por lo tanto, el tiempo puede gestionarse con tokio::time::advance() en pruebas personalizadas).
- Todas las filas entre llamadas a
commit() se insertan en la misma sentencia INSERT.
No olvides hacer flush si quieres terminar/finalizar la inserción:
Con un despliegue de un solo nodo, basta con ejecutar los DDLs así:
client.query("DROP TABLE IF EXISTS some").execute().await?;
Sin embargo, en despliegues en clúster con un balanceador de carga o ClickHouse Cloud, se recomienda esperar a que el DDL se aplique en todas las réplicas mediante la opción wait_end_of_query. Esto puede hacerse así:
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
Puede aplicar varios ajustes de ClickHouse mediante el método with_option. Por ejemplo:
let numbers = client
.query("SELECT number FROM system.numbers")
// Esta configuración se aplicará únicamente a esta consulta en particular;
// sobrescribirá la configuración de cliente global.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
Además de query, funciona de forma similar con los métodos insert e inserter; asimismo, se puede llamar al mismo método en la instancia Client para establecer la configuración global para todas las consultas.
Con .with_option, puede establecer la opción query_id para identificar consultas en el log de consultas de ClickHouse.
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
Además de query, funciona de manera similar con los métodos insert e inserter.
Si establece query_id manualmente, asegúrese de que sea único. Los UUIDs son una buena opción para ello.
Véase también: ejemplo de query_id en el repositorio del cliente.
Al igual que con query_id, puedes establecer session_id para ejecutar las sentencias en la misma sesión. session_id puede establecerse de forma global a nivel de cliente, o en cada llamada a query, insert o inserter.
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
En los despliegues en clúster, debido a la falta de “sesiones persistentes”, debes estar conectado a un nodo concreto del clúster para poder utilizar esta función correctamente, ya que, por ejemplo, un balanceador de carga round-robin no garantiza que las solicitudes posteriores las procese el mismo nodo de ClickHouse.
Véase también: ejemplo de session_id en el repositorio del cliente.
Si usas autenticación mediante proxy o necesitas enviar cabeceras HTTP personalizadas, puedes hacerlo así:
let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");
Véase también: ejemplo de cabeceras HTTP personalizadas en el repositorio del cliente.
Cliente HTTP personalizado
Esto puede ser útil para ajustar la configuración subyacente del pool de conexiones HTTP.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // o HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// Tiempo durante el cual se mantiene activo un socket inactivo en el lado del cliente (en milisegundos).
// Debe ser bastante menor que el timeout KeepAlive del servidor ClickHouse,
// que era de 3 segundos por defecto en versiones anteriores a la 23.11, y de 10 segundos a partir de esa versión.
.pool_idle_timeout(Duration::from_millis(2_500))
// Establece el número máximo de conexiones Keep-Alive inactivas permitidas en el pool.
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Este ejemplo se basa en la API Hyper heredada y puede cambiar en el futuro.
Véase también: ejemplo de cliente HTTP personalizado en el repositorio del cliente.
Consulte también estos ejemplos adicionales:
(U)Int(8|16|32|64|128) se corresponde con los tipos (u|i)(8|16|32|64|128) equivalentes, tanto de ida como de vuelta, o con newtypes basados en ellos.
(U)Int256 no es compatible directamente, pero hay una solución alternativa.
Float(32|64) se corresponde con f(32|64) equivalentes, tanto de ida como de vuelta, o con newtypes basados en ellos.
Decimal(32|64|128) se corresponde con i(32|64|128) equivalentes, tanto de ida como de vuelta, o con newtypes basados en ellos. Es más práctico usar fixnum u otra implementación de números de punto fijo con signo.
Boolean se corresponde con bool, tanto de ida como de vuelta, o con newtypes basados en él.
String se corresponde con cualquier tipo de cadena o bytes, tanto de ida como de vuelta; por ejemplo, &str, &[u8], String, Vec<u8> o SmartString. También se admiten newtypes. Para almacenar bytes, considere usar serde_bytes, ya que es más eficiente.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N) se admite como un arreglo de bytes, p. ej., [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
UUID se mapea a/desde uuid::Uuid mediante serde::uuid. Requiere la funcionalidad uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
Date se convierte desde y hacia u16, o un newtype basado en él, y representa un número de días transcurridos desde 1970-01-01. Además, time::Date también es compatible mediante serde::time::date, lo que requiere la funcionalidad time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
Date32 se convierte desde y hacia i32, o un newtype basado en este, y representa una cantidad de días transcurridos desde 1970-01-01. Además, time::Date es compatible usando serde::time::date32, lo que requiere la funcionalidad time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
DateTime se convierte desde y hacia u32 o un newtype basado en este, y representa una cantidad de segundos transcurridos desde la época Unix. Además, time::OffsetDateTime es compatible mediante serde::time::datetime, lo que requiere la feature time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
DateTime64(_) se puede mapear desde y hacia i32 o un newtype basado en él, y representa el tiempo transcurrido desde la época de Unix. Además, time::OffsetDateTime es compatible mediante serde::time::datetime64::*, lo que requiere la funcionalidad time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // s/us/ms/ns transcurridos dependiendo de `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
Tuple(A, B, ...) se mapea a/desde (A, B, ...) o a un newtype basado en este.
Array(_) se mapea a/desde cualquier slice, p. ej., Vec<_>, &[_]. También se admiten tipos nuevos.
Map(K, V) se comporta como Array((K, V)).
LowCardinality(_) se admite de forma transparente.
Nullable(_) se mapea a/desde Option<_>. Para los helpers clickhouse::serde::*, añade ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
Nested es compatible al proporcionar varios arrays renombrados.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
- Se admiten tipos
Geo. Point se comporta como una tupla (f64, f64), y el resto de los tipos no son más que secuencias de puntos.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
- Los tipos de datos
Variant, Dynamic, (nuevo) JSON todavía no se admiten.
El crate proporciona utilidades para crear mocks del servidor CH y probar consultas DDL, SELECT, INSERT y WATCH. Esta funcionalidad se puede habilitar con la funcionalidad test-util. Úsela solo como dependencia de desarrollo.
Consulte el ejemplo.
La causa más común del error CANNOT_READ_ALL_DATA es que la definición de la fila en la aplicación no coincide con la de ClickHouse.
Considere la siguiente tabla:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
Luego, si EventLog está definido en la aplicación con tipos que no coinciden, por ejemplo:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- ¡debería ser u32 en cambio!
}
Al insertar los datos, puede aparecer el siguiente error:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
En este ejemplo, esto se soluciona definiendo correctamente la estructura EventLog:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
- Los tipos de datos
Variant, Dynamic y JSON (nuevo) todavía no se admiten.
- La vinculación de parámetros del lado del servidor todavía no se admite; consulta esta incidencia para seguir su estado.
Si tienes alguna pregunta o necesitas ayuda, no dudes en ponerte en contacto con nosotros en el Slack de la comunidad o a través de GitHub issues.