Synchronize from Kafka
Log in to your UCloud Global account and go to the User Console, then search for “Kafka Message Queue UKafka” under All Products or under Big Data, and enter the Kafka Message Queue UKafka Console. Click the Create Cluster button. On the create cluster page, make selections based on the configurations provided and place your order.
Friendly reminder:
- UKafka is a paid product. Please refer to the pricing on the create page.
- Ensure that the UKafka cluster and the UClickHouse cloud data warehouse cluster are in the same region and the same VPC. If they are in different VPCs, connect them via Private Network VPC -> Network Interconnection.
Prerequisites
A UKafka cluster has been created, along with a Topic and a consumer Group. Please refer to UKafka Creation.
Steps
Connect to the Cluster
- Set up a cloud host in the same region (same subnet) as the cluster, and install clickhouse-client on it. Older version download: Download Clickhouse-client. Newer version download: Download Clickhouse-client.
It is recommended to choose the clickhouse-client version that matches the kernel version of the cluster you created. For example, if the cluster kernel version is 21.8.14.5, download the following rpm packages:
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-21.8.14.5-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-21.8.14.5-2.x86_64.rpm
- Execute the installation:
rpm -ivh clickhouse-common-static-21.8.14.5-2.x86_64.rpm
rpm -ivh clickhouse-client-21.8.14.5-2.noarch.rpm
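To confirm the installation succeeded, you can check the installed client version:
clickhouse-client --version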
- Connect to the cluster with clickhouse-client:
clickhouse-client --host=<any node IP address> --port=9000 --user=admin --password=<password set during cluster creation>
The above command enters interactive mode. The default username is admin, the default port is 9000, and the node IP addresses can be found in the cluster details.
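For illustration, a concrete connection command might look like the following; the IP address and password are hypothetical and should be replaced with your own values:
# Hypothetical node IP and password -- substitute your cluster's values
clickhouse-client --host=10.9.1.2 --port=9000 --user=admin --password='YourPassword'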
Create Database
CREATE DATABASE IF NOT EXISTS ck_test ON CLUSTER ck_cluster;
Create Kafka Consumer Table
CREATE TABLE ck_test.lineorder_kafka ON CLUSTER ck_cluster
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'broker list',
kafka_topic_list = 'topic list',
kafka_group_name = 'consumer group name',
kafka_format = 'CSV',
kafka_num_consumers = 1,
kafka_max_block_size = 65536,
kafka_skip_broken_messages = 0,
kafka_auto_offset_reset = 'latest';
Note:
The Kafka consumer table cannot be used directly as a result table; it only consumes data from Kafka and does not actually store the data.
Parameter description:
| Name | Required | Description |
|---|---|---|
| kafka_broker_list | Yes | A comma-separated list of Kafka broker addresses. |
| kafka_topic_list | Yes | A comma-separated list of Topic names. |
| kafka_group_name | Yes | The Kafka consumer group name. |
| kafka_format | Yes | The message format supported by UClickHouse. Supported formats can be found at Input and Output Formats. |
| kafka_row_delimiter | No | Row delimiter used to separate data rows. Default is “\n”; set it according to the actual delimiter used when the data was written. |
| kafka_num_consumers | No | Number of consumers for a single table. Default value is 1. If one consumer’s throughput is insufficient, specify more consumers. The total number of consumers should not exceed the number of partitions in the Topic, because each partition can be assigned at most one consumer. |
| kafka_max_block_size | No | Maximum batch size, in messages, for a single poll. Default value is 65536. |
| kafka_skip_broken_messages | No | The Kafka message parser’s tolerance for dirty data. Default value is 0. If kafka_skip_broken_messages = N, the engine skips N Kafka messages that cannot be parsed (one message equals one row of data). |
| kafka_commit_every_batch | No | How often a Kafka commit is executed. 0 (default): commit only after a whole Block of data has been written. 1: commit after every Batch of data is written. |
| kafka_auto_offset_reset | No | The offset from which to start reading Kafka data. earliest: start from the earliest offset. latest: start from the latest offset. Note: version 21.8 ClickHouse cloud database clusters do not support this parameter. |
For more parameters, refer to the official ClickHouse documentation for the Kafka table engine.
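For reference, a minimal, self-contained sketch of a Kafka consumer table with the required settings filled in might look like this; the broker addresses, topic name, and group name are hypothetical and must be replaced with the values from your UKafka console:
CREATE TABLE ck_test.example_kafka ON CLUSTER ck_cluster
(
    id UInt32,
    message String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = '10.9.1.10:9092,10.9.1.11:9092,10.9.1.12:9092', -- hypothetical UKafka broker addresses
    kafka_topic_list = 'example_topic',   -- hypothetical Topic created in UKafka
    kafka_group_name = 'example_group',   -- hypothetical consumer Group
    kafka_format = 'CSV';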
Create UClickHouse Local Table
Create Local Table
CREATE TABLE ck_test.lineorder_local ON CLUSTER ck_cluster
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = MergeTree ORDER BY (LO_ORDERKEY);
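To sanity-check that the ON CLUSTER DDL created the table on every node, one option (a sketch, assuming the cluster name ck_cluster used above) is to query system.tables through the cluster table function:
SELECT hostName() AS host, database, name
FROM cluster('ck_cluster', system.tables)
WHERE database = 'ck_test' AND name = 'lineorder_local';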
Create Distributed Table
If you only need to sync data to the local table, you can skip this step.
CREATE TABLE IF NOT EXISTS ck_test.lineorder on cluster ck_cluster AS ck_test.lineorder_local ENGINE = Distributed(
ck_cluster,
ck_test,
lineorder_local,
rand()
);
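The rand() sharding key spreads writes randomly across the shards of ck_cluster. To review the cluster topology the distributed table will use, you can query system.clusters:
SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'ck_cluster';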
Create Materialized View
Create a materialized view to synchronize the data consumed by the Kafka consumer table into the UClickHouse target table. If your target table is a local table, replace the distributed table name with the local table name before synchronizing.
CREATE MATERIALIZED VIEW ck_test.consumer ON CLUSTER ck_cluster TO ck_test.lineorder AS SELECT * FROM ck_test.lineorder_kafka;
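To pause consumption (for example, before changing the target table or the conversion logic), you can detach the materialized view and re-attach it later, a pattern described in the ClickHouse Kafka engine documentation:
-- Stop consuming from Kafka
DETACH TABLE ck_test.consumer ON CLUSTER ck_cluster;
-- Resume consumption later
ATTACH TABLE ck_test.consumer ON CLUSTER ck_cluster;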
Write Messages in UKafka
Please refer to UKafka.
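As an illustration, you could write a few CSV rows matching the lineorder schema with the console producer that ships with Apache Kafka; the broker address and topic name below are hypothetical:
# Hypothetical broker address and topic -- use the values from your UKafka console
bin/kafka-console-producer.sh --broker-list 10.9.1.10:9092 --topic lineorder_topic
# Then type CSV rows, one per line, matching the 17 columns of lineorder, e.g.:
# 1,1,7381,155190,828,1996-02-12,5-LOW,0,17,2116823,17366547,4,2032150,74711,2,1996-02-28,TRUCK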
Query Verification
clickhouse-client --host=<any node IP address> --port=9000 --user=admin --password=<password set during cluster creation> --query="SELECT count(*) FROM ck_test.lineorder"
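Beyond the row count, you can spot-check the synchronized rows, for example:
clickhouse-client --host=<any node IP address> --port=9000 --user=admin --password=<password set during cluster creation> --query="SELECT * FROM ck_test.lineorder ORDER BY LO_ORDERKEY LIMIT 10"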