Мониторинг Kafka с помощью Proto Observability

Сбор метрик брокера, топиков, реплик и групп консьюмеров Apache Kafka.

Сбор метрик Kafka

Интеграция kafka собирает метрики Apache Kafka двумя путями:

  • JMX — метрики брокера, топиков, репликации, обработки запросов и сессий ZooKeeper.
  • Kafka Admin API — offset партиций (kafka_broker_offset) и лаг групп консьюмеров (kafka_consumer_offset, kafka_consumer_lag).

Конфигурация Kafka

JMX Remote должен быть включён и доступен с хоста, на котором работает агент. Конкретные флаги зависят от способа запуска брокера.

Передайте JMX-флаги Kafka через KAFKA_JMX_OPTS перед стартом брокера (например, в /etc/default/kafka или unit-файле systemd):

export KAFKA_JMX_PORT=9101
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
  -Dcom.sun.management.jmxremote.port=9101 \
  -Dcom.sun.management.jmxremote.rmi.port=9101 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=<FQDN_или_IP_хоста>"

В манифесте Deployment Kafka пробросьте IP пода в KAFKA_JMX_HOSTNAME через downward API и откройте JMX-порт контейнера:

env:
  - name: POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: KAFKA_JMX_PORT
    value: "9101"
  - name: KAFKA_JMX_HOSTNAME
    value: "$(POD_IP)"
  - name: KAFKA_JMX_OPTS
    value: >-
      -Dcom.sun.management.jmxremote=true
      -Dcom.sun.management.jmxremote.port=9101
      -Dcom.sun.management.jmxremote.rmi.port=9101
      -Dcom.sun.management.jmxremote.authenticate=false
      -Dcom.sun.management.jmxremote.ssl=false
      -Djava.rmi.server.hostname=$(POD_IP)      
ports:
  - name: jmx
    containerPort: 9101

Конфигурация ProtoOBP агента

Если агент запускается в виде службы на хосте

  1. Создайте файл /etc/protoobp-agent/conf.d/kafka.d/conf.yaml:

    init_config:
      is_jmx: true
      collect_default_metrics: true
      new_gc_metrics: true
    
    instances:
      - host: localhost
        port: 9101
        # user: username       # если на JMX включена аутентификация
        # password: password
        monitor_unlisted_consumer_groups: true
        monitor_all_broker_highwatermarks: true
    
  2. Перезапустите агента: systemctl restart protoobp-agent.

Если агент запускается в виде Docker контейнера

Добавьте autodiscovery-лейблы к контейнеру брокера. В docker-compose.yaml:

labels:
  com.protoobp.ad.check_names: '["kafka"]'
  com.protoobp.ad.init_configs: '[{"is_jmx": true, "collect_default_metrics": true, "new_gc_metrics": true}]'
  com.protoobp.ad.instances: '[{"host": "%%host%%", "port": "9101", "monitor_unlisted_consumer_groups": true, "monitor_all_broker_highwatermarks": true}]'

Если агент запускается в Kubernetes

Используйте annotation-based autodiscovery на Pod’е Kafka. Полностью рабочий пример развёртывания:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
        tags.proto.group/service: "my-kafka"   # тег service для метрик
        tags.proto.group/env: "dev"            # тег env
        tags.proto.group/version: "3.0"        # тег version
      annotations:
        ad.proto.group/kafka.checks: |-
          {
            "kafka": {
              "init_config": {
                "is_jmx": true,
                "collect_default_metrics": true,
                "new_gc_metrics": true
              },
              "instances": [
                {
                  "host": "%%host%%",
                  "port": "9101",
                  "monitor_unlisted_consumer_groups": true,
                  "monitor_all_broker_highwatermarks": true
                }
              ]
            }
          }          
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.7.0
          ports:
            - name: broker
              containerPort: 9092
            - name: jmx
              containerPort: 9101
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_JMX_PORT
              value: "9101"
            - name: KAFKA_JMX_HOSTNAME
              value: "$(POD_IP)"
            - name: KAFKA_JMX_OPTS
              value: >-
                -Dcom.sun.management.jmxremote=true
                -Dcom.sun.management.jmxremote.port=9101
                -Dcom.sun.management.jmxremote.rmi.port=9101
                -Dcom.sun.management.jmxremote.authenticate=false
                -Dcom.sun.management.jmxremote.ssl=false
                -Djava.rmi.server.hostname=$(POD_IP)                
            # ... остальные переменные Kafka (KAFKA_PROCESS_ROLES, KAFKA_LISTENERS и т.д.)

Проверка

Убедитесь, что чек запустился и собирает метрики:

docker exec protoobp-agent agent status | grep -A 8 "^    kafka$"

Ожидаемый вывод — status : OK и metric_count > 0 для каждого instance:

kafka
  instance_name : kafka-localhost-9101
  message       : <no value>
  metric_count  : 60
  status        : OK

Полный список найденных JMX-атрибутов:

docker exec protoobp-agent agent jmx list collected

Собираемые метрики

Все метрики Kafka имеют тип gauge — содержат мгновенное значение на момент сбора. Метрики, в имени которых уже стоит суффикс _rate, — это скорость в секунду, рассчитанная самой Kafka и пересланная агентом без дополнительной обработки.

Лейблы

Общие (на JMX-метриках)

Добавляются агентом и ProtoOBP backend’ом:

Лейбл Значение
host Хост, на котором работает агент
instance Имя JMX-экземпляра брокера (например, kafka-localhost-9101)
service Тег service (через com.protoobp.tags.service или POBP_TAGS)
env Тег env
docker_image Полный ref образа контейнера
image_name Имя образа без тега
image_tag Тег образа
short_image Короткое имя образа
git_commit_sha SHA коммита (если задано через label org.opencontainers.image.revision)

JMX-атрибуты MBean

На всех JMX-метриках, кроме kafka_can_connect (см. ниже):

Лейбл Значение
jmx_domain JMX-домен (kafka.server, kafka.network, kafka.controller, …)
type Тип MBean (BrokerTopicMetrics, RequestMetrics, …)
name Имя атрибута MBean (MessagesInPerSec, TotalTimeMs, …)

kafka_can_connect относится к самому JMX-подключению, MBean у неё нет — вместо jmx_domain/type/name она несёт env и jmx_server.

Специфичные для Kafka

Лейбл Где появляется Пример
topic kafka_topic_*, kafka_broker_offset, kafka_consumer_* orders
partition kafka_broker_offset, kafka_consumer_offset, kafka_consumer_lag 0
consumer_group kafka_consumer_offset, kafka_consumer_lag payments-worker
request kafka_request_*_time_* Produce / Fetch
delayedOperation kafka_request_*_purgatory_size Fetch / Produce
clientId kafka_replication_max_lag Replica
listener kafka_server_socket_connection_count PLAINTEXT
networkProcessor kafka_server_socket_connection_count 0

Метрики Admin API

kafka_broker_offset, kafka_consumer_offset и kafka_consumer_lag собираются через Kafka Admin API в обход JMX и несут только узкий набор: host, topic, partition и consumer_group (последний — у *_offset и *_lag). Базовых JMX-меток и enrichment-а контейнера у них нет — учитывайте это при джоинах с другими сериями в PromQL.

Здоровье JMX-подключения

Имя метрики Единица Описание
kafka_can_connect bool 1 — JMX-подключение к брокеру установлено, 0 — не удалось

Метрики уровня брокера

Несут базовый набор JMX-меток (host, instance, jmx_domain, type, name).

Имя метрики Единица В единицу Описание
kafka_messages_in_rate message second Скорость поступления сообщений на брокер
kafka_net_bytes_in_rate byte second Скорость входящего сетевого трафика на брокере
kafka_net_bytes_out_rate byte second Скорость исходящего сетевого трафика с брокера
kafka_net_bytes_rejected_rate byte second Скорость отклонённых байт (не прошедших валидацию)
kafka_log_flush_rate_rate flush second Частота принудительных сбросов журнала на диск
kafka_net_processor_avg_idle_pct_rate fraction Доля простоя сетевых процессоров (0–1). Низкие значения = насыщение сети
kafka_request_handler_avg_idle_pct_rate fraction Доля простоя потоков-обработчиков запросов (0–1). Низкие значения = насыщение CPU
kafka_request_channel_queue_size request Размер очереди входящих запросов в канале брокера
kafka_server_socket_connection_count connection Количество активных сетевых соединений (доп. метки listener, networkProcessor)

Метрики уровня топика

Несут лейбл topic дополнительно к базовому JMX-набору.

Имя метрики Единица В единицу Описание
kafka_topic_messages_in_rate message second Скорость поступления сообщений в топик
kafka_topic_net_bytes_in_rate byte second Скорость входящего трафика в топик
kafka_topic_net_bytes_out_rate byte second Скорость исходящего трафика из топика

Offset и consumer lag (Kafka Admin API)

Несут лейблы host, topic, partition (плюс consumer_group для двух последних) — JMX-атрибуты не применяются.

Имя метрики Единица Описание
kafka_broker_offset offset Текущий log end offset раздела топика на брокере
kafka_consumer_offset offset Последнее зафиксированное смещение группы консьюмеров для раздела
kafka_consumer_lag message Отставание группы от лидера: kafka_broker_offsetkafka_consumer_offset

Метрики репликации

Несут базовый набор JMX-меток.

Имя метрики Единица В единицу Описание
kafka_replication_active_controller_count controller Активных контроллеров на брокере (в здоровом кластере сумма по брокерам = 1)
kafka_replication_leader_count partition Разделов, для которых брокер является лидером
kafka_replication_partition_count partition Всего разделов, обслуживаемых брокером
kafka_replication_offline_partitions_count partition Разделов без активного лидера
kafka_replication_under_replicated_partitions partition Недореплицированных разделов на брокере
kafka_replication_under_min_isr_partition_count partition Разделов, где число реплик в ISR < min.insync.replicas
kafka_replication_max_lag message Максимальное отставание follower-реплики от лидера (доп. метка clientId)
kafka_replication_isr_expands_rate expansion second Частота расширений набора синхронных реплик
kafka_replication_isr_shrinks_rate shrink second Частота сокращений набора синхронных реплик
kafka_replication_leader_elections_rate election second Частота плановых выборов лидеров
kafka_replication_unclean_leader_elections_rate election second Частота «нечистых» выборов лидера — приводят к потере данных

Обработка запросов

Доп. метка request (Produce / FetchConsumer / FetchFollower / Metadata / UpdateMetadata) — на метриках времени; delayedOperation — на purgatory-метриках.

Имя метрики Единица В единицу Описание
kafka_request_produce_time_avg millisecond Среднее время обработки produce-запросов
kafka_request_produce_time_99percentile millisecond 99-й перцентиль produce-запросов
kafka_request_produce_failed_rate request second Неудачных produce-запросов в секунду
kafka_request_producer_request_purgatory_size request Produce-запросов в purgatory (доп. метка delayedOperation)
kafka_request_fetch_consumer_time_avg millisecond Среднее время fetch-запросов от консьюмеров
kafka_request_fetch_consumer_time_99percentile millisecond 99-й перцентиль fetch от консьюмеров
kafka_request_fetch_follower_time_avg millisecond Среднее время fetch от follower-брокеров
kafka_request_fetch_follower_time_99percentile millisecond 99-й перцентиль fetch от follower-брокеров
kafka_request_fetch_failed_rate request second Неудачных fetch-запросов в секунду
kafka_request_fetch_request_purgatory_size request Fetch-запросов в purgatory (доп. метка delayedOperation)
kafka_request_metadata_time_avg millisecond Среднее время metadata-запросов
kafka_request_metadata_time_99percentile millisecond 99-й перцентиль metadata-запросов
kafka_request_update_metadata_time_avg millisecond Среднее время UpdateMetadata-запросов
kafka_request_update_metadata_time_99percentile millisecond 99-й перцентиль UpdateMetadata-запросов

Сессии и ZooKeeper

Несут базовый набор JMX-меток.

Имя метрики Единица В единицу Описание
kafka_session_fetch_count session Открытых fetch-сессий на брокере
kafka_session_fetch_eviction session Вытесненных fetch-сессий
kafka_session_zookeeper_disconnect_rate event second Разрывов сессии с ZooKeeper
kafka_session_zookeeper_expire_rate event second Истечений сессии с ZooKeeper
kafka_session_zookeeper_readonly_rate event second Переходов сессии ZooKeeper в режим read-only
kafka_session_zookeeper_sync_rate event second Синхронизаций сессии с ZooKeeper

Ключевые метрики для дашбордов и алертов

Доступность брокера

  • kafka_can_connect == 0 — алерт: JMX недоступен (агент не получает метрики).
  • kafka_replication_active_controller_countsum() != 1 по кластеру = split brain или нет контроллера.
  • kafka_replication_offline_partitions_count > 0 — алерт: разделы без лидера.
  • kafka_replication_unclean_leader_elections_rate > 0 — алерт: была потеря данных.

Целостность реплик

  • kafka_replication_under_replicated_partitions > 0 — деградация репликации, дашборд per-broker.
  • kafka_replication_under_min_isr_partition_count > 0 — продюсеры с acks=all начинают получать ошибки записи.
  • kafka_replication_max_lag — растёт = follower не успевает за лидером.

Производительность брокера

  • kafka_messages_in_rate, kafka_net_bytes_in_rate, kafka_net_bytes_out_rate — основные графики throughput.
  • kafka_request_produce_time_99percentile, kafka_request_fetch_consumer_time_99percentile — латентность запросов.
  • kafka_net_processor_avg_idle_pct_rate < 0.3 или kafka_request_handler_avg_idle_pct_rate < 0.3 — алерт: насыщение сетевых/обработчиков.
  • kafka_request_channel_queue_size — растёт = бэклог запросов.

Consumer lag

  • kafka_consumer_lag — основной SLI для консьюмеров: разбивка по consumer_group и topic, алерт по абсолютному порогу или росту через окно.
  • kafka_consumer_offset без движения при ненулевом kafka_topic_messages_in_rate = группа замерла.

Топики

  • kafka_topic_messages_in_rate, kafka_topic_net_bytes_in_rate — топ топиков по нагрузке, обнаружение всплесков.