
Synchronize from Kafka

Log in to your UCloud Global account and go to the User Console. Search for “Kafka Message Queue UKafka” under All Products or under Big Data, and open the Kafka Message Queue UKafka console. Click the Create Cluster button, make your selections from the configurations offered on the create cluster page, and place your order.

Friendly reminder:
  1. UKafka is a paid product. Please refer to the pricing on the create page.
  2. Ensure that the UKafka cluster and the UClickhouse cloud data warehouse cluster are in the same region and the same VPC. If they are in different VPCs, use Private Network VPC -> Network Interconnection to connect them.

Prerequisites

A UKafka cluster has been created, along with a Topic and a consumer Group. Please refer to UKafka Creation.

Steps

Connect to the Cluster

  • Set up a cloud host in the same region (same subnet) as the cluster, and install clickhouse-client on it. Older version download: Download Clickhouse-client. Newer version download: Download Clickhouse-client.

    It is recommended to choose the clickhouse-client version that matches the kernel version of the cluster you created. For example, if the cluster kernel version is 21.8.14.5, download the following rpm packages:

    wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-21.8.14.5-2.noarch.rpm
    wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-21.8.14.5-2.x86_64.rpm
  • Execute the installation

    rpm -ivh clickhouse-common-static-21.8.14.5-2.x86_64.rpm
    rpm -ivh clickhouse-client-21.8.14.5-2.noarch.rpm
  • Connect to the cluster with clickhouse-client

    clickhouse-client --host=<node IP address> --port=9000 --user=admin --password=<password set during cluster creation>

    The above command will enter interactive mode. The default username is admin, the default port is 9000, and the node IP can be checked in the cluster details.
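
Once connected, you can optionally verify that the client sees the cluster used in the examples below. A minimal check, assuming the cluster is named ck_cluster as in the rest of this guide:

SELECT cluster, host_name, host_address
FROM system.clusters
WHERE cluster = 'ck_cluster';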

Create Database

CREATE DATABASE IF NOT EXISTS ck_test ON CLUSTER ck_cluster;

Create Kafka Consumer Table

CREATE TABLE ck_test.lineorder_kafka ON CLUSTER ck_cluster
(
    LO_ORDERKEY UInt32,
    LO_LINENUMBER UInt8,
    LO_CUSTKEY UInt32,
    LO_PARTKEY UInt32,
    LO_SUPPKEY UInt32,
    LO_ORDERDATE Date,
    LO_ORDERPRIORITY LowCardinality(String),
    LO_SHIPPRIORITY UInt8,
    LO_QUANTITY UInt8,
    LO_EXTENDEDPRICE UInt32,
    LO_ORDTOTALPRICE UInt32,
    LO_DISCOUNT UInt8,
    LO_REVENUE UInt32,
    LO_SUPPLYCOST UInt32,
    LO_TAX UInt8,
    LO_COMMITDATE Date,
    LO_SHIPMODE LowCardinality(String)
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'broker list',
    kafka_topic_list = 'topic list',
    kafka_group_name = 'consumer group name',
    kafka_format = 'CSV',
    kafka_num_consumers = 1,
    kafka_max_block_size = 65536,
    kafka_skip_broken_messages = 0,
    kafka_auto_offset_reset = 'latest';
Note: The Kafka consumer table cannot be used directly as a result table. Its function is only to consume data from Kafka; it does not actually store the data.
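
On versions that allow it, a direct SELECT against the consumer table will consume the messages it reads, so they never reach the target table through the materialized view; query the target table instead. For illustration only:

-- Debugging only: rows returned here are consumed from Kafka and
-- will not be delivered to the target table by the materialized view.
SELECT * FROM ck_test.lineorder_kafka LIMIT 5;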

Parameter description:

| Name | Required | Description |
| ---- | -------- | ----------- |
| kafka_broker_list | Yes | A comma-separated list of Kafka broker addresses. |
| kafka_topic_list | Yes | A comma-separated list of Topic names. |
| kafka_group_name | Yes | The Kafka consumer group name. |
| kafka_format | Yes | The message format supported by UClickHouse. Supported formats can be found at Input and Output Formats. |
| kafka_row_delimiter | No | Row delimiter separating data rows. Default is “\n”; set it to match the delimiter actually used when the data was written. |
| kafka_num_consumers | No | Number of consumers for a single table. Default is 1. Specify more consumers if one consumer’s throughput is insufficient. The total number of consumers should not exceed the number of partitions in the Topic, because each partition can be assigned at most one consumer. |
| kafka_max_block_size | No | Maximum batch size, in messages, for a single poll. Default is 65536. |
| kafka_skip_broken_messages | No | Tolerance of the Kafka message parser for dirty data. Default is 0. If kafka_skip_broken_messages = N, the engine skips up to N unparseable Kafka messages per block (one message equals one row of data). |
| kafka_commit_every_batch | No | How often a Kafka commit is executed. 0 (default): commit once after a whole Block of data has been written. 1: commit after each Batch of data is written. |
| kafka_auto_offset_reset | No | Where to start reading Kafka data. earliest: start from the earliest offset. latest: start from the latest offset. Note: version 21.8 of UClickHouse clusters does not support this parameter. |
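
As an example of adjusting these settings, only kafka_format and the parser tolerance change if a topic carries JSON instead of CSV. A minimal sketch of an alternative consumer table, where the table, topic, and group names are hypothetical placeholders:

CREATE TABLE ck_test.events_kafka ON CLUSTER ck_cluster
(
    event_time DateTime,
    message String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'broker list',       -- placeholder broker addresses
    kafka_topic_list = 'events',             -- hypothetical topic name
    kafka_group_name = 'events_group',       -- hypothetical consumer group
    kafka_format = 'JSONEachRow',            -- one JSON object per message
    kafka_skip_broken_messages = 10;         -- tolerate up to 10 bad messages per block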

For more parameters, refer to the official ClickHouse documentation for the Kafka engine.

Create UClickhouse Local Table

Create Local Table

CREATE TABLE ck_test.lineorder_local ON CLUSTER ck_cluster
(
    LO_ORDERKEY UInt32,
    LO_LINENUMBER UInt8,
    LO_CUSTKEY UInt32,
    LO_PARTKEY UInt32,
    LO_SUPPKEY UInt32,
    LO_ORDERDATE Date,
    LO_ORDERPRIORITY LowCardinality(String),
    LO_SHIPPRIORITY UInt8,
    LO_QUANTITY UInt8,
    LO_EXTENDEDPRICE UInt32,
    LO_ORDTOTALPRICE UInt32,
    LO_DISCOUNT UInt8,
    LO_REVENUE UInt32,
    LO_SUPPLYCOST UInt32,
    LO_TAX UInt8,
    LO_COMMITDATE Date,
    LO_SHIPMODE LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (LO_ORDERKEY);

Create Distributed Table

If you only need to sync data to the local table, you can skip this step.

CREATE TABLE IF NOT EXISTS ck_test.lineorder ON CLUSTER ck_cluster
AS ck_test.lineorder_local
ENGINE = Distributed(ck_cluster, ck_test, lineorder_local, rand());

The last argument, rand(), is the sharding key: each row written through the distributed table is routed to a random shard, which spreads the data evenly across the cluster.

Create Materialized View

Create a materialized view to synchronize the data consumed by the Kafka consumer table into the UClickHouse target table. If your target is a local table, replace the distributed table name in the statement below with the local table name.

CREATE MATERIALIZED VIEW ck_test.consumer ON CLUSTER ck_cluster TO ck_test.lineorder AS
SELECT * FROM ck_test.lineorder_kafka;
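
If you later need to pause consumption (for example, before adjusting the consumer table), one common approach is to detach the Kafka table and re-attach it afterwards. A minimal sketch using the names above:

DETACH TABLE ck_test.lineorder_kafka ON CLUSTER ck_cluster;
-- consumption is stopped while the table is detached
ATTACH TABLE ck_test.lineorder_kafka ON CLUSTER ck_cluster;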

Write Messages in UKafka

Please refer to UKafka.
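
If you only need a few test messages, one option is the console producer that ships with Kafka. A sketch, where the broker address and topic name are placeholders and the line is one CSV row matching the 17 columns of the consumer table:

echo '1,1,100,200,300,2000-01-01,5-LOW,0,17,2116823,17366547,4,2032150,74711,2,2000-02-13,TRUCK' | \
  kafka-console-producer.sh --broker-list <broker address> --topic <topic name>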

Query Verification

clickhouse-client --host=<node IP address> --port=9000 --user=admin --password=<password set during cluster creation> --query="SELECT count(*) FROM ck_test.lineorder"
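
To see how the rows are distributed across shards, you can also count the local table on every node. A sketch using the cluster() table function with the names from this guide:

SELECT hostName() AS node, count(*) AS row_count
FROM cluster('ck_cluster', ck_test, lineorder_local)
GROUP BY node;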