Public Network Access Configuration

Enable Cluster Authentication Configuration

Cluster Authentication Configuration

View SASL Access Point

[SASL_PLAINTEXT Protocol Address](/docs/ukafka/guide/node/address#SASL_PLAINTEXT Protocol Address)

Configure Public Network Forwarding

1. Create a Cloud Server in the Same Subnet as the UKafka Cluster

Note: The steps in this document are based on CentOS 7.9.

2. Install nginx on the Cloud Server

yum install nginx nginx-all-modules.noarch

3. Configure nginx Proxy

  • Edit /etc/nginx/nginx.conf to add the stream configuration
stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
    access_log /var/log/nginx/tcp-access.log proxy;
    open_log_file_cache off;
    # Keep all TCP proxy configurations in one directory for easier management
    include /etc/nginx/tcpConf.d/*.conf;
}
  • Create /etc/nginx/tcpConf.d/ directory
mkdir -p /etc/nginx/tcpConf.d/
  • Edit the /etc/nginx/tcpConf.d/kafka.conf configuration file. Each upstream forwards to one broker, so its server directive takes that broker's SASL_PLAINTEXT address
upstream tcp9093 {
    server;
}
upstream tcp9094 {
    server;
}
upstream tcp9095 {
    server;
}
server {
    listen 9093;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9093;
}
server {
    listen 9094;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9094;
}
server {
    listen 9095;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9095;
}
  • Start nginx
systemctl start nginx.service
  • Check the status of the nginx service
systemctl status nginx.service
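
The kafka.conf layout above repeats the same upstream and server pair for every broker, so it can be generated rather than typed by hand. The following is a minimal sketch, not part of the official procedure: the function name render_kafka_conf and the 10.0.0.x addresses are hypothetical, and it assumes the proxy listens on the same port as the broker address it forwards to, as in the configuration above.

```shell
#!/bin/sh
# Sketch: print an nginx stream configuration for the Kafka brokers.
# Each argument is one broker's SASL_PLAINTEXT address in host:port form.
# Assumption: the proxy listens on the same port as the broker address.
render_kafka_conf() {
  # One upstream block per broker, named after its port.
  for addr in "$@"; do
    port="${addr##*:}"   # text after the last colon
    printf 'upstream tcp%s { server %s; }\n' "$port" "$addr"
  done
  # One server block per broker, forwarding to the matching upstream.
  for addr in "$@"; do
    port="${addr##*:}"
    printf 'server {\n'
    printf '    listen %s;\n' "$port"
    printf '    proxy_connect_timeout 8s;\n'
    printf '    proxy_timeout 24h;\n'
    printf '    proxy_pass tcp%s;\n' "$port"
    printf '}\n'
  done
}

# Hypothetical private addresses; substitute the ones shown in the console.
render_kafka_conf 10.0.0.11:9093 10.0.0.12:9094 10.0.0.13:9095
```

Redirecting the output to /etc/nginx/tcpConf.d/kafka.conf and then running nginx -t checks the syntax of the result before the service is started or reloaded.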

4. Host Firewall Configuration

The host firewall must allow inbound connections on ports 9093, 9094, and 9095 so that nginx can forward them to the brokers.
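
As an illustration of this step, the sketch below prints the commands that would open the three ports. It assumes firewalld is the active host firewall; if the server uses iptables or another tool, the equivalent rules are needed instead. The function name kafka_firewall_cmds is hypothetical.

```shell
#!/bin/sh
# Sketch: print the firewalld commands that open the three proxy ports.
# Assumption: firewalld is the active host firewall on the cloud server.
kafka_firewall_cmds() {
  for port in 9093 9094 9095; do
    echo "firewall-cmd --permanent --add-port=${port}/tcp"
  done
  # Permanent rules only take effect after a reload.
  echo "firewall-cmd --reload"
}

# Print the commands for review; nothing is changed on the host.
kafka_firewall_cmds
```

Printing the commands first keeps the step reviewable. Once they look right, they can be run as root, for example by piping the output into sh.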

Public Network Access

1. Configure Local Hosts

Edit the local /etc/hosts file so that every hostname on which UKafka listens for the SASL_PLAINTEXT protocol resolves to the public IP of the cloud server that forwards the requests:
ukafka-q0j01x5g-kafka1
ukafka-q0j01x5g-kafka2
ukafka-q0j01x5g-kafka3
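
The mapping described above can be produced with a short helper. This is a sketch only: the function name hosts_entries is hypothetical, and 203.0.113.10 is a documentation-only placeholder that stands in for the real public IP of the forwarding cloud server.

```shell
#!/bin/sh
# Sketch: print /etc/hosts lines that map each broker hostname
# to the public IP of the forwarding cloud server.
hosts_entries() {
  public_ip="$1"
  for host in ukafka-q0j01x5g-kafka1 ukafka-q0j01x5g-kafka2 ukafka-q0j01x5g-kafka3; do
    printf '%s %s\n' "$public_ip" "$host"
  done
}

# 203.0.113.10 is a placeholder; use the cloud server's public IP.
hosts_entries 203.0.113.10
```

The output can be appended to /etc/hosts on the client machine after review. All three hostnames point to the same IP because the single nginx server distinguishes the brokers by port.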

2. Access with Kafka Command Line Tool

Download and Configure Kafka Command Line Tool

The following uses the Kafka 2.6.1 command line tool as an example. Download the version of the tool that matches your instance version.

# Download the command line tool
wget
# Extract
tar -zxvf kafka_2.13-2.6.1.tgz
# Enter the command line tool root directory
cd kafka_2.13-2.6.1

Configure the config/kafka_client_jaas.conf file as follows, where username and password are the credentials that were set when cluster authentication was enabled in the console:

cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
  required
  username="admin"
  password="admin_pass";
};
EOF

Send Message

  • Modify bin/ to add parameter
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/ "$@"
  • Modify config/, specify security.protocol, sasl.mechanism
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • Run bin/ to send message:
bin/ --producer.config ./config/ --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>

Receive Message

  • Modify bin/ to add parameter
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/ "$@"
  • Modify config/, specify security.protocol, sasl.mechanism
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • Run bin/ to receive message
bin/ --consumer.config ./config/ --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar

3. Access with Java SDK

There are several ways to supply the SASL credentials when using the Java SDK, such as pointing the JVM at a kafka_client_jaas.conf file or setting the sasl.jaas.config property directly. The code examples below show three methods.

If you use the kafka_client_jaas.conf configuration file, fill in the username and password that were set when cluster authentication was enabled in the console:

KafkaClient {
  required
  username="admin"
  password="admin_pass";
};

Producer Example

package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        // Method 1: Set the system property that specifies the path of the JAAS configuration file
        System.setProperty("", "./config/kafka_client_jaas.conf");
        // Method 2: Add starting JVM parameter ``

        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Method 3: Set the JAAS configuration content with Properties
        // props.put("sasl.jaas.config", " required username=\"admin\" password=\"admin_pass\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        String topic = "foo";
        String value = "this is the message's value";
        ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value);
        try {
            // send() is asynchronous; get() blocks until the broker acknowledges the record
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer's network resources and buffers
            producer.close();
        }
    }
}

Consumer Example

package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) throws Exception {
        // Method 1: Set the system property that specifies the path of the JAAS configuration file
        System.setProperty("", "./config/kafka_client_jaas.conf");
        // Method 2: Add starting JVM parameter ``

        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        props.put("", "test_group");
        props.put("auto.offset.reset", "earliest");
        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Method 3: Set the JAAS configuration content with Properties
        // props.put("sasl.jaas.config", " required username=\"admin\" password=\"admin_pass\";");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add("foo");
        consumer.subscribe(subscribedTopics);

        while (true) {
            try {
                // Wait up to one second for new records; poll(long) is deprecated in favor of poll(Duration)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                            record.topic(), record.partition(), record.offset(), record.value()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Kafka SDKs for other languages differ in how they support and configure authentication. For specific settings, refer to the documentation of the relevant SDK.