Consumer offset management

Nick edited this page May 31, 2018 · 10 revisions

librdkafka currently supports two consumer offset management methods:

  • broker based offsets (default, requires broker 0.8.2 or later)
  • local file based offsets

Enabling offset management

Offset management is configured through topic configuration properties and enabled by passing start_offset as RD_KAFKA_OFFSET_STORED to rd_kafka_consume_start().

The various rdkafka tools, such as rdkafka_example and kafkacat, accept the -o stored command-line argument.

Configuration properties

  • enable.auto.commit - If true (default), periodically commit the offset of the last message handed to the application. The committed offset is used when the process restarts to pick up where it left off. If false, the application must call rd_kafka_offset_store() itself to store an offset (optional).
  • auto.commit.interval.ms - The interval, in milliseconds, at which consumer offsets are committed (written) to offset storage.
  • enable.auto.offset.store - If true (default), the offset of each consumed message is automatically stored in the local offset store, from where it is committed to the broker if enable.auto.commit is set to true.

Local file based offsets

Offsets are written to a local file, defaulting to {offset.store.path}/topicname-partition*.offset.

Topic configuration properties

  • offset.store.method - Offset commit store method: 'file' - local file store (offset.store.path, et al.), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
  • offset.store.path - Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. Defaults to the current directory.
  • offset.store.sync.interval.ms - fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write.

Broker based offsets

The OffsetCommit API was added in Apache Kafka 0.8.2, so this method requires a broker of that version or later. This is the preferred method.

With this method, offsets are written to the Kafka cluster through the Kafka protocol. This is not to be confused with the Zookeeper based offsets used by the official 0.8 Scala Kafka clients; the new 0.9 Java client uses broker based offset storage exclusively.

The consumer group id must be configured using the group.id configuration property. No additional configuration is required for this method.