-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Manually setting the consumer start offset
The high-level Kafka consumer (KafkaConsumer in C++) will start consuming at the last committed offset by default, if there is no previously committed offset for the topic+partition and group it will fall back on the topic configuration property auto.offset.reset
which defaults to latest
, thus starting to consume at the end of the partition (only new messages will be consumed).
To manually set the starting offset for a partition the assign()
API allows you to specify the start offset for each partition by setting the .offset
field in the topic_partition_t
element to either an absolute offset (>=0) or one of the logical offsets (BEGINNING, END, STORED, TAIL(..)).
See following examples showing how to start consuming topic "mytopic" partition 3 at offset 1234.
From a rebalance_cb
:
void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
rd_kafka_topic_partition_t *part;
if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))
part->offset = 1234;
rd_kafka_assign(rk, partitions);
} else {
rd_kafka_assign(rk, NULL);
}
}
Direct assign()
with no subscription:
rd_kafka_topic_partition_list_t *partitions;
partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
rd_kafka_assign(rk, partitions);
rd_kafka_topic_partition_list_destroy(partitions);
From a rebalance_cb
:
class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
RdKafka::TopicPartition *part;
// find the partition, through std::find() or other means
...
if (part)
part->set_offset(1234);
consumer->assign(partitions);
} else {
consumer->unassign();
}
}
};
Direct assign()
with no subscription:
std::vector<RdKafka::TopicPartition*> partitions;
partitions.push_back(RdKafka::TopicPartition::create("mytopic", 3, 1234));
consumer->assign(partitions);