Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator Discovery failing when Consumer specifies Group ID #952

Open
jceddy opened this issue Sep 21, 2023 · 0 comments
Open

Coordinator Discovery failing when Consumer specifies Group ID #952

jceddy opened this issue Sep 21, 2023 · 0 comments

Comments

@jceddy
Copy link

jceddy commented Sep 21, 2023

I have a KafkaConsumer that is stuck in a "joining group" loop.

It's a simple Kafka Python consumer, here is the python script:

from kafka import KafkaConsumer
import logging
import sys

root = logging.getLogger('kafka')
root.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
 root.addHandler(handler)

bootstrap_servers_sasl = ['node1.dev.company.local:9092', 'node2.dev.company.local:9092', 'node3.dev.company.local:9092']
topicName = 'test_sasl'

consumer = KafkaConsumer(
    topicName,
    bootstrap_servers = bootstrap_servers_sasl,
    security_protocol = 'SASL_PLAINTEXT',
    sasl_mechanism = 'SCRAM-SHA-512',
    sasl_plain_username = 'test_user',
    sasl_plain_password = 't3st_us3r',
    group_id = 'test_group'
)

try:
    for message in consumer:
        if message:
            print(f"Received message: {message.value.decode('utf-8')}")
except Exception as e:
    print(f"An exception occurred: {e}")
finally:
    consumer.close()

When I include group_id when creating the KafkaConsumer, I will see this in the log over and over again forever, and the consumer will never actually see an item published to the topic it is supposed to be monitoring.

2023-09-13 08:35:44,102 - kafka.cluster - INFO - Group coordinator for test_group is BrokerMetadata(nodeId='coordinator-0', host='node1.dev.company.local', port=9092, rack=None)
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - Discovered coordinator coordinator-0 for group test_group
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - (Re-)joining group test_group
2023-09-13 08:35:44,104 - kafka.coordinator - WARNING - Marking the coordinator dead (node coordinator-0) for group test_group: [Error 16] NotCoordinatorForGroupError.

If I don't include group_id everything works fine.

The Kafka brokers are Confluent Kafka, version 7.4.1-ccs.

The controller.log on the Kafka server side contains this:

[2023-09-13 08:56:03,345] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController)

I'm not sure whether that is indicative of a problem, or just general information.

Here is the server.properties file (from node1, there are three nodes in total):

broker.id=0

listeners=LISTENER_ONE://:9092,LISTENER_TWO://:9096

inter.broker.listener.name=LISTENER_ONE
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
security.protocol=SASL_PLAINTEXT

advertised.listeners=LISTENER_ONE://node1.dev.company.local:9092,LISTENER_TWO://node1.dev.company.local:9096

listener.security.protocol.map=LISTENER_ONE:SASL_PLAINTEXT,LISTENER_TWO:PLAINTEXT

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/var/lib/kafka

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

log.retention.hours=24

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=node1.dev.company.local:2181,node2.dev.company.local:2181,node3.dev.company.local:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=3

auto.create.topics.enable=false
inter.broker.protocol.version=3.4-IV0

I also ran kafka-console-consumer from one of the broker machines to test:

kafka-console-consumer --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --group test_group --topic test_sasl --consumer.config consumer.config

With the following in consumer.config:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="test_user" \
        password="t3st_us3r";

I don't get any errors, but also no indication that it is processing any of the test messages I produce (although when I run it without --group, I also don't get any indication that it is processing messages...this is different than what I am seeing with the Kafka Python client).

I ran the following command:

kafka-topics --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --describe --topic test_sasl --command-config admin-plaintext.config

Where admin-plaintext.config contains this:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="admin-username" \
        password="admin-password";

The response is:

Topic: test_sasl        TopicId: Y3hiju-ZQsOvdRgLk4vpZw PartitionCount: 1       ReplicationFactor: 2    Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=86400000,unclean.leader.election.enable=true
        Topic: test_sasl        Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2

I can provide any other information, but am not sure what would be the most useful information to include.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant