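ClickPipes can be created and managed with the clickhouse_clickpipe resource. This page covers provider setup and gives a configuration example for each supported ClickPipe type.
Provider setup
ClickPipes support is generally available starting with provider version v3.14.0.
Earlier versions require an alpha release. See the provider changelog for details.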
terraform {
  required_providers {
    clickhouse = {
      source  = "ClickHouse/clickhouse"
      version = ">= 3.14.0"
    }
  }
}

provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}
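The provider block above reads three input variables that the page does not declare. A minimal sketch of matching declarations follows; the descriptions are illustrative wording, and marking the token values as sensitive is a suggestion rather than something the provider requires.

```hcl
# Input variables referenced by the provider block.
# Descriptions are illustrative; adjust them to your own conventions.
variable "organization_id" {
  description = "ClickHouse Cloud organization ID."
  type        = string
}

variable "token_key" {
  description = "ClickHouse Cloud API key ID."
  type        = string
  sensitive   = true # keeps the value out of plan and apply output
}

variable "token_secret" {
  description = "ClickHouse Cloud API key secret."
  type        = string
  sensitive   = true
}
```

Values can then be supplied through a tfvars file or through environment variables named TF_VAR_organization_id, TF_VAR_token_key, and TF_VAR_token_secret, which keeps the secrets out of version control.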
Resource overview
The clickhouse_clickpipe resource has the following top-level arguments.
| Argument | Required | Description |
|---|---|---|
| name | Yes | Name of the ClickPipe. |
| service_id | Yes | ID of the ClickHouse Cloud service. |
| source | Yes | Source configuration (one source block per ClickPipe). |
| destination | Yes | Destination configuration. |
| scaling | No | Number and size of replicas. Defaults to 1 replica. |
| field_mappings | No | Custom field mappings between source and destination columns. |
| settings | No | Advanced ClickPipe settings. |
| stopped | No | Set to true to create the ClickPipe in a stopped state. Defaults to false. |
The id and state attributes are read-only and are set by ClickHouse Cloud after creation.
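Because id and state are only known after creation, one way to surface them is through output values. The sketch below assumes the kafka_clickpipe resource from the Kafka example on this page; the output names are arbitrary.

```hcl
# Expose the read-only attributes of a ClickPipe after apply.
# Assumes the "kafka_clickpipe" resource defined in the Kafka example.
output "kafka_clickpipe_id" {
  description = "ID assigned to the ClickPipe by ClickHouse Cloud."
  value       = clickhouse_clickpipe.kafka_clickpipe.id
}

output "kafka_clickpipe_state" {
  description = "State of the ClickPipe as reported by ClickHouse Cloud."
  value       = clickhouse_clickpipe.kafka_clickpipe.state
}
```

Running terraform output after an apply prints both values.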
Destination
The destination block is shared by all source types:
destination {
  database      = "default" # Destination database. Defaults to "default".
  table         = "my_table" # Target table name. Required for all sources except CDC (change data capture).
  managed_table = true # Let ClickPipes create and manage the table. Defaults to true.

  table_definition {
    engine {
      type = "MergeTree" # MergeTree, ReplacingMergeTree, SummingMergeTree, or Null.
    }
    sorting_key  = ["id", "ts"] # Optional.
    partition_by = "toYYYYMM(ts)" # Optional.
  }

  columns {
    name = "id"
    type = "UInt64"
  }
  columns {
    name = "message"
    type = "String"
  }
}
For CDC sources, only database is set.
Examples by ClickPipe type
Kafka
Valid values for type: kafka, confluent, msk, azureeventhub, redpanda, warpstream.
resource "clickhouse_clickpipe" "kafka_clickpipe" {
  name       = "My Kafka ClickPipe"
  service_id = var.service_id

  scaling {
    replicas               = 2
    replica_cpu_millicores = 250
    replica_memory_gb      = 1.0
  }

  source {
    kafka {
      type           = "confluent"
      format         = "JSONEachRow"
      brokers        = "broker.example.com:9092"
      topics         = "my_topic"
      consumer_group = "clickpipes-consumer-group"
      authentication = "PLAIN"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      offset {
        strategy = "from_latest"
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
    columns {
      name = "timestamp"
      type = "DateTime"
    }
  }
}
Kafka with a schema registry
resource "clickhouse_clickpipe" "kafka_avro_clickpipe" {
  name       = "My Kafka Avro ClickPipe"
  service_id = var.service_id

  source {
    kafka {
      type    = "confluent"
      format  = "AvroConfluent"
      brokers = "broker.example.com:9092"
      topics  = "my_avro_topic"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      schema_registry {
        url            = "https://schema-registry.example.com"
        authentication = "PLAIN"

        credentials {
          username = "sr_user"
          password = var.schema_registry_password
        }
      }
    }
  }

  destination {
    table         = "my_avro_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "event"
      type = "String"
    }
  }
}
Amazon Kinesis
resource "clickhouse_clickpipe" "kinesis_clickpipe" {
  name       = "My Kinesis ClickPipe"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "TRIM_HORIZON"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
Kinesis with an IAM role
resource "clickhouse_clickpipe" "kinesis_iam_role_clickpipe" {
  name       = "My Kinesis ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "LATEST"
      authentication = "IAM_ROLE"
      iam_role       = "arn:aws:iam::123456789012:role/my-kinesis-role"
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
Amazon S3
resource "clickhouse_clickpipe" "s3_clickpipe" {
  name       = "My S3 ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
Continuous S3 ingestion with SQS
resource "clickhouse_clickpipe" "s3_continuous_clickpipe" {
  name       = "My S3 Continuous ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      is_continuous  = true
      queue_url      = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
Google Cloud Storage
service_account_key must be set to the base64-encoded contents of the GCP service account JSON key file.
resource "clickhouse_clickpipe" "gcs_clickpipe" {
  name       = "My GCS ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                = "gcs"
      url                 = "gs://my-bucket/data/*.json"
      format              = "JSONEachRow"
      authentication      = "SERVICE_ACCOUNT"
      service_account_key = var.gcs_service_account_key
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
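The GCS example passes the base64-encoded key through var.gcs_service_account_key. As an alternative, Terraform's built-in filebase64 function can encode the key file directly. This is a minimal sketch: the file path is hypothetical, and reading the key this way means its contents end up in the Terraform state, so the state needs to be protected accordingly.

```hcl
locals {
  # Hypothetical path to the downloaded service account key file.
  # filebase64 reads the file and returns its contents base64-encoded.
  gcs_service_account_key = filebase64("${path.module}/gcs-service-account.json")
}
```

With this in place, the object_storage block would set service_account_key = local.gcs_service_account_key instead of reading the variable.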
Azure Blob Storage
resource "clickhouse_clickpipe" "abs_clickpipe" {
  name       = "My Azure Blob ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                 = "azureblobstorage"
      azure_container_name = "my-container"
      path                 = "data/*.json"
      format               = "JSONEachRow"
      authentication       = "CONNECTION_STRING"
      connection_string    = var.azure_connection_string
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }
    columns {
      name = "message"
      type = "String"
    }
  }
}
Postgres CDC (change data capture)
resource "clickhouse_clickpipe" "postgres_cdc_clickpipe" {
  name       = "My Postgres CDC ClickPipe"
  service_id = var.service_id

  source {
    postgres {
      host     = "postgres.example.com"
      port     = 5432
      database = "mydb"

      credentials {
        username = "postgres_user"
        password = var.postgres_password
      }

      settings {
        replication_mode = "cdc"
        # Optional settings
        sync_interval_seconds              = 60
        pull_batch_size                    = 100000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 1
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "users"
        target_table       = "public_users"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"
        # Optional
        excluded_columns       = ["internal_notes"]
        use_custom_sorting_key = true
        sorting_keys           = ["id", "created_at"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
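When many tables share the same mapping pattern, the repeated table_mappings blocks can be generated with a Terraform dynamic block. The following is a sketch under stated assumptions: the postgres_tables variable and the postgres_cdc_dynamic resource name are hypothetical, and it relies on table_mappings being a nested block, as the block syntax in the example above suggests.

```hcl
# Hypothetical list of tables in the "public" schema to replicate.
variable "postgres_tables" {
  description = "Source tables in the public schema to replicate."
  type        = list(string)
  default     = ["users", "orders"]
}

resource "clickhouse_clickpipe" "postgres_cdc_dynamic" {
  name       = "My Postgres CDC ClickPipe (dynamic mappings)"
  service_id = var.service_id

  source {
    postgres {
      host     = "postgres.example.com"
      port     = 5432
      database = "mydb"

      credentials {
        username = "postgres_user"
        password = var.postgres_password
      }

      settings {
        replication_mode = "cdc"
      }

      # Emits one table_mappings block per entry in var.postgres_tables.
      dynamic "table_mappings" {
        for_each = var.postgres_tables
        content {
          source_schema_name = "public"
          source_table       = table_mappings.value
          target_table       = "public_${table_mappings.value}"
        }
      }
    }
  }

  destination {
    database = "default"
  }
}
```

With the default list, this produces the same two source-to-target pairs as the static example (users to public_users, orders to public_orders), without the per-table optional settings.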
Postgres with an IAM role
resource "clickhouse_clickpipe" "postgres_iam_role_clickpipe" {
  name       = "My Postgres CDC ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    postgres {
      host           = "mydb.cluster.us-east-1.rds.amazonaws.com"
      port           = 5432
      database       = "mydb"
      type           = "rdspostgres"
      authentication = "iam_role"
      iam_role       = "arn:aws:iam::123456789012:role/my-rds-role"

      credentials {
        username = "postgres_user"
      }

      settings {
        replication_mode = "cdc"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"
      }
    }
  }

  destination {
    database = "default"
  }
}
MySQL CDC (change data capture)
resource "clickhouse_clickpipe" "mysql_cdc_clickpipe" {
  name       = "My MySQL CDC ClickPipe"
  service_id = var.service_id

  source {
    mysql {
      host = "mysql.example.com"
      port = 3306
      type = "mysql"

      credentials {
        username = "mysql_user"
        password = var.mysql_password
      }

      settings {
        replication_mode = "cdc"
        # Optional settings
        sync_interval_seconds              = 30
        pull_batch_size                    = 10000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "orders"
        target_table       = "mydb_orders"
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "customers"
        target_table       = "mydb_customers"
        # Optional
        excluded_columns       = ["password_hash"]
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
MongoDB CDC (change data capture)
resource "clickhouse_clickpipe" "mongodb_cdc_clickpipe" {
  name       = "My MongoDB CDC ClickPipe"
  service_id = var.service_id

  source {
    mongodb {
      uri             = "mongodb+srv://cluster0.example.mongodb.net"
      read_preference = "secondaryPreferred"

      credentials {
        username = "mongo_user"
        password = var.mongodb_password
      }

      settings {
        replication_mode = "cdc"
        # Optional settings
        sync_interval_seconds              = 30
        pull_batch_size                    = 500
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "users"
        target_table         = "mydb_users"
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "orders"
        target_table         = "mydb_orders"
        table_engine         = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
BigQuery
service_account_file must be set to the base64-encoded contents of the GCP service account JSON key file.
resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
  name       = "My BigQuery ClickPipe"
  service_id = var.service_id

  source {
    bigquery {
      snapshot_staging_path = "gs://my-staging-bucket/staging/"

      credentials {
        service_account_file = var.gcp_service_account_key
      }

      settings {
        replication_mode = "snapshot"
        # Optional settings
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
        allow_nullable_columns             = true
      }

      table_mappings {
        source_dataset_name = "my_dataset"
        source_table        = "my_table"
        target_table        = "my_bigquery_table"
      }

      table_mappings {
        source_dataset_name    = "my_dataset"
        source_table           = "another_table"
        target_table           = "another_bigquery_table"
        table_engine           = "ReplacingMergeTree"
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        excluded_columns       = ["internal_col"]
      }
    }
  }

  destination {
    database = "default"
  }
}
Scaling
The scaling block, used in the Kafka example above, sets the number of replicas and their size:
scaling {
  replicas               = 2 # Default: 1. Maximum: 10.
  replica_cpu_millicores = 500 # Between 125 and 2000.
  replica_memory_gb      = 2.0 # Between 0.5 and 8.0.
}
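The limits noted in the comments above can be checked at plan time with variable validation, so that out-of-range values fail before any API call. This is a sketch with hypothetical variable names; the lower bound of 1 for replicas is an assumption, since the page only states the default and the maximum.

```hcl
variable "clickpipe_replicas" {
  description = "Number of ClickPipe replicas."
  type        = number
  default     = 1

  validation {
    # Lower bound of 1 is assumed; the documented maximum is 10.
    condition = (
      var.clickpipe_replicas >= 1 &&
      var.clickpipe_replicas <= 10 &&
      floor(var.clickpipe_replicas) == var.clickpipe_replicas
    )
    error_message = "The clickpipe_replicas value must be a whole number between 1 and 10."
  }
}

variable "clickpipe_replica_cpu_millicores" {
  description = "CPU per replica, in millicores."
  type        = number

  validation {
    condition     = var.clickpipe_replica_cpu_millicores >= 125 && var.clickpipe_replica_cpu_millicores <= 2000
    error_message = "The clickpipe_replica_cpu_millicores value must be between 125 and 2000."
  }
}

variable "clickpipe_replica_memory_gb" {
  description = "Memory per replica, in GB."
  type        = number

  validation {
    condition     = var.clickpipe_replica_memory_gb >= 0.5 && var.clickpipe_replica_memory_gb <= 8.0
    error_message = "The clickpipe_replica_memory_gb value must be between 0.5 and 8.0."
  }
}
```

The scaling block would then reference var.clickpipe_replicas, var.clickpipe_replica_cpu_millicores, and var.clickpipe_replica_memory_gb in place of the literal values.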
Importing existing ClickPipes
terraform import clickhouse_clickpipe.example <service_id>:<clickpipe_id>
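On Terraform 1.5 or later, the same import can also be declared in configuration with an import block instead of running the CLI command. A minimal sketch, keeping the placeholders from the command above (replace them with real IDs before running a plan):

```hcl
# Declarative equivalent of the terraform import command.
# The id uses the same <service_id>:<clickpipe_id> format.
import {
  to = clickhouse_clickpipe.example
  id = "<service_id>:<clickpipe_id>"
}
```

A matching resource "clickhouse_clickpipe" "example" block still has to exist. Running terraform plan -generate-config-out=generated.tf asks Terraform to draft one from the imported state, and the generated file should be reviewed before it is applied.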