Kafka
Ссылки
- https://kafka.apache.org/ | Quickstart | Documentation
- Encrypt and Authenticate with TLS
- Securing Kafka with Mutual TLS and ACLs
- Sample Use Case: Adding a PEM Certificate with a key into CDWS KeyStore
Установка и запуск
Один сервер
Устанавливаем Java
apt update apt install default-jre java -version
Скачиваем архив по ссылке https://dlcdn.apache.org/kafka/, например kafka_2.12-3.6.0.tgz.
wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz tar -zxvf kafka_2.12-3.6.0.tgz cp -r kafka_2.12-3.6.0 /usr/local/kafka cd /usr/local/kafka
Запускаем и проверяем
# --- запускаем сервис zookeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # --- запускаем сервис kafka bin/kafka-server-start.sh -daemon config/server.properties # --- запускаем консьюмера bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 # ожидание сообщений... # --- запускаем продюсера (в другой консоле) bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 >hello world # наблюдаем "hello world" в консьюмере
Остановка сервисов
bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh
Несколько серверов
ToDo
TLS
Инструкции по генерации новых сертификатов для обмена по протоколу TLS можно легко найти в сети, например - Securing Kafka with Mutual TLS and ACLs. Статья содержит подробное описание и ссылку на проект в github.
В случае, если сертификаты сгенерированы ранее в формате PEM, то действия будут немного отличаться. Нам потребуется выполнить ряд действий по преобразования PEM в JKS.
Согласно документации Kafka умеет работать с PEM-сертификатами. Однако рабочие варианты использования данной возможности пока не найдены. Имеет смысл проработать этот вопрос в перспективе.
Входные данные - набор сертификатов: корневой, серверный и клиентский. Выходные жанные - хранилища в формате JKS: серверное, клиентское
Источник вдохновения - Sample Use Case: Adding a PEM Certificate with a key into CDWS KeyStore
pwd /usr/local/kafka/cert # --- входные данные ls -1 . ca-cert.pem ca-key.pem client-cert.pem client-key.pem server-cert.pem server-key.pem # --- создаем хранилища для сервера kafka keytool -import -alias CA -keystore kafka.server.keystore.jks -file ca-cert.pem # secret secret yes openssl pkcs12 -export -name server -in server-cert.pem -inkey server-key.pem -out server.p12 # secret secret keytool -importkeystore -destkeystore kafka.server.keystore.jks -deststoretype jks -srckeystore server.p12 -srcstoretype pkcs12 -alias server # secret secret keytool -list -v -keystore kafka.server.keystore.jks # secret keytool -import -alias CA -keystore kafka.server.truststore.jks -file ca-cert.pem # secret secret yes # --- создаем хранилища для клиентов kafka keytool -import -alias CA -keystore kafka.client.keystore.jks -file ca-cert.pem # secret secret yes openssl pkcs12 -export -name client -in client-cert.pem -inkey client-key.pem -out client.p12 # secret secret keytool -importkeystore -destkeystore kafka.client.keystore.jks -deststoretype jks -srckeystore client.p12 -srcstoretype pkcs12 -alias client # secret secret keytool -list -v -keystore kafka.client.keystore.jks # secret keytool -import -alias CA -keystore kafka.client.truststore.jks -file ca-cert.pem # secret secret yes # --- выходные данные ls -1 *.jks kafka.client.keystore.jks kafka.client.truststore.jks kafka.server.keystore.jks kafka.server.truststore.jks
Изменяем файл конфигурации сервера
# /usr/local/kafka/config/server.properties # ... ssl.truststore.location=/usr/local/kafka/certs/kafka.server.truststore.jks ssl.truststore.password=secret ssl.keystore.location=/usr/local/kafka/certs/kafka.server.keystore.jks ssl.keystore.password=secret ssl.key.password=secret ssl.client.auth=required listeners=PLAINTEXT://:9092,SSL://:9093
В конфигурации указываем на созданные jks хранилища, пароли к ним, а также включаем одновременное прослушивание TLS и PLANTEXT на разных портах. Это позволяет клиентам подключаться по TLS и без TLS. Передача сообщения hello world
из начала раздела будет работать.
Перезапускаем сервис
bin/kafka-server-stop.sh bin/kafka-server-start.sh -daemon config/server.properties
Первый запуск после изменения конфигурации рекомендуется выполнить без опции -daemon
, чтобы убедиться что сервис стартует корректно
Изменяем файлы конфигурации продюсера и консьюмера
# config/producer.properties # config/consumer.properties ssl.truststore.location=/usr/local/kafka/certs/kafka.client.truststore.jks ssl.truststore.password=secret ssl.keystore.location=/usr/local/kafka/certs/kafka.client.keystore.jks ssl.keystore.password=secret ssl.key.password=secret ssl.endpoint.identification.algorithm= security.protocol=SSL
Проверяем подключение клиентов по TLS:
# --- consumer bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9093 --consumer.config config/consumer.properties # --- producer bin/kafka-console-producer.sh --producer.config config/producer.properties --topic quickstart-events --bootstrap-server localhost:9093 # ... # передаем сообщение, наблюдаем в consumer
REST Proxy
ToDo