Kafka provides several command-line interface (CLI) utilities that are useful for making administrative changes to your clusters. The tools are implemented in Java classes, and a set of scripts are provided to call those classes properly. These tools provide basic functions, but you may find they are lacking for more complex operations. This chapter will describe the tools that are available as part of the Apache Kafka open source project. More information about advanced tools that have been developed in the community, outside of the core project, can be found on the Apache Kafka website.
While Apache Kafka implements authentication and authorization to control topic operations, authorization for most cluster operations is not yet supported. This means that these CLI tools can be used without any authentication, which allows operations such as topic changes to be executed with no security check or audit. This functionality is under development and should be added soon.
The `kafka-topics.sh` tool provides easy access to most topic operations (configuration changes have been deprecated and moved to the `kafka-configs.sh` tool). It allows you to create, modify, delete, and list information about topics in the cluster. To use this command, you are required to provide the Zookeeper connect string for the cluster with the `--zookeeper` argument. In the examples that follow, the Zookeeper connect string is assumed to be `zoo1.example.com:2181/kafka-cluster`.
Many of the command-line tools for Kafka operate directly on the metadata stored in Zookeeper rather than connecting to the brokers themselves. For this reason, it is important to make sure the version of the tools that you are using matches the version of the brokers in the cluster. The safest approach is to run the tools on the Kafka brokers themselves, using the deployed version.
You need three arguments to create a new topic in a cluster (these arguments must be provided, even though some of them have broker-level defaults configured already):
The name of the topic that you wish to create.
The number of replicas of the topic to maintain within the cluster.
The number of partitions to create for the topic.
It is also possible to explicitly set the replicas for a topic during creation, or to set configuration overrides for the topic. Neither of these operations will be covered here. Configuration overrides are discussed later in this chapter, and they can be provided to `kafka-topics.sh` using the `--config` command-line parameter. Partition reassignment is also covered later in this chapter.
Topic names may contain alphanumeric characters, as well as underscores, dashes, and periods.
It is permitted, but not recommended, to have topic names that start with two underscores. Topics of this form are considered internal topics for the cluster (such as the `__consumer_offsets` topic for consumer group offset storage). It is also not recommended to use both periods and underscores in a single cluster, because when topic names are used in metric names inside Kafka, periods are changed to underscores (e.g., “topic.1” becomes “topic_1” in metrics).
Execute `kafka-topics.sh` as follows:
# kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>
#
The command will cause the cluster to create a topic with the specified name and number of partitions. For each partition, the cluster will select the specified number of replicas appropriately. This means that if the cluster is set up for rack-aware replica assignment, the replicas for each partition will be in separate racks. If rack-aware assignment is not desired, specify the `--disable-rack-aware` command-line argument.
For example, create a topic named “my-topic” with eight partitions that have two replicas each:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create --topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
#
When using this script in automation, you may want to use the `--if-not-exists` argument, which will not return an error if the topic already exists.
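Putting this together with the creation arguments, an idempotent create suitable for scripts might look like the following sketch (the cluster and topic names reuse the earlier examples):

```shell
# Create the topic only if it does not already exist; --if-not-exists
# makes the command succeed quietly when my-topic is already present.
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --create --topic my-topic --replication-factor 2 --partitions 8 \
    --if-not-exists
```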
It is sometimes necessary to increase the number of partitions for a topic. Partitions are the way topics are scaled and replicated across a cluster, and the most common reason to increase the partition count is to spread a topic out further, or to decrease the throughput for a single partition. The partition count may also be increased if a consumer group needs to run more copies of a consumer, because a single partition can only be consumed by one member of a group.
Topics that are produced with keyed messages can be very difficult to add partitions to from a consumer’s point of view. This is because the mapping of keys to partitions will change when the number of partitions is changed. For this reason, it is advisable to set the number of partitions for a topic that will contain keyed messages once, when the topic is created, and avoid resizing the topic.
While an `--if-exists` argument is provided for the `--alter` command, using it is not recommended. Using this argument will cause the command to not return an error if the topic being changed does not exist. This can mask problems where a topic that should have been created does not exist.
For example, increase the number of partitions for a topic named “my-topic” to 16:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
#
It is not possible to reduce the number of partitions for a topic. The reason this is not supported is because deleting a partition from a topic would cause part of the data in that topic to be deleted as well, which would be inconsistent from a client point of view. In addition, trying to redistribute the data to remaining partitions would be difficult and result in out-of-order messages. Should you need to reduce the number of partitions, you will need to delete the topic and recreate it.
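If a smaller partition count is truly required, the delete-and-recreate path can be sketched as follows (this assumes `delete.topic.enable=true` on the brokers, as covered in the next section, and that losing the topic's existing data is acceptable):

```shell
# WARNING: deleting the topic destroys all of its messages.
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --delete --topic my-topic

# Deletion is asynchronous; wait for it to complete, then recreate
# the topic with the smaller partition count.
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --create --topic my-topic --replication-factor 2 --partitions 4
```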
Even a topic with no messages in it uses cluster resources, including disk space, open filehandles, and memory. If a topic is no longer needed, it can be deleted in order to free up these resources. In order to perform this action, the brokers in the cluster must have been configured with the `delete.topic.enable` option set to `true`. If this option has been set to `false`, then the request to delete the topic will be ignored.
Deleting a topic will also delete all of its messages. This is not a reversible operation, so execute it carefully.
For example, delete the topic named “my-topic”:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
#
Topic deletion is an asynchronous operation. This means that the preceding command will mark a topic for deletion, but the deletion will not happen immediately. The controller will notify the brokers of the pending deletion as soon as possible (after existing controller tasks complete), and the brokers will then invalidate the metadata for the topic and delete the files from disk. It is highly recommended that operators not delete more than one or two topics at a time, and give those ample time to complete before deleting other topics, due to limitations in the way the controller executes these operations.
The topics tool can list all topics in a cluster. The list is formatted with one topic per line, in no particular order.
For example, list topics in the cluster:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list
my-topic - marked for deletion
other-topic
#
It is also possible to get detailed information on one or more topics in the cluster. The output includes the partition count, topic configuration overrides, and a listing of each partition with its replica assignments. This can be limited to a single topic by providing a `--topic` argument to the command.
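For instance, a description limited to a single topic is just the `--describe` command with `--topic` added (a sketch against the example cluster):

```shell
# Show partition count, overrides, and replica assignments for my-topic only.
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --describe --topic my-topic
```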
For example, describe all topics in the cluster:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic    PartitionCount:8    ReplicationFactor:2    Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1
#
The `describe` command also has several useful options for filtering the output. These can be helpful for diagnosing cluster issues. For each of these, do not specify the `--topic` argument (because the intention is to find all topics or partitions in a cluster that match the criteria). These options will not work with the `--list` command (detailed in the previous section).
In order to find all topics that have configuration overrides, use the `--topics-with-overrides` argument. This will describe only the topics that have configurations that differ from the cluster defaults.

There are two filters used to find partitions that have problems. The `--under-replicated-partitions` argument will show all partitions where one or more of the replicas for the partition are not in-sync with the leader. The `--unavailable-partitions` argument shows all partitions without a leader. This is a more serious situation that means that the partition is currently offline and unavailable for producer or consumer clients.
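Sketches of the other two filters, using the example cluster:

```shell
# Describe only topics whose configuration differs from the cluster defaults.
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --describe --topics-with-overrides

# Show partitions that currently have no leader (offline partitions).
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --describe --unavailable-partitions
```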
For example, show under-replicated partitions:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --under-replicated-partitions
Topic: other-topic Partition: 2 Leader: 0 Replicas: 1,0
Isr: 0
Topic: other-topic Partition: 4 Leader: 0 Replicas: 1,0
Isr: 0
#
Consumer groups in Kafka are managed in two places: for older consumers, the information is maintained in Zookeeper, whereas for the new consumer it is maintained within the Kafka brokers. The `kafka-consumer-groups.sh` tool can be used to list and describe both types of groups. It can also be used to delete consumer groups and offset information, but only for groups running under the old consumer (maintained in Zookeeper). When working with older consumer groups, you will access the Kafka cluster by specifying the `--zookeeper` command-line parameter for the tool. For new consumer groups, you will instead need to use the `--bootstrap-server` parameter with the hostname and port number of the Kafka broker to connect to.
To list consumer groups using the older consumer clients, execute with the `--zookeeper` and `--list` parameters. For the new consumer, use the `--bootstrap-server`, `--list`, and `--new-consumer` parameters.
For example, list old consumer groups:
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
#
For example, list new consumer groups:
# kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka1.example.com:9092 --list
kafka-python-test
my-new-consumer
#
For any group listed, you can get more details by changing the `--list` parameter to `--describe` and adding the `--group` parameter. This will list all the topics that the group is consuming, as well as the offsets for each topic partition.
For example, get consumer group details for the old consumer group named “testgroup”:
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --group testgroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
myconsumer my-topic 0 1688 1688 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 1 1418 1418 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 2 1314 1315 1 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 3 2012 2012 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 4 1089 1089 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 5 1429 1432 3 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 6 1634 1634 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 7 2261 2261 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0
#
Table 9-1 shows the fields provided in the output.
Field | Description |
---|---|
GROUP | The name of the consumer group. |
TOPIC | The name of the topic being consumed. |
PARTITION | The ID number of the partition being consumed. |
CURRENT-OFFSET | The next offset to be consumed by the consumer group for this topic partition. This is the position of the consumer within the partition. |
LOG-END-OFFSET | The current high-water mark offset from the broker for the topic partition. This is the offset of the next message to be produced to this partition. |
LAG | The difference between the consumer CURRENT-OFFSET and the broker LOG-END-OFFSET for this topic partition. |
OWNER | The member of the consumer group that is currently consuming this topic partition. This is an arbitrary string provided by the consumer client and does not necessarily contain the hostname of the consumer. |
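The same description is available for new consumer groups by swapping the connection arguments; a sketch, using one of the group names from the earlier listing:

```shell
# Describe a group whose offsets are committed to Kafka rather than Zookeeper.
kafka-consumer-groups.sh --new-consumer \
    --bootstrap-server kafka1.example.com:9092 \
    --describe --group my-new-consumer
```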
Deletion of consumer groups is only supported for old consumer clients. This will remove the entire group from Zookeeper, including all stored offsets for all topics that the group is consuming. In order to perform this action, all consumers in the group should be shut down first. If this step is not performed, the consumer may behave unpredictably, as the Zookeeper metadata for the group will be removed while the group is still using it.
For example, delete the consumer group named “testgroup”:
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
#
It is also possible to use the same command to delete offsets for a single topic that the group is consuming without deleting the entire group. Again, it is recommended that the consumer group be stopped, or configured to not consume the topic to be deleted, before performing this action.
For example, delete the offsets for the topic “my-topic” from the consumer group named “testgroup”:
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup --topic my-topic
Deleted consumer group information for group testgroup topic
my-topic in zookeeper.
#
In addition to displaying and deleting the offsets for a consumer group using the old consumer client, it is also possible to retrieve the offsets and store new offsets in a batch. This is useful for resetting the offsets for a consumer when there is a problem that requires messages to be reread, or for advancing offsets past a message that the consumer is having a problem with (e.g., if there is a badly formatted message that the consumer cannot handle).
There is currently no tool available to manage the offsets for a consumer client that is committing offsets to Kafka. This function is only available for consumers that are committing offsets to Zookeeper. In order to manage offsets for a group that is committing to Kafka, you must use the APIs available in the client to commit offsets for the group.
There is no named script to export offsets, but the `kafka-run-class.sh` script can be used to execute the underlying Java class for the tool in the proper environment. Exporting offsets will produce a file that contains each topic partition for the group and its offsets in a defined format that the import tool can read. The file that is created will have one topic partition per line, with the following format:
/consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID:OFFSET
For example, export the offsets for the consumer group named “testgroup” to a file named offsets:
# kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup --output-file offsets
#
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
#
The import offset tool is the opposite of exporting. It takes the file produced by exporting offsets in the previous section and uses it to set the current offsets for the consumer group. A common practice is to export the current offsets for the consumer group, make a copy of the file (so that you preserve a backup), and edit the copy to replace the offsets with the desired values. Note that for the import command, the `--group` option is not used, because the consumer group name is embedded in the file to be imported.
Before performing this step, it is important that all consumers in the group are stopped. They will not read the new offsets if they are written while the consumer group is active. The consumers will just overwrite the imported offsets.
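The backup-and-edit step between exporting and importing can be scripted. The sketch below assumes the export produced a local file named offsets (as in the earlier example) and that the goal is to rewind every partition of my-topic to offset 0; the sed expression is one hypothetical way to rewrite the offset field:

```shell
# Keep an untouched copy of the exported offsets for rollback.
cp offsets offsets.bak

# Rewrite the offset after the final colon to 0 for every my-topic
# partition (sed -i.orig also leaves the pre-edit file as offsets.orig).
sed -i.orig 's#^\(/consumers/testgroup/offsets/my-topic/[0-9]*\):.*#\1:0#' offsets
```

The edited offsets file is then handed to the import command shown next.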
For example, import the offsets for the consumer group named “testgroup” from a file named offsets:
# kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --input-file offsets
#
Configurations can be overridden while the cluster is running for topics and for client quotas. The intention is to add more dynamic configurations in the future, which is why these changes have been put in a separate CLI tool, `kafka-configs.sh`. This allows you to set configurations for specific topics and client IDs. Once set, these configurations are permanent for the cluster. They are stored in Zookeeper, and they are read by each broker when it starts. In tools and documentation, dynamic configurations like this are referred to as per-topic or per-client configurations, as well as overrides.

As with the previous tools, you are required to provide the Zookeeper connect string for the cluster with the `--zookeeper` argument. In the examples that follow, the Zookeeper connect string is assumed to be `zoo1.example.com:2181/kafka-cluster`.
There are many configurations that apply to topics that can be changed for individual topics in order to accommodate different use cases within a single cluster. Most of these configurations have a default specified in the broker configuration, which will apply unless an override is set.
The format of the command to change a topic configuration is:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
Table 9-2 shows the valid configurations (keys) for topics.
Configuration Key | Description |
---|---|
`cleanup.policy` | If set to `compact`, the messages in this topic will be log compacted, retaining only the most recent message with a given key. |
`compression.type` | The compression type used by the broker when writing message batches for this topic to disk. Current values are `uncompressed`, `gzip`, `snappy`, and `lz4`. |
`delete.retention.ms` | How long, in milliseconds, deleted tombstones will be retained for this topic. Only valid for log compacted topics. |
`file.delete.delay.ms` | How long, in milliseconds, to wait before deleting log segments and indices for this topic from disk. |
`flush.messages` | How many messages are received before forcing a flush of this topic’s messages to disk. |
`flush.ms` | How long, in milliseconds, before forcing a flush of this topic’s messages to disk. |
`index.interval.bytes` | How many bytes of messages can be produced between entries in the log segment’s index. |
`max.message.bytes` | The maximum size of a single message for this topic, in bytes. |
`message.format.version` | The message format version that the broker will use when writing messages to disk. Must be a valid API version number (e.g., “0.10.0”). |
`message.timestamp.difference.max.ms` | The maximum allowed difference, in milliseconds, between the message timestamp and the broker timestamp when the message is received. This is only valid if the `message.timestamp.type` is set to `CreateTime`. |
`message.timestamp.type` | Which timestamp to use when writing messages to disk. Current values are `CreateTime` for the timestamp specified by the client and `LogAppendTime` for the time at which the message is written to the partition by the broker. |
`min.cleanable.dirty.ratio` | How frequently the log compactor will attempt to compact partitions for this topic, expressed as a ratio of the number of uncompacted log segments to the total number of log segments. Only valid for log compacted topics. |
`min.insync.replicas` | The minimum number of replicas that must be in-sync for a partition of the topic to be considered available. |
`preallocate` | If set to `true`, log segments for this topic should be preallocated when a new segment is rolled. |
`retention.bytes` | The amount of messages, in bytes, to retain for this topic. |
`retention.ms` | How long messages should be retained for this topic, in milliseconds. |
`segment.bytes` | The amount of messages, in bytes, that should be written to a single log segment in a partition. |
`segment.index.bytes` | The maximum size, in bytes, of a single log segment index. |
`segment.jitter.ms` | A maximum number of milliseconds that is randomized and added to `segment.ms` when rolling log segments. |
`segment.ms` | How frequently, in milliseconds, the log segment for each partition should be rotated. |
`unclean.leader.election.enable` | If set to `false`, unclean leader elections will not be permitted for this topic. |
For example, set the retention for the topic named “my-topic” to 1 hour (3,600,000 ms):
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000
Updated config for topic: "my-topic".
#
For Kafka clients, the only configurations that can be overridden are the producer and consumer quotas. These are both a rate, in bytes per second, that all clients with the specified client ID are allowed to either produce or consume on a per-broker basis. This means that if you have five brokers in a cluster, and you specify a producer quota of 10 MB/sec for a client, that client will be allowed to produce 10 MB/sec on each broker at the same time for a total of 50 MB/sec.
The client ID is not necessarily the same as the consumer group name. Consumers can set their own client ID, and you may have many consumers that are in different groups that specify the same client ID. It is considered a best practice to set the client ID for each consumer group to something unique that identifies that group. This allows a single consumer group to share a quota, and it makes it easier to identify in logs what group is responsible for requests.
The format of the command to change a client configuration is:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
Table 9-3 shows the configurations (keys) for clients.
Configuration Key | Description |
---|---|
`producer_byte_rate` | The amount of messages, in bytes, that a single client ID is allowed to produce to a single broker in one second. |
`consumer_byte_rate` | The amount of messages, in bytes, that a single client ID is allowed to consume from a single broker in one second. |
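For example, to cap producers using a particular client ID at 10 MB/sec per broker (the client ID here is an assumed name):

```shell
# 10 MB/sec = 10485760 bytes/sec, enforced independently on each broker.
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster \
    --alter --entity-type clients --entity-name my-client-id \
    --add-config producer_byte_rate=10485760
```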
All configuration overrides can be listed using the command-line tool. This will allow you to examine the specific configuration for a topic or client. Similar to other tools, this is done using the `--describe` command.
For example, show all configuration overrides for the topic named “my-topic”:
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
#
The configuration description will only show overrides—it does not include the cluster default configurations. Currently, there is no way to dynamically discover, either via Zookeeper or the Kafka protocol, the configuration of the brokers themselves. This means that when using this tool to discover topic or client settings in automation, the tool must have separate knowledge of the cluster default configuration.
Dynamic configurations can be removed entirely, which will cause the entity to revert back to the cluster defaults. To delete a configuration override, use the `--alter` command along with the `--delete-config` parameter.

For example, delete the configuration override for `retention.ms` for a topic named “my-topic”:
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
Updated config for topic: "my-topic".
#
The Kafka tools contain two scripts for working with the management of partitions—one allows for the reelection of leader replicas, and the other is a low-level utility for assigning partitions to brokers. Together, these tools can assist with the proper balancing of message traffic within a cluster of Kafka brokers.
As described in Chapter 6, partitions can have multiple replicas for reliability. However, only one of these replicas can be the leader for the partition, and all produce and consume operations happen on that broker. This is defined by Kafka internals as the first in-sync replica in the replica list, but when a broker is stopped and restarted, it does not resume leadership of any partitions automatically.
There is a broker configuration for automatic leader rebalancing, but it is not recommended for production use. There are significant performance impacts caused by the automatic balancing module, and it can cause a lengthy pause in client traffic for larger clusters.
One way to cause brokers to resume leadership is to trigger a preferred replica election. This tells the cluster controller to select the ideal leader for partitions (the first replica in the list of replicas that is in sync). The operation is generally nonimpacting, as the clients can track leadership changes automatically. This operation can be manually triggered using the `kafka-preferred-replica-election.sh` utility.
For example, start a preferred replica election for all topics in a cluster with one topic that has eight partitions:
# kafka-preferred-replica-election.sh --zookeeper zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
#
For clusters with a large number of partitions, it is possible that a single preferred replica election will not be able to run. The request must be written to a Zookeeper znode within the cluster metadata, and if the request is larger than the size for a znode (by default, 1 MB), it will fail. In this case, you will need to create a file that contains a JSON object listing the partitions to elect for and break the request into multiple steps. The format for the JSON file is:
{
  "partitions": [
    {
      "partition": 1,
      "topic": "foo"
    },
    {
      "partition": 2,
      "topic": "foobar"
    }
  ]
}
For example, start a preferred replica election with a specified list of partitions in a file named “partitions.json”:
# kafka-preferred-replica-election.sh --zookeeper zoo1.example.com:2181/kafka-cluster --path-to-json-file partitions.json
Successfully started preferred replica election for partitions
Set([my-topic,1], [my-topic,2], [my-topic,3])
#
From time to time, it may be necessary to change the replica assignments for a partition. Some examples of when this might be needed are:
If a topic’s partitions are not balanced across the cluster, causing uneven load on brokers
If a broker is taken offline and the partition is under-replicated
If a new broker is added and needs to receive a share of the cluster load
The `kafka-reassign-partitions.sh` tool can be used to perform this operation. This tool must be used in at least two steps. The first step uses a broker list and a topic list to generate a set of moves. The second step executes the moves that were generated. There is an optional third step that uses the generated list to verify the progress or completion of the partition reassignments.
To generate a set of partition moves, you must create a file that contains a JSON object listing the topics. The JSON object is formatted as follows (the version number is currently always 1):
{
  "topics": [
    {
      "topic": "foo"
    },
    {
      "topic": "foo1"
    }
  ],
  "version": 1
}
For example, generate a set of partition moves to move the topics listed in the file “topics.json” to the brokers with IDs 2 and 3:
# kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --generate --topics-to-move-json-file topics.json --broker-list 2,3
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"my-topic","partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"replicas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[2,3]},{"topic":"my-topic","partition":10,"replicas":[3,2]},{"topic":"my-topic","partition":1,"replicas":[2,3]},{"topic":"my-topic","partition":4,"replicas":[3,2]},{"topic":"my-topic","partition":7,"replicas":[2,3]},{"topic":"my-topic","partition":6,"replicas":[3,2]},{"topic":"my-topic","partition":15,"replicas":[2,3]},{"topic":"my-topic","partition":0,"replicas":[3,2]},{"topic":"my-topic","partition":3,"replicas":[2,3]},{"topic":"my-topic","partition":11,"replicas":[2,3]},{"topic":"my-topic","partition":8,"replicas":[3,2]},{"topic":"my-topic","partition":12,"replicas":[3,2]},{"topic":"my-topic","partition":13,"replicas":[2,3]},{"topic":"my-topic","partition":2,"replicas":[3,2]},{"topic":"my-topic","partition":14,"replicas":[3,2]},{"topic":"my-topic","partition":9,"replicas":[2,3]}]}
#
The broker list is provided on the tool command line as a comma-separated list of broker IDs. The tool will output, on standard output, two JSON objects describing the current partition assignment for the topics and the proposed partition assignment. The format of these JSON objects is `{"partitions": [{"topic": "my-topic", "partition": 0, "replicas": [1,2]}], "version": 1}`. The proposed partition assignment is created to distribute the specified topics evenly across the brokers specified.

The first JSON object can be saved in case the reassignment needs to be reverted. The second JSON object, the one that shows the proposed assignment, should be saved to a new file. This file is then provided back to the `kafka-reassign-partitions.sh` tool for the second step. It is possible to skip the first step if a specific partition assignment is desired (such as when the replication factor for a partition is being changed, as described later in this chapter). In that case, the proposed assignment JSON can be manually created using the same format.
To execute the proposed partition reassignment from the file “reassign.json”, run the following command:
# kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --execute --reassignment-json-file reassign.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"my-topic","partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"replicas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[2,3]},{"topic":"my-topic","partition":0,"replicas":[3,2]},{"topic":"my-topic","partition":7,"replicas":[2,3]},{"topic":"my-topic","partition":13,"replicas":[2,3]},{"topic":"my-topic","partition":4,"replicas":[3,2]},{"topic":"my-topic","partition":12,"replicas":[3,2]},{"topic":"my-topic","partition":6,"replicas":[3,2]},{"topic":"my-topic","partition":11,"replicas":[2,3]},{"topic":"my-topic","partition":10,"replicas":[3,2]},{"topic":"my-topic","partition":9,"replicas":[2,3]},{"topic":"my-topic","partition":2,"replicas":[3,2]},{"topic":"my-topic","partition":14,"replicas":[3,2]},{"topic":"my-topic","partition":3,"replicas":[2,3]},{"topic":"my-topic","partition":1,"replicas":[2,3]},{"topic":"my-topic","partition":15,"replicas":[2,3]},{"topic":"my-topic","partition":8,"replicas":[3,2]}]}
#
This will start the reassignment of the specified partition replicas to the new brokers. The cluster controller performs this action by adding the new replicas to the replica list for each partition (increasing the replication factor). The new replicas will then copy all existing messages for each partition from the current leader. Depending on the size of the partitions on disk, this can take a significant amount of time as the data is copied across the network to the new replicas. Once replication is complete, the controller removes the old replicas from the replica list (reducing the replication factor to the original size).
When removing many partitions from a single broker, such as if that broker is being removed from the cluster, it is a best practice to shut down and restart the broker before starting the reassignment. This will move leadership for the partitions on that particular broker to other brokers in the cluster (as long as automatic leader elections are not enabled). This can significantly increase the performance of reassignments and reduce the impact on the cluster as the replication traffic will be distributed to many brokers.
While the reassignment is running, and after it is complete, the kafka-reassign-partitions.sh
tool can be used to verify the status of the reassignment. This will show what reassignments are currently in progress, what reassignments have completed, and, if there was an error, what reassignments have failed. In order to do this, you must have the file with the JSON object that was used in the execute step.
For example, verify a running partition reassignment from the file “reassign.json”:
#
kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --verify
--reassignment-json-file reassign.json
Status of partition reassignment:
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,13] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,12] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,14] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,15] completed successfully
Reassignment of partition [my-topic,8] completed successfully
#
There is an undocumented feature of the partition reassignment tool that will allow you to increase or decrease the replication factor for a partition. This may be necessary when a partition was created with the wrong replication factor (for example, if there were not enough brokers available when the topic was created). The replication factor for existing partitions is not changed even if the default replication factor for the cluster is changed in the configuration. Altering the replication factor for a topic is done by creating a JSON object, in the format used in the execute step of partition reassignment, that adds or removes broker IDs from the replicas
list, in order to set the replication factor to the desired count. The cluster will complete the reassignment and keep the replication factor at the new size.
For example, consider the current assignment for a topic named “my-topic” with one partition and a replication factor of 1:
{
    "partitions": [
        {
            "topic": "my-topic",
            "partition": 0,
            "replicas": [ 1 ]
        }
    ],
    "version": 1
}
Providing the following JSON object in the execute step of partition reassignment will result in the replication factor being increased to 2, by adding a replica on the broker with ID 2. If the broker ID specified does not exist, that broker will be silently dropped from the replica set:
{
    "partitions": [
        {
            "partition": 0,
            "replicas": [ 1, 2 ],
            "topic": "my-topic"
        }
    ],
    "version": 1
}
Similarly, the replication factor for a partition can be reduced by providing a JSON object with a smaller list of replicas.
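The edit to the replicas list can be scripted rather than done by hand. A minimal sketch (the broker ID being added is illustrative) that takes a current assignment in the format above and appends a new broker to each partition's replica list, raising the replication factor by one:

```python
import json

def increase_replication_factor(assignment, new_broker):
    """Append new_broker to every partition's replica list, skipping
    partitions that already host a replica on that broker."""
    for partition in assignment["partitions"]:
        if new_broker not in partition["replicas"]:
            partition["replicas"].append(new_broker)
    return assignment

current = {"partitions": [{"topic": "my-topic", "partition": 0,
                           "replicas": [1]}],
           "version": 1}
proposed = increase_replication_factor(current, 2)
print(json.dumps(proposed))
```

Removing a broker ID from each list instead would reduce the replication factor in the same way.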
If you have to go looking for the specific content of a message, perhaps because you ended up with a “poison pill” message in your topic that the consumer cannot handle, there is a helper tool you can use to decode the log segments for a partition. This will allow you to view individual messages without needing to consume and decode them. The tool takes a comma-separated list of log segment files as an argument and can print out either message summary information or detailed message data.
For example, decode the log segment file named 00000000000052368601.log, showing the message summaries:
#
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...
For example, decode the log segment file named 00000000000052368601.log, showing the message, without the key, as raw bytes:
#
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log --print-data-log
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321 payload: test message 1
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641 payload: test message 2
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431 payload: test message 3
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359 payload: test message 4
...
It is also possible to use this tool to validate the index file that goes along with a log segment. The index is used for finding messages within a log segment, and if corrupted will cause errors in consumption. Validation is performed whenever a broker starts up in an unclean state (i.e., it was not stopped normally), but it can be performed manually as well. There are two options for checking indices, depending on how much checking you want to do. The option --index-sanity-check
will just check that the index is in a usable state, while --verify-index-only
will check for mismatches in the index without printing out all the index entries.
For example, validate that the index file for the log segment file named 00000000000052368601.log is not corrupted:
#
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.index,00000000000052368601.log
--index-sanity-check
Dumping 00000000000052368601.index
00000000000052368601.index passed sanity check.
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
...
Partition replication works similarly to a regular Kafka consumer client: the follower broker starts replicating at the oldest offset and checkpoints the current offset to disk periodically. When replication stops and restarts, it picks up from the last checkpoint. It is possible for previously replicated log segments to get deleted from a broker, and the follower will not fill in the gaps in this case.
To validate that the replicas for a topic’s partitions are the same across the cluster, you can use the kafka-replica-verification.sh
tool for verification. This tool will fetch messages from all the replicas for a given set of topic partitions and check that all messages exist on all replicas. You must provide the tool with a regular expression that matches the topics you wish to validate. If none is provided, all topics are validated. You must also provide an explicit list of brokers to connect to.
The replica verification tool will have an impact on your cluster similar to reassigning partitions, as it must read all messages from the oldest offset in order to verify the replica. In addition, it reads from all replicas for a partition in parallel, so it should be used with caution.
For example, verify the replicas for the topics starting with “my-” on brokers 1 and 2:
#
kafka-replica-verification.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
While working with Apache Kafka, you will often find it is necessary to manually consume messages, or produce some sample messages, in order to validate what’s going on with your applications. There are two utilities provided to help with this: kafka-console-consumer.sh
and kafka-console-producer.sh
. These are wrappers around the Java client libraries that allow you to interact with Kafka topics without having to write an entire application to do it.
While it is possible to write applications that wrap around the console consumer or producer (e.g., to consume messages and pipe them to another application for processing), this type of application is quite fragile and should be avoided. It is difficult to interact with the console consumer in a way that does not lose messages. Likewise, the console producer does not allow for using all features, and properly sending bytes is tricky. It is best to use either the Java client libraries directly, or a third-party client library for other languages that use the Kafka protocol directly.
The kafka-console-consumer.sh
tool provides a means to consume messages out of one or more topics in your Kafka cluster. The messages are printed in standard output, delimited by a new line. By default, it outputs the raw bytes in the message, without the key, with no formatting (using the DefaultFormatter
). The required options for the console consumer are described in the following paragraphs.
It is very important to use a consumer that is the same version as your Kafka cluster. Older console consumers can potentially damage the cluster by interacting with Zookeeper in incorrect ways.
The first option that is required is to specify whether or not to use the new consumer, and for the configuration to point to the Kafka cluster itself. When using the older consumer, the only argument required for this is the --zookeeper
option followed by the connect string for the cluster. From the above examples, this might be --zookeeper
zoo1.example.com:2181/kafka-cluster
. If you are using the new consumer, you must specify both the --new-consumer
flag as well as the --bootstrap-server
option followed by a broker to connect to initially, such as --bootstrap-server kafka1.example.com:9092
.
Next, you must specify the topics to consume. Three options are provided for this: --topic
, --whitelist
, and --blacklist
. One, and only one, may be provided. The --topic
option specifies a single topic to consume. The --whitelist
and --blacklist
options each are followed by a regular expression (remember to properly escape the regular expression so that it is not changed by the shell command line). The whitelist will consume all topics that match the regular expression, whereas the blacklist will consume all topics except those matched by the regular expression.
For example, consume a single topic named “my-topic” using the old consumer:
#
kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
#
The console consumer is terminated by sending a Control-C. In addition to the basic command-line options, it is possible to pass any normal consumer configuration options to the console consumer as well. This can be done in two ways, depending on how many options you need to pass and how you prefer to do it. The first is to provide a consumer configuration file by specifying --consumer.config
CONFIGFILE
, where CONFIGFILE
is the full path to a file that contains the configuration options. The other way is to specify the options on the command line with one or more arguments of the form --consumer-property KEY=VALUE
, where KEY
is the configuration option name and VALUE
is the value to set it to. This can be useful for consumer options like setting the consumer group ID.
There is a --property
command-line option for both the console consumer and the console producer, but this should not be confused with the --consumer-property
and --producer-property
options. The --property
option is only used for passing configurations to the message formatter, and not the client itself.
There are a few other commonly used options for the console consumer that you should know:
--formatter CLASSNAME
Specifies a message formatter class to be used to decode the messages. This defaults to kafka.tools.DefaultFormatter
.
--from-beginning
Consume messages in the topic(s) specified from the oldest offset. Otherwise, consumption starts from the latest offset.
--max-messages NUM
Consume at most NUM
messages before exiting.
--partition NUM
Consume only from the partition with ID NUM
(requires the new consumer).
There are three message formatters available to use besides the default:
kafka.tools.LoggingMessageFormatter
Outputs messages using the logger, rather than standard out. Messages are printed at the INFO level, and include the timestamp, key, and value.
kafka.tools.ChecksumMessageFormatter
Prints only message checksums.
kafka.tools.NoOpMessageFormatter
Consumes messages but does not output them at all.
The kafka.tools.DefaultMessageFormatter
also has several useful options that can be passed using the --property
command-line option:
print.timestamp
Set to “true” to display the timestamp of each message (if available).
print.key
Set to “true” to display the message key in addition to the value.
key.separator
Specify the delimiter character to use between the message key and message value when printing.
line.separator
Specify the delimiter character to use between messages.
key.deserializer
Provide a class name that is used to deserialize the message key before printing.
value.deserializer
Provide a class name that is used to deserialize the message value before printing.
The deserializer classes must implement org.apache.kafka.
common.serialization.Deserializer
and the console consumer will call the toString
method on them to get the output to display. Typically, you would implement these deserializers as a Java class that you would insert into the classpath for the console consumer by setting the CLASSPATH
environment variable before executing kafka-console-consumer.sh
.
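The effect of the formatter properties described above can be pictured with a small model (this mimics the shape of the output as described; it is not the formatter's actual source):

```python
def format_message(key, value, print_key=False, key_separator="\t",
                   line_separator="\n"):
    """Model how the default formatter joins a message's key and value
    according to the print.key, key.separator, and line.separator
    properties: the key is only shown when print.key is true."""
    if print_key:
        return key_separator.join([key, value]) + line_separator
    return value + line_separator

# With print.key=true and key.separator=",", key and value share a line.
print(format_message("user-42", "hello", print_key=True,
                     key_separator=","), end="")
# By default, only the value is printed.
print(format_message("user-42", "hello"), end="")
```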
It is sometimes useful to see what offsets are being committed for the cluster’s consumer groups. You may want to see if a particular group is committing offsets at all, or how often offsets are being committed. This can be done by using the console consumer to consume the special internal topic called __consumer_offsets
. All consumer offsets are written as messages to this topic. In order to decode the messages in this topic, you must use the formatter class kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter
(in version 0.11, the formatter class name is kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter
).
For example, consume a single message from the offsets topic:
#
kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic __consumer_offsets
--formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1
[my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA]
,CommitTime 1479708539051,ExpirationTime 1480313339051]
Processed a total of 1 messages
#
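When auditing commit behavior across many groups, lines in this shape can be picked apart programmatically. A best-effort sketch that parses the line format shown above (this format is an internal detail and may change between versions):

```python
import re

def parse_offset_line(line):
    """Extract group, topic, partition, and committed offset from an
    OffsetsMessageFormatter output line of the shape
    [group,topic,partition]::[OffsetMetadata[offset,metadata],...]."""
    match = re.match(
        r"\[(?P<group>[^,]+),(?P<topic>[^,]+),(?P<partition>\d+)\]::"
        r"\[OffsetMetadata\[(?P<offset>\d+),", line)
    if match is None:
        return None
    return (match.group("group"), match.group("topic"),
            int(match.group("partition")), int(match.group("offset")))

line = ("[my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA]"
        ",CommitTime 1479708539051,ExpirationTime 1480313339051]")
print(parse_offset_line(line))
```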
Similar to the console consumer, the kafka-console-producer.sh
tool can be used to write messages into a Kafka topic in your cluster. By default, messages are read one per line, with a tab character separating the key and the value (if no tab character is present, the key is null). As with the console consumer, the producer reads in and produces raw bytes using the default serializer (which is DefaultEncoder
).
You can provide your own class for reading lines in order to do custom things. The class that you create must extend kafka.common.MessageReader
and will be responsible for creating the ProducerRecord
. Specify your class on the command line with the --line-reader
option, and make sure the JAR containing your class is in the classpath.
The console producer requires that two arguments are provided at a minimum. The parameter --broker-list
specifies one or more brokers, as a comma-separated list of hostname:port
entries for your cluster. The other required parameter is the --topic
option to specify the topic that you are producing messages to. When you are done producing, send an end-of-file (EOF) character to close the client. For normal terminals, this is done with a Control-D.
For example, produce two messages to a topic named “my-topic”:
#
kafka-console-producer.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
^D
#
Just like the console consumer, it is possible to pass any normal producer configuration options to the console producer as well. This can be done in two ways, depending on how many options you need to pass and how you prefer to do it. The first is to provide a producer configuration file by specifying --producer.config CONFIGFILE
, where CONFIGFILE
is the full path to a file that contains the configuration options. The other way is to specify the options on the command line with one or more arguments of the form --producer-property KEY=VALUE
, where KEY
is the configuration option name and VALUE
is the value to set it to. This can be useful for producer options like message-batching configurations (such as linger.ms
or batch.size
).
The console producer has many command-line arguments available for adjusting its behavior. Some of the more useful options are:
--key-serializer CLASSNAME
Specifies a message encoder class to be used to serialize the message key. This defaults to kafka.serializer.DefaultEncoder
.
--value-serializer CLASSNAME
Specifies a message encoder class to be used to serialize the message value. This defaults to kafka.serializer.DefaultEncoder
.
--compression-codec STRING
Specify the type of compression to be used when producing messages. This can be one of none
, gzip
, snappy
, or lz4
. The default value is gzip
.
--sync
Produce messages synchronously, waiting for each message to be acknowledged before sending the next one.
Custom serializers must extend kafka.serializer.Encoder
. This can be used to take the strings from standard input and convert them to an encoding, such as Avro or Protobuf, which is appropriate for the topic.
The kafka.tools.LineMessageReader
class, which is responsible for reading standard input and creating producer records, also has several useful options that can be passed to the console producer using the --property
command-line option:
ignore.error
Set to “false” to throw an exception when parse.key
is set to true
and a key separator is not present. Defaults to true
.
parse.key
Set to false
to always set the key to null. Defaults to true
.
key.separator
Specify the delimiter character to use between the message key and message value when reading. Defaults to a tab character.
When producing messages, the LineMessageReader
will split the input on the first instance of the key.separator
. If there are no characters remaining after that, the value of the message will be empty. If no key separator character is present on the line, or if parse.key
is false, the key will be null.
There is a command-line tool, kafka-acls.sh
, provided for interacting with access controls for Kafka clients. Additional documentation on ACLs and security is provided on the Apache Kafka website.
There are some administrative tasks that are technically possible to do but should not be attempted except in the most extreme situations. Often this is when you are diagnosing a problem and have run out of options, or you have found a specific bug that you need to work around temporarily. These tasks are usually undocumented, unsupported, and pose some amount of risk to your application.
Several of the more common of these tasks are documented here so that in an emergency situation, there is a potential option for recovery. Their use is not recommended under normal cluster operations, and should be considered carefully before being executed.
The operations in this section involve working with the cluster metadata stored in Zookeeper directly. This can be a very dangerous operation, so you must be very careful to not modify the information in Zookeeper directly, except as noted.
Every Kafka cluster has a controller, which is a thread running within one of the brokers. The controller is responsible for overseeing cluster operations, and from time to time it is desirable to forcibly move the controller to a different broker. One such example is when the controller has suffered an exception or other problem that has left it running but not functional. Moving the controller in these situations does not have a high risk, but it is not a normal task and should not be performed regularly.
The broker that is currently the controller registers itself using a Zookeeper node at the top level of the cluster path that is named /controller
. Deleting this Zookeeper node manually will cause the current controller to resign, and the cluster will select a new controller.
The normal operational flow for a partition reassignment is:
Reassignment is requested (Zookeeper node is created).
Cluster controller adds partitions to new brokers being added.
New brokers begin replicating each partition until it is in-sync.
Cluster controller removes the old brokers from the partition replica list.
Because all the reassignments are started in parallel when requested, there is normally no reason to attempt to cancel an in-progress reassignment. One of the exceptions is when a broker fails in the middle of a reassignment and cannot immediately be restarted. This results in a reassignment that will never finish, which precludes starting any additional reassignments (such as to remove partitions from the failed broker and assign them to other brokers). In a case like this, it is possible to make the cluster forget about the existing reassignment.
To remove an in-progress partition reassignment:
Remove the /admin/reassign_partitions
Zookeeper node from the Kafka cluster path.
Force a controller move (see “Moving the Cluster Controller” for more detail).
When removing an in-progress partition move, any partitions that have not yet completed will not go through the step of having the old brokers removed from the replica list. This means that the replication factor for some partitions may be greater than intended. The broker will not allow some admin operations for topics that have partitions with inconsistent replication factors (such as increasing partitions). It is advisable to review the partitions that were still in progress and make sure their replication factor is correct with another partition reassignment.
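That review can be done against the rollback JSON saved during the execute step. A sketch (assuming the reassignment JSON format shown earlier in this chapter) that flags topics whose partitions do not all share the same replication factor:

```python
def find_inconsistent_topics(assignment):
    """Return topics in a reassignment-format JSON object whose
    partitions have differing replication factors (replica list sizes)."""
    rf_by_topic = {}
    for partition in assignment["partitions"]:
        rf_by_topic.setdefault(partition["topic"], set()).add(
            len(partition["replicas"]))
    return sorted(t for t, sizes in rf_by_topic.items() if len(sizes) > 1)

assignment = {"version": 1, "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [1, 2]},
    {"topic": "my-topic", "partition": 1, "replicas": [1, 2, 3]},
    {"topic": "other-topic", "partition": 0, "replicas": [2, 3]},
]}
print(find_inconsistent_topics(assignment))
```

Any topic this flags would be a candidate for another partition reassignment to restore a uniform replication factor.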
When using the command-line tools to delete a topic, a Zookeeper node is created to request the deletion. Under normal circumstances, this request is executed by the cluster immediately. However, the command-line tool has no way of knowing whether topic deletion is enabled in the cluster. As a result, it will request deletion of topics regardless, which can result in a surprise if deletion is disabled. It is possible to remove pending requests for topic deletion to avoid this.
Topics are requested for deletion by creating a Zookeeper node as a child under /admin/delete_topic
, which is named with the topic name. Deleting these Zookeeper nodes (but not the parent /admin/delete_topic
node) will remove the pending requests.
If you are running a cluster with delete topics disabled, or if you find yourself in need of deleting some topics outside of the normal flow of operations, it is possible to manually delete them from the cluster. This requires a full shutdown of all brokers in the cluster, however, and cannot be done while any of the brokers in the cluster are running.
Modifying the cluster metadata in Zookeeper when the cluster is online is a very dangerous operation, and can put the cluster into an unstable state. Never attempt to delete or modify topic metadata in Zookeeper while the cluster is online.
To delete a topic from the cluster:
Shut down all brokers in the cluster.
Remove the Zookeeper path /brokers/topics/TOPICNAME
from the Kafka cluster path. Note that this node has child nodes that must be deleted first.
Remove the partition directories from the log directories on each broker. These will be named TOPICNAME-NUM
, where NUM
is the partition ID.
Running a Kafka cluster can be a daunting endeavor, with numerous configurations and maintenance tasks to keep the systems running at peak performance. In this chapter, we discussed many of the routine tasks, such as managing topic and client configurations, that you will need to handle frequently. We also covered some of the more esoteric tasks that you'll need for debugging problems, like examining log segments. Finally, we covered a few of the operations that, while not safe or routine, can be used to get you out of a sticky situation. Altogether, these tools will help you to manage your Kafka cluster.
Of course, managing the cluster is impossible without proper monitoring in place. Chapter 10 will discuss ways to monitor broker and cluster health and operations so you can be sure Kafka is working well (and know when it isn’t). We will also offer best practices for monitoring your clients, including both producers and consumers.