Kafka

Ссылки

Установка и запуск

Один сервер

Устанавливаем Java

apt update
apt install default-jre
java -version

Скачиваем архив по ссылке https://dlcdn.apache.org/kafka/, например kafka_2.12-3.6.0.tgz.

wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
tar -zxvf kafka_2.12-3.6.0.tgz
cp -r kafka_2.12-3.6.0 /usr/local/kafka
cd /usr/local/kafka

Запускаем и проверяем

# --- запускаем сервис zookeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# --- запускаем сервис kafka
bin/kafka-server-start.sh  -daemon config/server.properties

# --- запускаем консьюмера
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# ожидание сообщений...

# --- запускаем продюсера (в другой консоле)
bin/kafka-console-producer.sh --topic quickstart-events  --bootstrap-server localhost:9092
>hello world

# наблюдаем "hello world" в консьюмере

Остановка сервисов

bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh

Несколько серверов

ToDo

TLS

Инструкции по генерации новых сертификатов для обмена по протоколу TLS можно легко найти в сети, например - Securing Kafka with Mutual TLS and ACLs. Статья содержит подробное описание и ссылку на проект в github.

В случае, если сертификаты сгенерированы ранее в формате PEM, то действия будут немного отличаться. Нам потребуется выполнить ряд действий по преобразования PEM в JKS.

Согласно документации Kafka умеет работать с PEM-сертификатами. Однако рабочие варианты использования данной возможности пока не найдены. Имеет смысл проработать этот вопрос в перспективе.

Входные данные - набор сертификатов: корневой, серверный и клиентский. Выходные жанные - хранилища в формате JKS: серверное, клиентское

Источник вдохновения - Sample Use Case: Adding a PEM Certificate with a key into CDWS KeyStore

pwd
/usr/local/kafka/cert

# --- входные данные
ls -1 .
ca-cert.pem
ca-key.pem
client-cert.pem
client-key.pem
server-cert.pem
server-key.pem

# --- создаем хранилища для сервера kafka
keytool -import -alias CA -keystore kafka.server.keystore.jks -file ca-cert.pem  # secret secret yes
openssl pkcs12 -export -name server -in server-cert.pem -inkey server-key.pem -out server.p12 # secret secret
keytool -importkeystore -destkeystore kafka.server.keystore.jks -deststoretype jks -srckeystore server.p12 -srcstoretype pkcs12 -alias server # secret secret
keytool -list -v -keystore kafka.server.keystore.jks # secret
keytool -import -alias CA -keystore kafka.server.truststore.jks -file ca-cert.pem # secret secret yes

# --- создаем хранилища для клиентов kafka
keytool -import -alias CA -keystore kafka.client.keystore.jks -file ca-cert.pem # secret secret yes
openssl pkcs12 -export -name client -in client-cert.pem -inkey client-key.pem -out client.p12 # secret secret
keytool -importkeystore -destkeystore kafka.client.keystore.jks -deststoretype jks -srckeystore client.p12 -srcstoretype pkcs12 -alias client # secret secret
keytool -list -v -keystore kafka.client.keystore.jks # secret
keytool -import -alias CA -keystore kafka.client.truststore.jks -file ca-cert.pem # secret secret yes

# --- выходные данные
ls -1 *.jks
kafka.client.keystore.jks
kafka.client.truststore.jks
kafka.server.keystore.jks
kafka.server.truststore.jks

Изменяем файл конфигурации сервера

# /usr/local/kafka/config/server.properties
# ...
ssl.truststore.location=/usr/local/kafka/certs/kafka.server.truststore.jks
ssl.truststore.password=secret
ssl.keystore.location=/usr/local/kafka/certs/kafka.server.keystore.jks
ssl.keystore.password=secret
ssl.key.password=secret
ssl.client.auth=required

listeners=PLAINTEXT://:9092,SSL://:9093

В конфигурации указываем на созданные jks хранилища, пароли к ним, а также включаем одновременное прослушивание TLS и PLANTEXT на разных портах. Это позволяет клиентам подключаться по TLS и без TLS. Передача сообщения hello world из начала раздела будет работать.

Перезапускаем сервис

bin/kafka-server-stop.sh
bin/kafka-server-start.sh  -daemon config/server.properties

Первый запуск после изменения конфигурации рекомендуется выполнить без опции -daemon, чтобы убедиться что сервис стартует корректно

Изменяем файлы конфигурации продюсера и консьюмера

# config/producer.properties
# config/consumer.properties

ssl.truststore.location=/usr/local/kafka/certs/kafka.client.truststore.jks
ssl.truststore.password=secret

ssl.keystore.location=/usr/local/kafka/certs/kafka.client.keystore.jks
ssl.keystore.password=secret

ssl.key.password=secret
ssl.endpoint.identification.algorithm=

security.protocol=SSL

Проверяем подключение клиентов по TLS:

# --- consumer
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9093 --consumer.config config/consumer.properties

# --- producer
bin/kafka-console-producer.sh --producer.config config/producer.properties --topic quickstart-events  --bootstrap-server localhost:9093
# ...
# передаем сообщение, наблюдаем в consumer

REST Proxy

ToDo