MATERIALIZED VIEW がこのエンジンに接続されると、S3Queue テーブルエンジンはバックグラウンドでデータ収集を開始します。
テーブルの作成
S3Queue のパラメータは、S3 テーブルエンジンでサポートされているものと同じです。パラメータのセクションはこちらを参照してください。
例
設定
system.s3_queue_settings テーブルを使用します。24.10 から利用できます。
設定名 (24.7+)バージョン 24.7 以降では、S3Queue の設定は
s3queue_ プレフィックスの有無にかかわらず指定できます。- 新しい構文 (24.7+):
processing_threads_num、tracked_file_ttl_secなど - 従来の構文 (全バージョン):
s3queue_processing_threads_num、s3queue_tracked_file_ttl_secなど
モード
- unordered — 順不同モードでは、すでに処理されたすべてのファイルの集合が、ZooKeeper の永続ノードによって追跡されます。
- ordered — ordered モードでは、ファイルは辞書順で処理されます。つまり、ある時点で ‘BBB’ という名前のファイルが処理されたあとに、‘AA’ という名前のファイルがバケットに追加されても、そのファイルは無視されます。ZooKeeper に保存されるのは、正常に取り込まれたファイルのうち辞書順で最大の名前と、読み込みに失敗して再試行対象となるファイルの名前だけです。
ordered。24.6 以降はデフォルト値がなくなり、この設定は手動で指定する必要があります。以前のバージョンで作成されたテーブルでは、互換性のためデフォルト値は Ordered のままです。
after_processing
- keep。
- delete。
- move。
- tag。
keep。
move には追加の設定が必要です。同じバケット内で move する場合は、新しいパスプレフィックスを after_processing_move_prefix として指定する必要があります。
別の S3 バケットに move する場合は、対象バケットの URI を after_processing_move_uri として、S3 の認証情報を after_processing_move_access_key_id および after_processing_move_secret_access_key として指定する必要があります。
例:
after_processing_move_connection_string に Blob Storage の接続文字列、after_processing_move_container にコンテナー名を指定する必要があります。AzureQueue の設定を参照してください。
タグ付けを行うには、after_processing_tag_key と after_processing_tag_value にタグのキーと値を指定する必要があります。
after_processing_retries
- 0以上の整数。
10.
after_processing_move_access_key_id
- String.
after_processing_move_prefix
- String.
after_processing_move_secret_access_key
- String.
after_processing_move_uri
- String.
after_processing_tag_key
after_processing='tag' の場合に、正常に処理されたファイルに付与するタグのキーです。
設定可能な値:
- String.
after_processing_tag_value
after_processing='tag' の場合に、正常に処理されたファイルに付与するタグの値です。
設定可能な値:
- String。
keeper_path
s3queue_default_zookeeper_path、データベース UUID、およびテーブル UUID からこのパスを構築します。絶対パス (/ で始まる値) は指定どおりに使用され、相対パスは設定されたプレフィックスに追加されます。{database} や {uuid} などのマクロは、エンジンが ZooKeeper に接続する前に展開されます。
補助的な ZooKeeper クラスターを対象にするには、値の先頭に設定済みの名前を付けます。たとえば analytics_keeper:/clickhouse/queue/orders のように指定します。この名前は <auxiliary_zookeepers> に存在している必要があり、存在しない場合、エンジンは Unknown auxiliary ZooKeeper name ... を報告します。完全な文字列 (プレフィックスを含む) は SHOW CREATE TABLE に保持されるため、ステートメントをそのまま複製できます。
設定可能な値:
- String。
/。
loading_retries
- 正の整数。
0。
processing_threads_num
Unordered モードでのみ適用されます。
デフォルト値: CPU 数または 16。
parallel_inserts
processing_threads_num は 1 つの INSERT しか生成しないため、複数スレッドで実行されるのはファイルのダウンロードと解析だけです。
ただし、これでは並列度が制限されるため、より高いスループットを得るには parallel_inserts=true を使用してください。これにより、データを並列に挿入できるようになります (ただし、その結果として MergeTree family で生成されるデータパーツの数が増える点に注意してください) 。
INSERT は max_process*_before_commit 設定に従って生成されます。false。
enable_logging_to_s3queue_log
system.s3queue_log へのログ出力を有効にします。
デフォルト値: 0。
polling_min_timeout_ms
- 正の整数。
1000.
polling_max_timeout_ms
- 正の整数。
10000。
polling_backoff_ms
- 正の整数。
0.
tracked_files_limit
unordered モード使用時に、ZooKeeper ノード数を制限できます。ordered モードでは効果はありません。
制限に達すると、最も古い処理済みファイルが ZooKeeper ノードから削除され、再度処理されます。
設定可能な値:
- 正の整数。
1000.
tracked_file_ttl_sec
unordered モードで、処理済みファイルを ZooKeeper ノードに保持する最大秒数です (デフォルトでは無期限に保持されます) 。ordered モードでは効果はありません。
指定した秒数が経過すると、そのファイルは再インポートされます。
設定可能な値:
- 正の整数。
0。
cleanup_interval_min_ms
Ordered モード用です。追跡対象ファイルの有効期限 (TTL) と追跡対象ファイル数の上限を管理するバックグラウンドタスクの再スケジュール間隔の最小値を定義します。
デフォルト値: 10000。
cleanup_interval_max_ms
Ordered モード用です。追跡対象ファイルの有効期限 (TTL) と追跡対象ファイル集合の最大数を管理するバックグラウンドタスクの、再スケジュール間隔の上限を定義します。
デフォルト値: 30000。
buckets
Ordered モード向けです。24.6 から利用できます。S3Queue テーブルに複数のレプリカがあり、それぞれが keeper 内の同じメタデータディレクトリを使って動作している場合、buckets の値は少なくともレプリカ数以上にする必要があります。processing_threads 設定も使用している場合は、S3Queue 処理の実際の並列度を決めるため、buckets 設定の値をさらに増やすのが適切です。
use_persistent_processing_nodes
persistent_processing_nodes_ttl_seconds
use_persistent_processing_nodes が有効になっていると、処理ノードが削除されずに残ることがあります。この設定は、それらの処理ノードを安全にクリーンアップできる期間を定義します。
デフォルト値: 3600 (1 時間) 。
S3関連の設定
S3 のロールベースアクセス
extra_credentials パラメータ経由で roleARN を渡せます。
S3Queue の ordered モード
S3Queue の処理モードでは、ZooKeeper に保存するメタデータを少なくできますが、あとから追加されるファイルは、名前が英数字順でより後になる必要があるという制約があります。
S3Queue の ordered モードは、unordered と同様に (s3queue_)processing_threads_num 設定 (s3queue_ プレフィックスは省略可能) をサポートしており、サーバー上で S3 ファイルをローカル処理するスレッド数を制御できます。
パーティション化なしの ordered モードでは、プレフィックス全体の履歴を再度一覧化するのを避けるため、ClickHouse は最後に処理したキーから S3 のリスト取得を再開することがあります。bucket 化された ordered モードでは、未処理ファイルの取りこぼしを防ぐため、再開位置はすべての buckets の中で最小の処理済みキーとして保守的に選ばれます。
このリスト取得再開の最適化は、パーティション化なしの ordered モードで S3 をバックエンドとする queue に対してのみ使用されます (AzureQueue や partitioning_mode が設定されている場合は対象外です) 。
さらに、ordered モードでは (s3queue_)buckets という別の設定も導入されており、これは「論理スレッド」を意味します。つまり、複数のサーバーに S3Queue テーブルのレプリカがある分散環境では、この設定が処理ユニット数を定義します。たとえば、各 S3Queue レプリカ上の各処理スレッドは、処理対象として特定の bucket のロック取得を試みます。各 bucket には、ファイル名の hash に基づいて特定のファイルが割り当てられます。したがって、分散環境では (s3queue_)buckets 設定を少なくともレプリカ数以上、できればそれより大きくすることを強く推奨します。bucket 数がレプリカ数を上回っていても問題ありません。最も理想的なのは、(s3queue_)buckets 設定を number_of_replicas と (s3queue_)processing_threads_num の積に等しくすることです。
設定 (s3queue_)processing_threads_num は、バージョン 24.6 より前では使用を推奨しません。
設定 (s3queue_)buckets は、バージョン 24.6 以降で利用できます。
S3Queue テーブルエンジンからのSELECT
stream_like_engine_allow_direct_select を True に設定する必要があります。
S3Queue エンジンには、SELECT クエリ用の特別な設定 commit_on_select があります。読み取り後もキュー内のデータを保持するには False に、削除するには True に設定します。
説明
SELECT は、各ファイルをインポートできるのが一度限りであるため、ストリーミングインポートには (デバッグ用途を除き) あまり適していません。代わりに、materialized view を使ってリアルタイム処理のスレッドを作成するほうが実用的です。手順は次のとおりです。
- エンジンを使用して、S3 の指定したパスからデータを取り込むためのテーブルを作成し、それをデータストリームとして扱います。
- 必要な構造を持つテーブルを作成します。
- エンジンからのデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW をエンジンに対して作成すると、バックグラウンドでデータの収集が開始されます。
例:
仮想カラム
_path— ファイルへのパス。_file— ファイル名。_size— ファイルのサイズ。_time— ファイルの作成時刻。
path 内のワイルドカード
path 引数では、bash 風のワイルドカードを使って複数のファイルを指定できます。処理対象のファイルは存在し、パスパターン全体に一致している必要があります。ファイルの一覧は SELECT の実行時に決定されます (CREATE 時ではありません) 。
*— 空文字列を含む、/を除く任意の文字列に置き換えられます。**— 空文字列を含む、/を含む任意の文字列に置き換えられます。?— 任意の 1 文字に置き換えられます。{some_string,another_string,yet_another_one}—'some_string'、'another_string'、'yet_another_one'のいずれかの文字列に置き換えられます。{N..M}— N から M までの範囲内の任意の数値 (両端を含む) に置き換えられます。N と M には先頭にゼロを付けることもできます (例:000..078) 。
{} を使う構文は、remote テーブル関数に似ています。
制限事項
- 重複した行は、次のような理由で発生する可能性があります。
-
ファイル処理の途中でパース中に例外が発生し、
s3queue_loading_retriesによって再試行が有効になっている場合。 -
S3Queueが複数のサーバーで設定されており、ZooKeeper 内の同じパスを参照している場合に、あるサーバーが処理済みファイルをコミットする前に Keeper セッションの有効期限が切れると、別のサーバーがそのファイルの処理を引き継ぐ可能性があります。その結果、そのファイルは最初のサーバーによって部分的または完全に処理済みである可能性があります。ただし、use_persistent_processing_nodes = 1の場合、バージョン 25.8 以降ではこれは当てはまりません。 - サーバーが異常終了した場合。
S3Queueが複数のサーバーで設定され、ZooKeeper 内の同じパスを参照しており、Orderedモードを使用している場合、s3queue_loading_retriesは機能しません。これはまもなく修正される予定です。
イントロスペクション
system.s3queue_metadata_cache テーブルと、永続的な system.s3queue_log テーブルを使用します。
system.s3queue_metadata_cache。このテーブルは永続化されず、S3Queueのインメモリ状態、つまり現在処理中のファイル、処理済みのファイル、または処理に失敗したファイルを表示します。
system.s3queue_log。永続テーブルです。system.s3queue_metadata_cacheと同じ情報を持ちますが、processedおよびfailedファイルに関するものです。
system.s3queue_logを使用するには、サーバー設定ファイルでその設定を定義してください。