Extranet Access

Public Network Access Configuration

Enable Cluster Authentication Configuration

See Cluster Authentication Configuration for the steps to enable authentication on the cluster.

View SASL Access Point

[SASL_PLAINTEXT Protocol Address](/docs/ukafka/guide/node/address#SASL_PLAINTEXT%20Protocol%20Address)

Configure Public Network Forwarding

1. Create a Cloud Server in the Same Subnet as the UKafka Cluster

Note: This document is based on CentOS 7.9

2. Install nginx on the Cloud Server

yum install nginx nginx-all-modules.noarch

3. Configure nginx Proxy

  • Edit /etc/nginx/nginx.conf to add the stream configuration:
stream {
    log_format proxy '$remote_addr [$time_local] '
                 '$protocol $status $bytes_sent $bytes_received '
                 '$session_time "$upstream_addr" '
                 '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    access_log /var/log/nginx/tcp-access.log proxy;
    open_log_file_cache off;

    # Keep all stream proxy configs in one place for easier management
    include /etc/nginx/tcpConf.d/*.conf;
}
  • Create the /etc/nginx/tcpConf.d/ directory:
mkdir -p /etc/nginx/tcpConf.d/
  • Create the /etc/nginx/tcpConf.d/kafka.conf configuration file:
upstream tcp9093 {
    server 10.23.26.105:9093;
}
upstream tcp9094 {
    server 10.23.180.35:9094;
}
upstream tcp9095 {
    server 10.23.202.164:9095;
}

server {
    listen 9093;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9093;
}
server {
    listen 9094;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9094;
}
server {
    listen 9095;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9095;
}
  • Start nginx
systemctl start nginx.service
  • Verify the status of the nginx service:
systemctl status nginx.service
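Once nginx is running, a quick sanity check from the proxy host itself can confirm that the forwarding ports accept TCP connections. This is only a sketch using bash's built-in /dev/tcp (no extra tools needed); it does not verify end-to-end Kafka connectivity:

```shell
# Check that something is listening locally on each forwarding port.
# Uses bash's built-in /dev/tcp, so no extra tools are required.
for port in 9093 9094 9095; do
    if (exec 3<>"/dev/tcp/127.0.0.1/${port}") 2>/dev/null; then
        echo "port ${port}: listening"
    else
        echo "port ${port}: not reachable"
    fi
done
```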

4. Host Firewall Configuration

The host firewall needs to accept forwarding requests on ports 9093, 9094, and 9095.
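On CentOS 7 the default host firewall is firewalld, so opening the ports can be sketched as below. This assumes firewalld; if your host uses a different firewall, open 9093-9095/tcp there instead:

```shell
# Open the forwarding ports, assuming firewalld (the CentOS 7 default).
# If your host uses a different firewall, open 9093-9095/tcp there instead.
if command -v firewall-cmd >/dev/null 2>&1 && firewall-cmd --state >/dev/null 2>&1; then
    firewall-cmd --permanent --add-port=9093-9095/tcp
    firewall-cmd --reload
    firewall-cmd --list-ports
else
    echo "firewalld not running; open ports 9093-9095/tcp with your firewall tool"
fi
```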

Public Network Access

1. Configure Local Hosts

Modify the local /etc/hosts file to map every broker hostname used by the UKafka SASL_PLAINTEXT listeners to the public IP of the cloud server that forwards the requests:

113.31.114.125 ukafka-q0j01x5g-kafka1
113.31.114.125 ukafka-q0j01x5g-kafka2
113.31.114.125 ukafka-q0j01x5g-kafka3
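The mappings above can be appended idempotently with a short snippet. This is a sketch: replace 113.31.114.125 and the hostnames with your own values, and run it with root privileges since it writes to /etc/hosts:

```shell
# Append each broker mapping unless it is already present
# (113.31.114.125 is the example public IP of the forwarding server; run as root)
for host in ukafka-q0j01x5g-kafka1 ukafka-q0j01x5g-kafka2 ukafka-q0j01x5g-kafka3; do
    grep -q "${host}" /etc/hosts || echo "113.31.114.125 ${host}" >> /etc/hosts
done
grep 'ukafka-q0j01x5g' /etc/hosts
```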

2. Access with Kafka Command Line Tool

Download and Configure Kafka Command Line Tool

Taking the Kafka 2.6.1 command line tool as an example, download the version that matches your instance:

# Download the command line tool
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz
 
# Extract
tar -zxvf kafka_2.13-2.6.1.tgz
 
# Enter the command line tool root directory
cd kafka_2.13-2.6.1

Configure the config/kafka_client_jaas.conf file as follows, where username and password are the credentials set when enabling cluster authentication in the console:

cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin_pass";
};
EOF

Send Message

  • Modify bin/kafka-console-producer.sh to add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
  • Modify config/producer.properties to specify security.protocol and sasl.mechanism:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • Run bin/kafka-console-producer.sh to send messages:
bin/kafka-console-producer.sh  --producer.config ./config/producer.properties --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>

Receive Message

  • Modify bin/kafka-console-consumer.sh to add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
  • Modify config/consumer.properties to specify security.protocol and sasl.mechanism:
group.id=test-consumer-group
 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • Run bin/kafka-console-consumer.sh to receive messages:
bin/kafka-console-consumer.sh --consumer.config ./config/consumer.properties --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar

3. Access with Java SDK

The Java SDK supports several ways to configure SASL authentication, such as using a kafka_client_jaas.conf configuration file or setting the sasl.jaas.config property directly. The three methods are shown in the code examples below.

If you use the kafka_client_jaas.conf configuration file, fill in the username and password set when enabling cluster authentication in the console:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin_pass";
};
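The producer and consumer examples below assume the Kafka Java client library is on the classpath; with Maven, that is the kafka-clients artifact, pinned here to 2.6.1 to match the instance version used in this guide:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.1</version>
</dependency>
```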

Producer Example

package cn.ucloud.ukafka.Example;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
import java.util.Properties;
import java.util.concurrent.Future;
 
public class KafkaProducerExample {
 
    public static void main(String[] args) throws Exception {
        // Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
 
        // Method 2: Alternatively, pass the JVM argument `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf` at startup
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
 
        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
 
        // Method 3: Set the JAAS configuration directly via the sasl.jaas.config property
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
 
        String topic = "foo";
        String value = "this is the message's value";
 
        ProducerRecord<String, String>  kafkaMessage =  new ProducerRecord<String, String>(topic, value);
        try {
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer Example

package cn.ucloud.ukafka.Example;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
 
public class KafkaConsumerExample {
 
    public static void main(String[] args) throws Exception {
        // Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
 
        // Method 2: Alternatively, pass the JVM argument `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf` at startup
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        props.put("group.id", "test_group");
        props.put("auto.offset.reset", "earliest");
 
        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
 
        // Method 3: Set the JAAS configuration directly via the sasl.jaas.config property
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add("foo");
        consumer.subscribe(subscribedTopics);
 
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000)); // poll(long) is deprecated since Kafka 2.0
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                            record.topic(), record.partition(), record.offset(), record.value()));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Kafka SDKs for different languages differ in their support for and configuration of authentication; refer to the documentation of the relevant SDK for specific settings.