Syntax highlighting of devops/kafka

= Kafka =

<<TableOfContents()>>

== Ссылки ==

 . [[https://kafka.apache.org/]] | [[https://kafka.apache.org/quickstart | Quickstart ]] | [[https://kafka.apache.org/documentation/|Documentation]]
 . [[https://docs.confluent.io/platform/current/kafka/authentication_ssl.html | Encrypt and Authenticate with TLS]]
 . [[https://medium.com/lydtech-consulting/securing-kafka-with-mutual-tls-and-acls-b235a077f3e3 | Securing Kafka with Mutual TLS and ACLs]]
 . [[https://www.ibm.com/docs/en/connect-direct/6.0.0?topic=sunk-sample-use-case-adding-pem-certificate-key-into-cdws-keystore | Sample Use Case: Adding a PEM Certificate with a key into CDWS KeyStore]]

== Установка и запуск ==

=== Один сервер ===

Устанавливаем Java

{{{#!highlight bash
apt update
apt install default-jre
java -version
}}}

Скачиваем архив по ссылке [[ https://dlcdn.apache.org/kafka/ ]], например ''kafka_2.12-3.6.0.tgz'''.

{{{#!highlight bash
wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
tar -zxvf kafka_2.12-3.6.0.tgz
cp -r kafka_2.12-3.6.0 /usr/local/kafka
cd /usr/local/kafka
}}}

Запускаем и проверяем

{{{#!highlight bash
# --- запускаем сервис zookeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# --- запускаем сервис kafka
bin/kafka-server-start.sh  -daemon config/server.properties

# --- запускаем консьюмера
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# ожидание сообщений...

# --- запускаем продюсера (в другой консоле)
bin/kafka-console-producer.sh --topic quickstart-events  --bootstrap-server localhost:9092
>hello world

# наблюдаем "hello world" в консьюмере
}}}

Остановка сервисов

{{{#!highlight bash
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
}}}



=== Несколько серверов ===

{{{#!wiki note
ToDo
}}}

== TLS ==

Инструкции по генерации новых сертификатов для обмена по протоколу TLS можно легко найти в сети, например - [[https://medium.com/lydtech-consulting/securing-kafka-with-mutual-tls-and-acls-b235a077f3e3 | Securing Kafka with Mutual TLS and ACLs]]. Статья содержит подробное описание и ссылку на [[https://github.com/lydtechconsulting/kafka-mutual-tls|проект в github]].

В случае, если сертификаты сгенерированы ранее в формате PEM, то действия будут немного отличаться. Нам потребуется выполнить ряд действий по преобразования PEM в [[https://en.wikipedia.org/wiki/Java_KeyStore|JKS]].

{{{#!wiki note
Согласно [[https://kafka.apache.org/documentation/#security_ssl_signing|документации]] Kafka умеет работать с PEM-сертификатами. Однако рабочие варианты использования данной возможности пока не найдены. Имеет смысл проработать этот вопрос в перспективе.
}}}


Входные данные - набор сертификатов: корневой, серверный и клиентский. Выходные жанные - хранилища в формате JKS: серверное, клиентское

Источник вдохновения - [[https://www.ibm.com/docs/en/connect-direct/6.0.0?topic=sunk-sample-use-case-adding-pem-certificate-key-into-cdws-keystore | Sample Use Case: Adding a PEM Certificate with a key into CDWS KeyStore]]

{{{#!highlight bash

pwd
/usr/local/kafka/cert

# --- входные данные
ls -1 .
ca-cert.pem
ca-key.pem
client-cert.pem
client-key.pem
server-cert.pem
server-key.pem

# --- создаем хранилища для сервера kafka
keytool -import -alias CA -keystore kafka.server.keystore.jks -file ca-cert.pem  # secret secret yes
openssl pkcs12 -export -name server -in server-cert.pem -inkey server-key.pem -out server.p12 # secret secret
keytool -importkeystore -destkeystore kafka.server.keystore.jks -deststoretype jks -srckeystore server.p12 -srcstoretype pkcs12 -alias server # secret secret
keytool -list -v -keystore kafka.server.keystore.jks # secret
keytool -import -alias CA -keystore kafka.server.truststore.jks -file ca-cert.pem # secret secret yes

# --- создаем хранилища для клиентов kafka
keytool -import -alias CA -keystore kafka.client.keystore.jks -file ca-cert.pem # secret secret yes
openssl pkcs12 -export -name client -in client-cert.pem -inkey client-key.pem -out client.p12 # secret secret
keytool -importkeystore -destkeystore kafka.client.keystore.jks -deststoretype jks -srckeystore client.p12 -srcstoretype pkcs12 -alias client # secret secret
keytool -list -v -keystore kafka.client.keystore.jks # secret
keytool -import -alias CA -keystore kafka.client.truststore.jks -file ca-cert.pem # secret secret yes

# --- выходные данные
ls -1 *.jks
kafka.client.keystore.jks
kafka.client.truststore.jks
kafka.server.keystore.jks
kafka.server.truststore.jks
}}}

Изменяем файл конфигурации сервера

{{{#!highlight properties
# /usr/local/kafka/config/server.properties
# ...
ssl.truststore.location=/usr/local/kafka/certs/kafka.server.truststore.jks
ssl.truststore.password=secret
ssl.keystore.location=/usr/local/kafka/certs/kafka.server.keystore.jks
ssl.keystore.password=secret
ssl.key.password=secret
ssl.client.auth=required

listeners=PLAINTEXT://:9092,SSL://:9093
}}}

В конфигурации указываем на созданные jks хранилища, пароли к ним, а также включаем одновременное прослушивание TLS и PLANTEXT на разных портах. Это позволяет клиентам подключаться по TLS и без TLS. Передача сообщения `hello world` из начала раздела будет работать.

Перезапускаем сервис

{{{#!highlight bash
bin/kafka-server-stop.sh
bin/kafka-server-start.sh  -daemon config/server.properties
}}}

{{{#!wiki note
Первый запуск после изменения конфигурации рекомендуется выполнить без опции `-daemon`, чтобы убедиться что сервис стартует корректно
}}}

Изменяем файлы конфигурации продюсера и консьюмера
{{{#!highlight properties
# config/producer.properties
# config/consumer.properties

ssl.truststore.location=/usr/local/kafka/certs/kafka.client.truststore.jks
ssl.truststore.password=secret

ssl.keystore.location=/usr/local/kafka/certs/kafka.client.keystore.jks
ssl.keystore.password=secret

ssl.key.password=secret
ssl.endpoint.identification.algorithm=

security.protocol=SSL
}}}

Проверяем подключение клиентов по TLS:

{{{#!highlight bash
# --- consumer
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9093 --consumer.config config/consumer.properties

# --- producer
bin/kafka-console-producer.sh --producer.config config/producer.properties --topic quickstart-events  --bootstrap-server localhost:9093
# ...
# передаем сообщение, наблюдаем в consumer
}}}



== REST Proxy ==

{{{#!wiki note
ToDo
}}}