Skip to content

Instantly share code, notes, and snippets.

Last active December 4, 2023 22:10
Show Gist options
  • Save tuxfight3r/cec254c240c5d2654c03f3b067d21af0 to your computer and use it in GitHub Desktop.
Save tuxfight3r/cec254c240c5d2654c03f3b067d21af0 to your computer and use it in GitHub Desktop.
KafkaCat configuration for AWS MSK

KafkaCat Configuration for AWS MSK

Set the below environment variable with the following values

NOTE: Kafkacat is renamed to kcat recently and the config variable should be KCAT_CONFIG for version 1.7 onwards.

# you can export the variable or present the config with -F parameter for kafkacat
export KAFKACAT_CONFIG=/home/tools/persistent/kcat/kafkacat_config

Contents of kafkacat configuration

# cat kafkacat_config


Commands for creating PEM files from JKS Keystore

keytool -importkeystore -srckeystore keystore.jks -srcstoretype JKS -deststoretype PKCS12 -destkeystore keystore.p12
openssl pkcs12 -in keystore.p12 -out cert.pem
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out cert.key.pem

Testing kafkacat

# List metadata for topic
./kafkacat -b $KAFKA_BROKER -L -t test_topic
./kafkacat -b $KAFKA_BROKER -F $KAFKACAT_CONFIG -L -t test_topic

# Consume 2 messages from the beginning
./kafkacat -b $KAFKA_BROKER -C -c2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 messages from the end.
./kafkacat -b $KAFKA_BROKER -C -c2 -o-2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 avro messages from the beginning
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o-2 -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

# Consume 2 avro messages from the end.
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o-2 -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

Kafkacat basic functions

export KCAT_CONFIG=/usr/local/kafkacat/
function kcat() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} "$@" ;}
function kcat_query_topic() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -L -t $1 ;}
function kcat_read_between_timestamps() {
    # expects $2 = s@timestamp and $3 e@timestamp
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -o $3 ;}
function kcat_read_between_offsets() {
    # expects $2 = offset and $3 = count of messages
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -c $3 ;}
function kcat_query_offset_for_timestamp() {
    # expects $1 in topic:partition:timestamp
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -Q -t $1 ;}

Read avro messages value

# Read all avro value messages from beginning
function kcat_read_avro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e ;}
# Read n avro value messages from beginning
function kcat_read_avro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e ;}
# Read n avro value messages from end
function kcat_read_avro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e ;}
# Read n avro value messages from beginning on specific partition
function kcat_read_avro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} ;}
# Read n avro value messages from end on specific partition
function kcat_read_avro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read text messages value

# Read all text messages from beginning
function kcat_read_txt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e ;}
# Read n text value messages from beginning
function kcat_read_txt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e ;}
# Read n text value messages from end
function kcat_read_txt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e ;}
# Read n text value messages from beginning on specific partition
function kcat_read_txt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} ;}
# Read n text value messages from end on specific partition
function kcat_read_txt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read avro messages with key=avro and value=avro

# Read all messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro  -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n';}
# Read n messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_avro_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_avro_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_avro_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read text messages with key=text and value=text

# Read all messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_txt_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_txt_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_txt_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read messages with key=avro and value=text

# Read all messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from end with key and value
function kcat_read_keyavro_valtxt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keyavro_valtxt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keyavro_valtxt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}

Read messages with key=txt and value=avro

# Read all messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_keytxt_valavro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keytxt_valavro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keytxt_valavro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
Copy link

First, thank you for sharing this. I have been trying to get Kcat working from a Fargate instance that I have set up as a bastion to consume messages from MSK to ensure that an MSK Connect job is actually working. I would like to set up Kcat in a secure way instead of just allowing unauthenticated clients. I did have the following questions though.

Questions about this example:

  • What type of Client Authentication was set up on the MSK side? Sasl/IAM?
  • Where do the certificates come from?
    • Did you download the CA cert ( from AWS Certificate Manager?
    • The certificates you have listed in your kcat config seem to have different names than the ones you are generating.
  • What is the ssl.key.password password for? Is this a shared secret with the MSK side?

Copy link

@nelisSpotOn hope the below answers helps.

  1. MSK is setup with TLS using a AWS private CA (
  2. you need to create CSR and get it signed by the AWS Private CA so it issues a certificate (follow the above document )
  3. Certnames are just for example, you need the certificate in pem format from step 2 and cacerts.pem can be downloaded here
  4. ssl.key.password is the password for your certificate key which gets generated on step2

Copy link

Is kcat capable of AWS MSK IAM Auth?

Copy link

tuxfight3r commented Dec 4, 2023

I have never tried it myself as all our msk's use private CA, but you can try the config file mentioned here.

Things to keep in mind:
kcat supports only the librdkafka configuration options as docmumented here:
However this has been a wanted feature and someone has created a fork of the librdkafka as discussed below

This means you need to rebuild librdkafka from the fork and rebuild kcat using that librdkafka to get the MSK IAM auth working for kcat.

Copy link

@tuxfight3r Thanks for the pointer, though that won't probably work as kcat is written in C, while AWS provides helper libs only for Java/JS/Python/Go/.NET: ( 😢

Odd is that kcat is cool and I couldn't find any other Kafka CLIs that are this much convenient.
I also couldn't find any 3rd-party CLIs (non-Apache Kafka and non-Java) that support AWS MSK IAM Auth at the moment =( The is the only one, though still WIP (birdayz/kaf#198)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment