
Sync producer

Magnus Edenhill edited this page Apr 17, 2014 · 6 revisions

Due to librdkafka's asynchronous nature, it is not trivial to implement a synchronous produce interface in the library that can be used in parallel with the standard async interface.

The best solution at this time is to provide a sync interface in your application, which fortunately is a trivial thing to do.

Sync produce wrapper

This function produces a message, passing the address of a local stack variable as the per-message opaque. It then waits for that variable to change value by calling rd_kafka_poll(), which in turn invokes the delivery report callback sync_produce_dr_cb().

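/* Needs <errno.h> and <librdkafka/rdkafka.h>; `rk` is the application's
 * global rd_kafka_t handle created in the initialization code. */
rd_kafka_resp_err_t sync_produce (rd_kafka_topic_t *rkt, int32_t partition,
                                  void *payload, size_t len,
                                  const void *key, size_t keylen) {
        /* Sentinel meaning "no delivery report yet"; it must be a value
         * that librdkafka never reports as an error code. */
        rd_kafka_resp_err_t err = -12345;

        /* msgflags 0: librdkafka neither copies nor frees the payload, so it
         * must stay valid until delivery, which holds because we block here. */
        if (rd_kafka_produce(rkt, partition, 0, payload, len,
                             key, keylen, &err) == -1)
                return rd_kafka_errno2err(errno);

        /* Serve delivery reports until sync_produce_dr_cb() overwrites err. */
        while (err == -12345)
                rd_kafka_poll(rk, 1000);

        return err;
}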

Delivery report callback

The callback simply stores the result of the produce request through the provided errp pointer, which points to the err variable in sync_produce().

static void sync_produce_dr_cb (rd_kafka_t *rk,
                                void *payload, size_t len,
                                rd_kafka_resp_err_t err,
                                void *opaque, void *msg_opaque) {
        rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *)msg_opaque;
        *errp = err;
}
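
The interplay between the wrapper and the callback can be tried out without a broker. The following is a minimal sketch of the same pattern in plain C, assuming a toy in-memory queue in place of librdkafka: every toy_* name and TOY_* constant is hypothetical and exists only for illustration, and the toy poll always reports success. It is not how librdkafka is implemented; it only shows how a stack variable passed as the message opaque turns an asynchronous produce into a blocking call.

```c
#include <stddef.h>

/* Toy stand-ins for the librdkafka pieces; every name here is hypothetical. */
#define TOY_PENDING   (-12345)  /* sentinel: no delivery report yet */
#define TOY_QUEUE_MAX 8

typedef void (toy_dr_cb_t) (int err, void *msg_opaque);

typedef struct {
        void *msg_opaque[TOY_QUEUE_MAX]; /* opaques awaiting a report */
        size_t cnt;                      /* number of queued messages */
        toy_dr_cb_t *dr_cb;              /* delivery report callback */
} toy_producer_t;

/* Async produce: only enqueues; the result arrives later via the callback. */
static int toy_produce (toy_producer_t *p, void *msg_opaque) {
        if (p->cnt == TOY_QUEUE_MAX)
                return -1;               /* queue full */
        p->msg_opaque[p->cnt++] = msg_opaque;
        return 0;
}

/* Poll: serves the most recently queued report, returns events served. */
static int toy_poll (toy_producer_t *p) {
        if (p->cnt == 0)
                return 0;
        p->dr_cb(0 /* always "delivered" in this toy */,
                 p->msg_opaque[--p->cnt]);
        return 1;
}

/* Delivery report callback: copy the result into the waiting variable. */
static void toy_sync_dr_cb (int err, void *msg_opaque) {
        *(int *)msg_opaque = err;
}

/* Sync wrapper: produce, then poll until the callback replaces the sentinel. */
static int toy_sync_produce (toy_producer_t *p) {
        int err = TOY_PENDING;

        if (toy_produce(p, &err) == -1)
                return -1;

        while (err == TOY_PENDING)
                toy_poll(p);

        return err;
}
```

With a producer initialized as `toy_producer_t p = { .cnt = 0, .dr_cb = toy_sync_dr_cb };`, a call to `toy_sync_produce(&p)` returns 0 once the poll loop has invoked the callback. The address of `err` is only valid while `toy_sync_produce()` is on the stack, which is why the wrapper must not return before the report has been served.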

Initialization code

The application's initialization code for librdkafka must set up the delivery report callback.

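static rd_kafka_t *rk;  /* producer handle shared with sync_produce() */

void my_init_code (void) {
        char errstr[512];
        rd_kafka_conf_t *rk_conf = rd_kafka_conf_new();

        /* Register the delivery report callback before creating the handle. */
        rd_kafka_conf_set_dr_cb(rk_conf, sync_produce_dr_cb);

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
        if (!rk)
                ERROR(errstr);  /* application-defined error handler */

        /* create topics, etc.. */
}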