Consumer offset management
librdkafka currently supports two consumer offset management methods:
- local file based offsets (default)
- broker based offsets
Offset management is configured through topic configuration properties and is enabled by passing RD_KAFKA_OFFSET_STORED as the start_offset argument to rd_kafka_consume_start().
The various rdkafka tools, such as rdkafka_example and kafkacat, accept the -o stored command-line argument.
- auto.commit.enable - If true (default), periodically commit the offset of the last message handed to the application. This committed offset is used when the process restarts to pick up where it left off. If false, the application must call rd_kafka_offset_store() manually to store an offset (optional).
- auto.commit.interval.ms - The frequency, in milliseconds, at which consumer offsets are committed (written) to offset storage.
- offset.store.method - Offset commit store method: 'file' - local file store (offset.store.path, et al.), 'broker' - broker commit store (requires Apache Kafka 0.8.1 or later on the broker).
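As a rough illustration, the three properties above could be combined as follows. They are written here as name=value pairs for readability only; in an application each pair would be set on the topic configuration, for example through rd_kafka_topic_conf_set(). The numeric value is an arbitrary example, not a recommendation or a documented default.

```
# Illustrative values only.
auto.commit.enable=true
auto.commit.interval.ms=5000
offset.store.method=file
```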
With the local file method, offsets are written to a local file, defaulting to "./topicname-partition.offset".
- offset.store.path - Path to the local file for storing offsets. If the path is a directory, a filename is automatically generated in that directory based on the topic and partition. Defaults to the current directory.
- offset.store.sync.interval.ms - fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write.
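The path and sync rules described above can be sketched in plain C. This is only an illustration of the documented behaviour, not librdkafka's actual implementation: offset_file_path() and should_fsync() are hypothetical helper names, and the sketch assumes a POSIX system for stat().

```c
#define _POSIX_C_SOURCE 200809L /* for stat() under strict ISO C modes */
#include <stdio.h>
#include <sys/stat.h>

/*
 * Hypothetical helper, not part of librdkafka: resolve the offset file
 * for a topic and partition from an offset.store.path value.
 *
 * - If `path` is NULL or empty, the current directory "." is assumed.
 * - If `path` names an existing directory, the file name
 *   "<topic>-<partition>.offset" is generated inside it.
 * - Otherwise `path` is used as the offset file itself.
 *
 * Returns 0 on success, -1 if the result does not fit in `out`.
 */
int offset_file_path(const char *path, const char *topic, int partition,
                     char *out, size_t out_size) {
    struct stat st;
    int n;

    if (path == NULL || path[0] == '\0')
        path = ".";

    if (stat(path, &st) == 0 && S_ISDIR(st.st_mode))
        n = snprintf(out, out_size, "%s/%s-%d.offset", path, topic, partition);
    else
        n = snprintf(out, out_size, "%s", path);

    return (n < 0 || (size_t)n >= out_size) ? -1 : 0;
}

/*
 * Hypothetical helper: decide whether to fsync() the offset file after a
 * write, following the offset.store.sync.interval.ms semantics.
 */
int should_fsync(int sync_interval_ms, long ms_since_last_sync) {
    if (sync_interval_ms < 0)  /* -1: syncing disabled */
        return 0;
    if (sync_interval_ms == 0) /* 0: sync immediately after each write */
        return 1;
    return ms_since_last_sync >= sync_interval_ms;
}
```

With these helpers, calling offset_file_path("/var/tmp", "mytopic", 3, buf, sizeof(buf)) would produce /var/tmp/mytopic-3.offset, provided /var/tmp is an existing directory; a path that is not a directory is used unchanged as the offset file.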
The broker based method relies on the OffsetCommit API, which was added in Apache Kafka 0.8.1 and thus requires a broker of that version or later. This is the preferred method.
With this method, offsets are written to the Kafka cluster through the Kafka protocol. This is not to be confused with the ZooKeeper based offsets used by the official Scala Kafka clients, although those clients are reportedly expected to move over to broker based offsets as well.
No additional configuration is required for this method.