Мониторинг Kafka с помощью Proto Observability
Сбор метрик Kafka
Интеграция kafka собирает метрики Apache Kafka двумя путями:
- JMX — метрики брокера, топиков, репликации, обработки запросов и сессий ZooKeeper.
- Kafka Admin API — offset партиций (
kafka_broker_offset) и лаг групп консьюмеров (kafka_consumer_offset,kafka_consumer_lag).
Образ агента для JMX
Если ProtoOBP Агент работает в контейнере (Docker, Kubernetes), используйте
образ с суффиксом -jmx, например protoobp-agent:7.40.3-jmx. Без этого
суффикса в образе нет JRE и JMX-проверка падает с ошибкой запуска.
При установке агента сервисом на Linux-хост, где уже установлена Java от самой Kafka, можно использовать обычный образ — будет переиспользована Java брокера.
Конфигурация Kafka
JMX Remote должен быть включён и доступен с хоста, на котором работает агент. Конкретные флаги зависят от способа запуска брокера.
Передайте JMX-флаги Kafka через KAFKA_JMX_OPTS перед стартом брокера
(например, в /etc/default/kafka или unit-файле systemd):
export KAFKA_JMX_PORT=9101
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.port=9101 \
-Dcom.sun.management.jmxremote.rmi.port=9101 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=<FQDN_или_IP_хоста>"
В манифесте Deployment Kafka пробросьте IP пода в KAFKA_JMX_HOSTNAME через
downward API и откройте JMX-порт контейнера:
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_JMX_PORT
value: "9101"
- name: KAFKA_JMX_HOSTNAME
value: "$(POD_IP)"
- name: KAFKA_JMX_OPTS
value: >-
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=9101
-Dcom.sun.management.jmxremote.rmi.port=9101
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=$(POD_IP)
ports:
- name: jmx
containerPort: 9101
`java.rmi.server.hostname`
JMX RMI отдаёт клиенту адрес второго сокета через значение
java.rmi.server.hostname. Этот адрес должен резолвиться и быть достижим
с хоста, где запущен агент:
- В Kubernetes —
status.podIPчерез downward API (см. пример выше). - В Docker Compose — имя сервиса (резолвится через embedded DNS).
- На физическом хосте — FQDN или адрес интерфейса, на который смотрит агент.
- Не указывайте
0.0.0.0илиlocalhost— агент получит этот адрес и упадёт сConnection refused.
Конфигурация ProtoOBP агента
Если агент запускается в виде службы на хосте
-
Создайте файл
/etc/protoobp-agent/conf.d/kafka.d/conf.yaml:init_config: is_jmx: true collect_default_metrics: true new_gc_metrics: true instances: - host: localhost port: 9101 # user: username # если на JMX включена аутентификация # password: password monitor_unlisted_consumer_groups: true monitor_all_broker_highwatermarks: true -
Перезапустите агента:
systemctl restart protoobp-agent.
Если агент запускается в виде Docker контейнера
Добавьте autodiscovery-лейблы к контейнеру брокера. В docker-compose.yaml:
labels:
com.protoobp.ad.check_names: '["kafka"]'
com.protoobp.ad.init_configs: '[{"is_jmx": true, "collect_default_metrics": true, "new_gc_metrics": true}]'
com.protoobp.ad.instances: '[{"host": "%%host%%", "port": "9101", "monitor_unlisted_consumer_groups": true, "monitor_all_broker_highwatermarks": true}]'
Если агент запускается в Kubernetes
Используйте annotation-based autodiscovery на Pod’е Kafka. Полностью рабочий пример развёртывания:
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
tags.proto.group/service: "my-kafka" # тег service для метрик
tags.proto.group/env: "dev" # тег env
tags.proto.group/version: "3.0" # тег version
annotations:
ad.proto.group/kafka.checks: |-
{
"kafka": {
"init_config": {
"is_jmx": true,
"collect_default_metrics": true,
"new_gc_metrics": true
},
"instances": [
{
"host": "%%host%%",
"port": "9101",
"monitor_unlisted_consumer_groups": true,
"monitor_all_broker_highwatermarks": true
}
]
}
}
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:7.7.0
ports:
- name: broker
containerPort: 9092
- name: jmx
containerPort: 9101
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_JMX_PORT
value: "9101"
- name: KAFKA_JMX_HOSTNAME
value: "$(POD_IP)"
- name: KAFKA_JMX_OPTS
value: >-
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=9101
-Dcom.sun.management.jmxremote.rmi.port=9101
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=$(POD_IP)
# ... остальные переменные Kafka (KAFKA_PROCESS_ROLES, KAFKA_LISTENERS и т.д.)
Контейнер агента
Контейнер агента должен находиться в той же сети (Docker network / Pod CIDR), что и брокер, а образ агента — иметь суффикс-jmx. Также агенту нужен
смонтированный /var/run/docker.sock:/var/run/docker.sock:ro (Docker) или
ServiceAccount с доступом к Pod’ам (Kubernetes) для autodiscovery.
Проверка
Убедитесь, что чек запустился и собирает метрики:
docker exec protoobp-agent agent status | grep -A 8 "^ kafka$"
Ожидаемый вывод — status : OK и metric_count > 0 для каждого instance:
kafka
instance_name : kafka-localhost-9101
message : <no value>
metric_count : 60
status : OK
Полный список найденных JMX-атрибутов:
docker exec protoobp-agent agent jmx list collected
Собираемые метрики
Все метрики Kafka имеют тип gauge — содержат мгновенное значение на момент
сбора. Метрики, в имени которых уже стоит суффикс _rate, — это скорость в
секунду, рассчитанная самой Kafka и пересланная агентом без дополнительной
обработки.
Лейблы
Общие (на JMX-метриках)
Добавляются агентом и ProtoOBP backend’ом:
| Лейбл | Значение |
|---|---|
host |
Хост, на котором работает агент |
instance |
Имя JMX-экземпляра брокера (например, kafka-localhost-9101) |
service |
Тег service (через com.protoobp.tags.service или POBP_TAGS) |
env |
Тег env |
docker_image |
Полный ref образа контейнера |
image_name |
Имя образа без тега |
image_tag |
Тег образа |
short_image |
Короткое имя образа |
git_commit_sha |
SHA коммита (если задано через label org.opencontainers.image.revision) |
JMX-атрибуты MBean
На всех JMX-метриках, кроме kafka_can_connect (см. ниже):
| Лейбл | Значение |
|---|---|
jmx_domain |
JMX-домен (kafka.server, kafka.network, kafka.controller, …) |
type |
Тип MBean (BrokerTopicMetrics, RequestMetrics, …) |
name |
Имя атрибута MBean (MessagesInPerSec, TotalTimeMs, …) |
kafka_can_connect относится к самому JMX-подключению, MBean у неё нет —
вместо jmx_domain/type/name она несёт env и jmx_server.
Специфичные для Kafka
| Лейбл | Где появляется | Пример |
|---|---|---|
topic |
kafka_topic_*, kafka_broker_offset, kafka_consumer_* |
orders |
partition |
kafka_broker_offset, kafka_consumer_offset, kafka_consumer_lag |
0 |
consumer_group |
kafka_consumer_offset, kafka_consumer_lag |
payments-worker |
request |
kafka_request_*_time_* |
Produce / Fetch |
delayedOperation |
kafka_request_*_purgatory_size |
Fetch / Produce |
clientId |
kafka_replication_max_lag |
Replica |
listener |
kafka_server_socket_connection_count |
PLAINTEXT |
networkProcessor |
kafka_server_socket_connection_count |
0 |
Метрики Admin API
kafka_broker_offset, kafka_consumer_offset и kafka_consumer_lag
собираются через Kafka Admin API в обход JMX и несут только узкий набор:
host, topic, partition и consumer_group (последний — у *_offset и
*_lag). Базовых JMX-меток и enrichment-а контейнера у них нет — учитывайте
это при джоинах с другими сериями в PromQL.
Здоровье JMX-подключения
| Имя метрики | Единица | Описание |
|---|---|---|
kafka_can_connect |
bool | 1 — JMX-подключение к брокеру установлено, 0 — не удалось |
Метрики уровня брокера
Несут базовый набор JMX-меток (host, instance, jmx_domain, type, name).
| Имя метрики | Единица | В единицу | Описание |
|---|---|---|---|
kafka_messages_in_rate |
message | second | Скорость поступления сообщений на брокер |
kafka_net_bytes_in_rate |
byte | second | Скорость входящего сетевого трафика на брокере |
kafka_net_bytes_out_rate |
byte | second | Скорость исходящего сетевого трафика с брокера |
kafka_net_bytes_rejected_rate |
byte | second | Скорость отклонённых байт (не прошедших валидацию) |
kafka_log_flush_rate_rate |
flush | second | Частота принудительных сбросов журнала на диск |
kafka_net_processor_avg_idle_pct_rate |
fraction | Доля простоя сетевых процессоров (0–1). Низкие значения = насыщение сети | |
kafka_request_handler_avg_idle_pct_rate |
fraction | Доля простоя потоков-обработчиков запросов (0–1). Низкие значения = насыщение CPU | |
kafka_request_channel_queue_size |
request | Размер очереди входящих запросов в канале брокера | |
kafka_server_socket_connection_count |
connection | Количество активных сетевых соединений (доп. метки listener, networkProcessor) |
Метрики уровня топика
Несут лейбл topic дополнительно к базовому JMX-набору.
| Имя метрики | Единица | В единицу | Описание |
|---|---|---|---|
kafka_topic_messages_in_rate |
message | second | Скорость поступления сообщений в топик |
kafka_topic_net_bytes_in_rate |
byte | second | Скорость входящего трафика в топик |
kafka_topic_net_bytes_out_rate |
byte | second | Скорость исходящего трафика из топика |
Offset и consumer lag (Kafka Admin API)
Несут лейблы host, topic, partition (плюс consumer_group для двух
последних) — JMX-атрибуты не применяются.
| Имя метрики | Единица | Описание |
|---|---|---|
kafka_broker_offset |
offset | Текущий log end offset раздела топика на брокере |
kafka_consumer_offset |
offset | Последнее зафиксированное смещение группы консьюмеров для раздела |
kafka_consumer_lag |
message | Отставание группы от лидера: kafka_broker_offset − kafka_consumer_offset |
Метрики репликации
Несут базовый набор JMX-меток.
| Имя метрики | Единица | В единицу | Описание |
|---|---|---|---|
kafka_replication_active_controller_count |
controller | Активных контроллеров на брокере (в здоровом кластере сумма по брокерам = 1) | |
kafka_replication_leader_count |
partition | Разделов, для которых брокер является лидером | |
kafka_replication_partition_count |
partition | Всего разделов, обслуживаемых брокером | |
kafka_replication_offline_partitions_count |
partition | Разделов без активного лидера | |
kafka_replication_under_replicated_partitions |
partition | Недореплицированных разделов на брокере | |
kafka_replication_under_min_isr_partition_count |
partition | Разделов, где число реплик в ISR < min.insync.replicas |
|
kafka_replication_max_lag |
message | Максимальное отставание follower-реплики от лидера (доп. метка clientId) |
|
kafka_replication_isr_expands_rate |
expansion | second | Частота расширений набора синхронных реплик |
kafka_replication_isr_shrinks_rate |
shrink | second | Частота сокращений набора синхронных реплик |
kafka_replication_leader_elections_rate |
election | second | Частота плановых выборов лидеров |
kafka_replication_unclean_leader_elections_rate |
election | second | Частота «нечистых» выборов лидера — приводят к потере данных |
Обработка запросов
Доп. метка request (Produce / FetchConsumer / FetchFollower /
Metadata / UpdateMetadata) — на метриках времени; delayedOperation — на
purgatory-метриках.
| Имя метрики | Единица | В единицу | Описание |
|---|---|---|---|
kafka_request_produce_time_avg |
millisecond | Среднее время обработки produce-запросов | |
kafka_request_produce_time_99percentile |
millisecond | 99-й перцентиль produce-запросов | |
kafka_request_produce_failed_rate |
request | second | Неудачных produce-запросов в секунду |
kafka_request_producer_request_purgatory_size |
request | Produce-запросов в purgatory (доп. метка delayedOperation) |
|
kafka_request_fetch_consumer_time_avg |
millisecond | Среднее время fetch-запросов от консьюмеров | |
kafka_request_fetch_consumer_time_99percentile |
millisecond | 99-й перцентиль fetch от консьюмеров | |
kafka_request_fetch_follower_time_avg |
millisecond | Среднее время fetch от follower-брокеров | |
kafka_request_fetch_follower_time_99percentile |
millisecond | 99-й перцентиль fetch от follower-брокеров | |
kafka_request_fetch_failed_rate |
request | second | Неудачных fetch-запросов в секунду |
kafka_request_fetch_request_purgatory_size |
request | Fetch-запросов в purgatory (доп. метка delayedOperation) |
|
kafka_request_metadata_time_avg |
millisecond | Среднее время metadata-запросов | |
kafka_request_metadata_time_99percentile |
millisecond | 99-й перцентиль metadata-запросов | |
kafka_request_update_metadata_time_avg |
millisecond | Среднее время UpdateMetadata-запросов | |
kafka_request_update_metadata_time_99percentile |
millisecond | 99-й перцентиль UpdateMetadata-запросов |
Сессии и ZooKeeper
Несут базовый набор JMX-меток.
| Имя метрики | Единица | В единицу | Описание |
|---|---|---|---|
kafka_session_fetch_count |
session | Открытых fetch-сессий на брокере | |
kafka_session_fetch_eviction |
session | Вытесненных fetch-сессий | |
kafka_session_zookeeper_disconnect_rate |
event | second | Разрывов сессии с ZooKeeper |
kafka_session_zookeeper_expire_rate |
event | second | Истечений сессии с ZooKeeper |
kafka_session_zookeeper_readonly_rate |
event | second | Переходов сессии ZooKeeper в режим read-only |
kafka_session_zookeeper_sync_rate |
event | second | Синхронизаций сессии с ZooKeeper |
Ключевые метрики для дашбордов и алертов
Доступность брокера
kafka_can_connect == 0— алерт: JMX недоступен (агент не получает метрики).kafka_replication_active_controller_count—sum() != 1по кластеру = split brain или нет контроллера.kafka_replication_offline_partitions_count > 0— алерт: разделы без лидера.kafka_replication_unclean_leader_elections_rate > 0— алерт: была потеря данных.
Целостность реплик
kafka_replication_under_replicated_partitions > 0— деградация репликации, дашборд per-broker.kafka_replication_under_min_isr_partition_count > 0— продюсеры сacks=allначинают получать ошибки записи.kafka_replication_max_lag— растёт = follower не успевает за лидером.
Производительность брокера
kafka_messages_in_rate,kafka_net_bytes_in_rate,kafka_net_bytes_out_rate— основные графики throughput.kafka_request_produce_time_99percentile,kafka_request_fetch_consumer_time_99percentile— латентность запросов.kafka_net_processor_avg_idle_pct_rate < 0.3илиkafka_request_handler_avg_idle_pct_rate < 0.3— алерт: насыщение сетевых/обработчиков.kafka_request_channel_queue_size— растёт = бэклог запросов.
Consumer lag
kafka_consumer_lag— основной SLI для консьюмеров: разбивка поconsumer_groupиtopic, алерт по абсолютному порогу или росту через окно.kafka_consumer_offsetбез движения при ненулевомkafka_topic_messages_in_rate= группа замерла.
Топики
kafka_topic_messages_in_rate,kafka_topic_net_bytes_in_rate— топ топиков по нагрузке, обнаружение всплесков.