Learn about the Wavefront Kafka Integration.

Kafka Integration

Kafka is a distributed streaming platform. By setting up this integration, you can send Kafka metrics into Wavefront.

  1. Apache Kafka: This explains the installation and configuration of Telegraf to send Kafka metrics into Wavefront. Telegraf is a lightweight server process capable of collecting, processing, aggregating, and sending metrics to a Wavefront proxy.

  2. Kafka on Kubernetes: This explains the configuration of Wavefront Collector for Kubernetes to scrape Kafka metrics using auto-discovery.

In addition to setting up the metrics flow, this integration also installs dashboards:

  • Apache Kafka
  • Kafka on Kubernetes

Here’s a screenshot of the Kafka on Kubernetes dashboard displaying Kafka metrics:

[Screenshot: Kafka on Kubernetes dashboard (images/kafka_k8s_dashboard.png)]

To see the list of metrics for this integration, see the Metrics section below. For details about the underlying Telegraf input plugins, see https://github.com/influxdata/telegraf/tree/master/plugins/inputs.

Kafka Setup

Use the instructions on this page for monitoring:

  • Apache Kafka - Standalone
  • Kafka on Kubernetes

Apache Kafka

Step 1. Install the Telegraf Agent

This integration uses the Jolokia input plugin for Telegraf to get the Kafka metrics via JMX. If you’ve already installed Telegraf on your servers, you can skip to Step 2.

Log in to your Wavefront instance and follow the instructions in the Setup tab to install Telegraf and a Wavefront proxy in your environment. If a proxy is already running in your environment, you can select that proxy and the Telegraf install command connects with that proxy. If you do not yet have a Wavefront instance, sign up for a free trial.
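If you are not sure whether Telegraf is already installed on a server, a quick check such as the following (a sketch, assuming the standard package install that registers a telegraf service) confirms it:

  # Confirm the Telegraf agent is installed and running
  telegraf --version
  sudo service telegraf status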

Step 2. Download and Set up Jolokia

Jolokia is a JVM agent that exposes JMX data as JSON on an HTTP port (8778 by default).

  1. Download the latest version of the Jolokia JVM agent from the Jolokia download page.
  2. Rename the downloaded JAR file to jolokia-agent.jar.
  3. Save jolokia-agent.jar on your Kafka server in /opt/kafka/libs or any location accessible to Kafka.
  4. Configure Kafka to use Jolokia:
    1. Add the following snippet to kafka-server-start.sh:
       export JMX_PORT=9999
       export RMI_HOSTNAME=KAFKA_SERVER_IP_ADDRESS
       export KAFKA_JMX_OPTS="-javaagent:/opt/kafka/libs/jolokia-agent.jar=port=8778,host=$RMI_HOSTNAME -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=$RMI_HOSTNAME -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
      
    2. Restart the Kafka service.
    3. Verify that you can access Jolokia on port 8778 by running:
       curl http://KAFKA_SERVER_IP_ADDRESS:8778/jolokia/version
      

    Jolokia is working if you receive a non-empty JSON response containing the Jolokia agent version details.
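You can also read an individual Kafka MBean through Jolokia's read endpoint. For example, the following request (a sketch using one of the MBeans collected in Step 3) returns the UnderReplicatedPartitions attribute as JSON:

  # Read a single Kafka MBean attribute through the Jolokia HTTP endpoint
  curl http://KAFKA_SERVER_IP_ADDRESS:8778/jolokia/read/kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions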

Step 3. Configure Jolokia Input Plugin

Create a file called jolokia-kafka.conf in /etc/telegraf/telegraf.d and enter the following:

## Read JMX metrics through Jolokia
 [[inputs.jolokia2_agent]]
   ## An array of Kafka servers URI to gather stats.
   urls = ["http://KAFKA_SERVER_IP_ADDRESS:8778/jolokia"]
   name_prefix = "kafka."

   ## List of metrics collected on above servers
   ## Each metric consists of a name, a jmx path and
   ## optionally, a list of fields to collect.
   ## This collects all heap memory usage metrics.
   [[inputs.jolokia2_agent.metric]]
     name = "heap_memory_usage"
     mbean  = "java.lang:type=Memory"
     paths = ["HeapMemoryUsage"]

   ## This collects thread counts metrics.
   [[inputs.jolokia2_agent.metric]]
     name = "thread_count"
     mbean  = "java.lang:type=Threading"
     paths = ["TotalStartedThreadCount","ThreadCount","DaemonThreadCount","PeakThreadCount"]

   ## This collects garbage collection metrics.
   [[inputs.jolokia2_agent.metric]]
     name = "garbage_collector"
     mbean  = "java.lang:type=GarbageCollector,name=*"
     paths = ["CollectionCount","CollectionTime"]
     tag_keys = ["name"]

   # Kafka Server Broker Topic Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "server_brokertopics_messagesinpersec"
     mbean  = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
   [[inputs.jolokia2_agent.metric]]
     name = "server_brokertopics_bytesinpersec"
     mbean  = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"
   [[inputs.jolokia2_agent.metric]]
     name = "server_brokertopics_bytesoutpersec"
     mbean  = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec"

   # Kafka Server Request Handler Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "server_requesthandler_avgidlepct"
     mbean  = "kafka.server:name=RequestHandlerAvgIdlePercent,type=KafkaRequestHandlerPool"

   # Kafka Server Delayed Operation Purgatory Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "server_delayedoperationpugatory_fetch"
     mbean  = "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch"
   [[inputs.jolokia2_agent.metric]]
     name = "server_delayedoperationpugatory_produce"
     mbean  = "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce"

   # Kafka Server Replica Fetcher Manager Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicafetchmanager.maxlag"
     mbean  = "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica"

   # Kafka Server Replica Manager Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicamanager_underreplicated"
     mbean  = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicamanager_partitioncount"
     mbean  = "kafka.server:type=ReplicaManager,name=PartitionCount"
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicamanager_leadercount"
     mbean  = "kafka.server:type=ReplicaManager,name=LeaderCount"
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicamanager_isrshrinkspersec"
     mbean  = "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec"
   [[inputs.jolokia2_agent.metric]]
     name = "server_replicamanager_isrexpandspersec"
     mbean  = "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec"

   # Kafka Network Request Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_requests_fetch_consumer"
     mbean  = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer"
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_requests_fetch_follower"
     mbean  = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower"
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_requests_produce"
     mbean  = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce"
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_totaltime_fetch_consumer"
     mbean  = "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer"
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_totaltime_fetch_follower"
     mbean  = "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower"
   [[inputs.jolokia2_agent.metric]]
     name = "network_requestmetrics_totaltime_produce"
     mbean  = "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce"

   # Kafka Network Processor Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "network_processor_avgidlepct"
     mbean  = "kafka.network:name=NetworkProcessorAvgIdlePercent,type=SocketServer"

   # Kafka Controller Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "controller_activecontrollers"
     mbean  = "kafka.controller:type=KafkaController,name=ActiveControllerCount"
   [[inputs.jolokia2_agent.metric]]
     name = "controller_offlinepartitions"
     mbean  = "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"
   [[inputs.jolokia2_agent.metric]]
     name = "controller_stats_leaderelectionrateandtime"
     mbean  = "kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs"
   [[inputs.jolokia2_agent.metric]]
     name = "controller_stats_uncleanleaderelections"
     mbean  = "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"

   # Zookeeper Metrics
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_disconnects"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_sync_connects"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_auth_failures"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_readonly_connects"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperReadOnlyConnectsPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_authentications"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperSaslAuthenticationsPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]
   [[inputs.jolokia2_agent.metric]]
     name = "zookeeper_expires"
     mbean  = "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"
     paths = ["Count","OneMinuteRate","FiveMinuteRate","FifteenMinuteRate","MeanRate"]

Note: Replace KAFKA_SERVER_IP_ADDRESS with the Kafka server IP address.

Step 4. Restart Telegraf

Run sudo service telegraf restart to restart your agent.
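Optionally, you can confirm that Telegraf parses the new configuration and can reach Jolokia by running it once in test mode (a sketch using standard Telegraf CLI flags):

  # Gather metrics once and print them to stdout without sending them to the proxy
  telegraf --test --config /etc/telegraf/telegraf.conf \
    --config-directory /etc/telegraf/telegraf.d \
    --input-filter jolokia2_agent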

Kafka on Kubernetes

Note: These instructions are for monitoring Bitnami Kafka.

Prerequisite:

Make sure that Bitnami Kafka, with bitnami/kafka-exporter and bitnami/jmx-exporter, is deployed on your cluster.

You can use the following commands to deploy Bitnami Kafka with kafka-exporter and jmx-exporter:

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install <KAFKA_CLUSTER_NAME> bitnami/kafka \
  --set metrics.kafka.enabled=true \
  --set metrics.kafka.image.registry=docker.io \
  --set metrics.kafka.image.repository=bitnami/kafka-exporter \
  --set metrics.kafka.image.tag=1.3.1-debian-10-r64 \
  --set metrics.kafka.image.pullPolicy=IfNotPresent \
  --set metrics.jmx.enabled=true \
  --set metrics.jmx.image.registry=docker.io \
  --set metrics.jmx.image.repository=bitnami/jmx-exporter \
  --set metrics.jmx.image.tag=0.16.1-debian-10-r17 \
  --set metrics.jmx.image.pullPolicy=IfNotPresent
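After the chart is installed, you can verify that the broker and exporter pods are running and that the metrics services were created. A minimal check, assuming the Bitnami chart's standard app.kubernetes.io/instance label and service naming:

  # List the Kafka broker and exporter pods for this release
  kubectl get pods -l app.kubernetes.io/instance=<KAFKA_CLUSTER_NAME>

  # List the JMX and Kafka exporter metrics services
  kubectl get svc | grep <KAFKA_CLUSTER_NAME>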

Configure the Wavefront Collector for Kubernetes

You can configure the Wavefront Collector for Kubernetes to scrape Kafka metrics by using annotation-based discovery.

If you do not have the Wavefront Collector for Kubernetes installed on your Kubernetes cluster, follow the Collector setup instructions to add it to your cluster by using Helm or by performing a manual installation. If you are already monitoring the Kubernetes cluster, you can check the status of the Wavefront Collector and proxy on the Setup tab of the Kubernetes integration.
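If you are unsure whether the collector and proxy are already running, a rough check such as the following lists their pods (the namespace and pod names depend on how the collector was installed):

  # Look for Wavefront Collector and proxy pods across all namespaces
  kubectl get pods --all-namespaces | grep -i wavefront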

Annotation-Based Discovery: By default, both the JMX exporter and Kafka exporter services are annotated with the Prometheus scrape and port annotations.

  • Annotate the jmx-metrics service to add the metrics path and the kafkajmx. prefix:
    kubectl annotate service <KAFKA_CLUSTER_NAME>-jmx-metrics prometheus.io/path=/metrics prometheus.io/prefix=kafkajmx.
    

NOTE: Make sure that auto-discovery (enableDiscovery: true) and annotation-based discovery (discovery.disable_annotation_discovery: false) are enabled in the Wavefront Collector configuration. Both are enabled by default.
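To confirm that the annotations were applied to the jmx-metrics service, you can inspect the service metadata, for example:

  # Show the Prometheus scrape annotations on the JMX metrics service
  kubectl get service <KAFKA_CLUSTER_NAME>-jmx-metrics -o jsonpath='{.metadata.annotations}'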

Metrics

Metric Name Description
kafka.controller.activecontrollers.Value  
kafka.controller.offlinepartitions.Value  
kafka.controller.stats.leaderelectionrateandtime.* Statistics: 50thPercentile, 75thPercentile, 95thPercentile, 98thPercentile, 999thPercentile, 99thPercentile, Count, FifteenMinuteRate, FiveMinuteRate, Max, Mean, MeanRate, Min, OneMinuteRate, StdDev
kafka.controller.stats.uncleanleaderelections.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.garbage.collector.CollectionCount  
kafka.garbage.collector.CollectionTime  
kafka.heap.memory.usage.HeapMemoryUsage.* Statistics: committed, init, max, used
kafka.network.processor.avgidlepct.Value  
kafka.network.requestmetrics.totaltime.fetch.consumer.* Statistics: 50thPercentile, 75thPercentile, 95thPercentile, 98thPercentile, 999thPercentile, 99thPercentile, Count, Max, Mean, Min, StdDev
kafka.network.requestmetrics.totaltime.fetch.follower.* Statistics: 50thPercentile, 75thPercentile, 95thPercentile, 98thPercentile, 999thPercentile, 99thPercentile, Count, Max, Mean, Min, StdDev
kafka.network.requestmetrics.totaltime.produce.* Statistics: 50thPercentile, 75thPercentile, 95thPercentile, 98thPercentile, 999thPercentile, 99thPercentile, Count, Max, Mean, Min, StdDev
kafka.server.brokertopics.bytesinpersec.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.server.brokertopics.bytesoutpersec.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.server.brokertopics.messagesinpersec.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.server.delayedoperationpugatory.fetch.Value  
kafka.server.delayedoperationpugatory.produce.Value  
kafka.server.replicafetchmanager.maxlag.Value  
kafka.server.replicamanager.isrexpandspersec.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.server.replicamanager.isrshrinkspersec.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.server.replicamanager.leadercount.Value  
kafka.server.replicamanager.partitioncount.Value  
kafka.server.replicamanager.underreplicated.Value  
kafka.server.requesthandler.avgidlepct.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.thread.count.DaemonThreadCount  
kafka.thread.count.PeakThreadCount  
kafka.thread.count.ThreadCount  
kafka.thread.count.TotalStartedThreadCount  
kafka.zookeeper.auth.failures.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.zookeeper.authentications.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.zookeeper.disconnects.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.zookeeper.expires.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.zookeeper.readonly.connects.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate
kafka.zookeeper.sync.connects.* Statistics: Count, FifteenMinuteRate, FiveMinuteRate, MeanRate, OneMinuteRate