Public Network Access Configuration
Enable Cluster Authentication Configuration
Cluster Authentication Configuration
View SASL Access Point
[SASL_PLAINTEXT Protocol Address](/docs/ukafka/guide/node/address#SASL_PLAINTEXT Protocol Address)
Configure Public Network Forwarding
1. Create a Cloud Server in the Same Subnet as the UKafka Cluster
Note: This document is based on CentOS 7.9
2. Install nginx on the Cloud Server
yum install nginx nginx-all-modules.noarch
3. Configure nginx Proxy
- Edit /etc/nginx/nginx.conf to add the stream configuration:
stream {
log_format proxy '$remote_addr [$time_local] '
'$protocol $status $bytes_sent $bytes_received '
'$session_time "$upstream_addr" '
'"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
access_log /var/log/nginx/tcp-access.log proxy;
open_log_file_cache off;
# Keep all TCP proxy configuration files in one directory so they are easy to manage
include /etc/nginx/tcpConf.d/*.conf;
}
- Create the /etc/nginx/tcpConf.d/ directory:
mkdir -p /etc/nginx/tcpConf.d/
- Edit the /etc/nginx/tcpConf.d/kafka.conf configuration file (replace the upstream addresses with the SASL_PLAINTEXT addresses of your own cluster):
upstream tcp9093 {
server 10.23.26.105:9093;
}
upstream tcp9094 {
server 10.23.180.35:9094;
}
upstream tcp9095 {
server 10.23.202.164:9095;
}
server {
listen 9093;
proxy_connect_timeout 8s;
proxy_timeout 24h;
proxy_pass tcp9093;
}
server {
listen 9094;
proxy_connect_timeout 8s;
proxy_timeout 24h;
proxy_pass tcp9094;
}
server {
listen 9095;
proxy_connect_timeout 8s;
proxy_timeout 24h;
proxy_pass tcp9095;
}
- Start nginx
systemctl start nginx.service
- Verify the status of the nginx service
systemctl status nginx.service
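The kafka.conf file above repeats the same upstream/server pair for every broker port. For clusters with more nodes, the file can be generated instead of typed by hand. The following is a minimal sketch, not part of the UKafka tooling: the class name `KafkaStreamConfGenerator` and its `render` method are hypothetical, and the timeouts are copied from the example configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders nginx stream blocks in the same shape as kafka.conf above.
final class KafkaStreamConfGenerator {

    /** Renders one upstream block and one server block per listen port. */
    static String render(Map<Integer, String> brokerIpByPort) {
        StringBuilder upstreams = new StringBuilder();
        StringBuilder servers = new StringBuilder();
        for (Map.Entry<Integer, String> entry : brokerIpByPort.entrySet()) {
            int port = entry.getKey();
            String name = "tcp" + port;
            upstreams.append("upstream ").append(name).append(" {\n")
                    .append("    server ").append(entry.getValue()).append(':').append(port).append(";\n")
                    .append("}\n");
            servers.append("server {\n")
                    .append("    listen ").append(port).append(";\n")
                    .append("    proxy_connect_timeout 8s;\n")
                    .append("    proxy_timeout 24h;\n")
                    .append("    proxy_pass ").append(name).append(";\n")
                    .append("}\n");
        }
        return upstreams.toString() + servers;
    }

    public static void main(String[] args) {
        // Sample addresses taken from the example above; replace with your cluster's addresses.
        Map<Integer, String> brokers = new LinkedHashMap<>();
        brokers.put(9093, "10.23.26.105");
        brokers.put(9094, "10.23.180.35");
        brokers.put(9095, "10.23.202.164");
        System.out.print(render(brokers));
    }
}
```

Redirect the program's output to /etc/nginx/tcpConf.d/kafka.conf, then reload or restart nginx for the change to take effect.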
4. Host Firewall Configuration
The host firewall must accept inbound TCP connections on ports 9093, 9094, and 9095 so that nginx can forward them to the cluster.
Public Network Access
1. Configure Local Hosts
Modify the local /etc/hosts file so that every hostname on which UKafka listens for the SASL_PLAINTEXT protocol resolves to the public IP of the cloud server that forwards requests:
113.31.114.125 ukafka-q0j01x5g-kafka1
113.31.114.125 ukafka-q0j01x5g-kafka2
113.31.114.125 ukafka-q0j01x5g-kafka3
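Before running a Kafka client, it can help to confirm that each broker hostname resolves and that the forwarded port accepts TCP connections. The sketch below uses only the Java standard library; the class name `BrokerReachability` and its methods are hypothetical names chosen for illustration. A successful connection only shows that the nginx listener is reachable; it does not verify SASL authentication.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: checks TCP reachability of each host:port in a bootstrap list.
final class BrokerReachability {

    /** Splits a comma-separated "host:port" list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if the name resolves and a TCP connection opens within the timeout. */
    static boolean canConnect(InetSocketAddress broker, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Constructing a new InetSocketAddress triggers name resolution (for example via /etc/hosts).
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Sample hostnames from the hosts entries above; replace with your own.
        String servers = "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095";
        for (InetSocketAddress broker : parse(servers)) {
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + " reachable=" + canConnect(broker, 3000));
        }
    }
}
```

If a broker is reported as unreachable, check the hosts entries, the host firewall on the forwarding server, and the nginx service status.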
2. Access with Kafka Command Line Tool
Download and Configure Kafka Command Line Tool
The following uses the Kafka 2.6.1 command line tool as an example. Download the version of the tool that matches your instance version.
# Download the command line tool
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz
# Extract
tar -zxvf kafka_2.13-2.6.1.tgz
# Enter the command line tool root directory
cd kafka_2.13-2.6.1
Configure the config/kafka_client_jaas.conf file as follows, where username and password are the username and password set when enabling cluster authentication in the control panel:
cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin_pass";
};
EOF
Send Message
- Modify bin/kafka-console-producer.sh to add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
- Modify config/producer.properties to specify security.protocol and sasl.mechanism:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-producer.sh to send messages:
bin/kafka-console-producer.sh --producer.config ./config/producer.properties --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>
Receive Message
- Modify bin/kafka-console-consumer.sh to add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
- Modify config/consumer.properties to specify security.protocol and sasl.mechanism:
group.id=test-consumer-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-consumer.sh to receive messages:
bin/kafka-console-consumer.sh --consumer.config ./config/consumer.properties --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar
3. Access with Java SDK
The Java SDK can supply the SASL credentials in several ways: by pointing to a kafka_client_jaas.conf file, or by setting the sasl.jaas.config property directly. The code examples below show three methods.
If you use the kafka_client_jaas.conf configuration file, set username and password to the credentials you chose when enabling cluster authentication in the console:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin_pass";
};
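When the credentials are set through the sasl.jaas.config property instead of a file, the property value is a single string in the same syntax as the file entry above. The sketch below builds that string and the related client properties using only the Java standard library. The class name `SaslPlainClientConfig` and its methods are hypothetical, and the escaping of backslashes and double quotes is an assumption about how quoted JAAS values are parsed; verify it against your client version if your password contains such characters.

```java
import java.util.Properties;

// Hypothetical helper: assembles SASL_PLAINTEXT/PLAIN client properties without a JAAS file.
final class SaslPlainClientConfig {

    /** Escapes backslashes and double quotes for use inside a quoted JAAS value (assumed syntax). */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the sasl.jaas.config value for the PLAIN login module. */
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + escape(username) + "\" "
                + "password=\"" + escape(password) + "\";";
    }

    /** Returns the connection and authentication properties shared by producers and consumers. */
    static Properties build(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Sample values from this document; replace with your own hostnames and credentials.
        Properties props = build(
                "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095",
                "admin", "admin_pass");
        // Print only non-secret settings.
        System.out.println("security.protocol=" + props.getProperty("security.protocol"));
        System.out.println("sasl.mechanism=" + props.getProperty("sasl.mechanism"));
    }
}
```

The returned Properties object can be extended with serializer or deserializer settings and passed to a KafkaProducer or KafkaConsumer constructor. Keeping the credentials out of a file on disk is the main reason to prefer this approach.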
Producer Example
package cn.ucloud.ukafka.Example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception {
// Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
// Method 2: Alternatively, pass the JVM startup parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
Properties props = new Properties();
props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
// You need to specify security.protocol and sasl.mechanism
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// Method 3: Alternatively, set the JAAS configuration inline through the sasl.jaas.config property
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = "foo";
String value = "this is the message's value";
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value);
try {
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
} catch (Exception e) {
e.printStackTrace();
} finally {
// Flush any pending records and release client resources
producer.close();
}
}
}
Consumer Example
package cn.ucloud.ukafka.Example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
// Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
// Method 2: Alternatively, pass the JVM startup parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
Properties props = new Properties();
props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
props.put("group.id", "test_group");
props.put("auto.offset.reset", "earliest");
// You need to specify security.protocol and sasl.mechanism
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// Method 3: Alternatively, set the JAAS configuration inline through the sasl.jaas.config property
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add("foo");
consumer.subscribe(subscribedTopics);
while (true){
try {
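// poll(long) is deprecated in Kafka clients 2.0 and later; poll(Duration) is the replacement
ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));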
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
record.topic(), record.partition(), record.offset(), record.value()));
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
Kafka SDKs for other languages differ in how they support and configure authentication. For specific settings, check the documentation of the relevant SDK.