Generally speaking, the Modern C++ Kafka API is quite similar to the Kafka Java API.
We'd recommend users to cross-reference them, especially the examples.
Unlike Java's KafkaProducer, here we introduce two derived classes -- `KafkaSyncProducer` and `KafkaAsyncProducer` -- depending on different `send` behaviors (synchronous/asynchronous).

- The "Sync" (in the name) means `send` is a blocking operation: the `RecordMetadata` is available as soon as the function returns. If anything goes wrong, an exception is thrown.
```cpp
    // Create configuration object
    kafka::Properties props({
        {"bootstrap.servers", brokers},
        {"enable.idempotence", "true"},
    });

    // Create a producer instance.
    kafka::KafkaSyncProducer producer(props);

    // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

    for (std::string line; std::getline(std::cin, line);) {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic,
                                            kafka::NullKey,
                                            kafka::Value(line.c_str(), line.size()));

        // Send the message.
        try {
            kafka::Producer::RecordMetadata metadata = producer.send(record);
            std::cout << "% Message delivered: " << metadata.toString() << std::endl;
        } catch (const kafka::KafkaException& e) {
            std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
        }

        if (line.empty()) break;
    }

    // producer.close(); // No explicit close is needed, RAII will take care of it
```
- `ProducerConfig::BOOTSTRAP_SERVERS` is a mandatory property for `ProducerConfig`.

- `ProducerRecord` would not take any ownership of the `key` or `value`. Thus, the user must guarantee the memory block (pointed to by `key` or `value`) is valid until the record has been `send`-ed.

- Since `send` is a blocking operation, the throughput is highly impacted; but it is easier to make sure of the message delivery, and logically it is simpler.

- At the end, the user could call `close` manually, or just leave it to the destructor (`close` would be called anyway).
- The "Async" (in the name) means `send` is a non-blocking operation, and the result (including errors) could only be got from the delivery callback.
```cpp
    // Create configuration object
    kafka::Properties props({
        {"bootstrap.servers", brokers},
        {"enable.idempotence", "true"},
    });

    // Create a producer instance.
    kafka::KafkaAsyncProducer producer(props);

    // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

    for (std::string line; std::getline(std::cin, line);) {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic,
                                            kafka::NullKey,
                                            kafka::Value(line.c_str(), line.size()));

        // Send the message.
        producer.send(record,
                      // The delivery report handler
                      [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                          if (!ec) {
                              std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                          } else {
                              std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                          }
                      },
                      // The memory block given by record.value() would be copied
                      kafka::KafkaProducer::SendOption::ToCopyRecordValue);

        if (line.empty()) break;
    }
```
- Same as with KafkaSyncProducer, the user must guarantee the memory block for the `ProducerRecord`'s `key` is valid until the record has been `send`-ed.

- By default, the memory block for the `ProducerRecord`'s `value` must be valid until the delivery callback is called; otherwise, `send` should be called with the option `KafkaProducer::SendOption::ToCopyRecordValue`.

- It's guaranteed that the delivery callback would be triggered anyway after `send` -- a producer would even wait for it before `close`. So, the `Producer::Callback` function is a good place to release these memory resources.
If we construct a `KafkaAsyncProducer` with the option `KafkaClient::EventsPollingOption::Auto` (the default), an internal thread would be created for handling the `MessageDelivery` callbacks.

This might not be what you want, since then you have to use 2 different threads to send the messages and handle the `MessageDelivery` responses.

Here we have another choice -- using `KafkaClient::EventsPollingOption::Manual`; thus the `MessageDelivery` callbacks would be called within the member function `pollEvents()`.
- Note, if you construct the `KafkaAsyncProducer` with `EventsPollingOption::Manual`, `send()` would be a non-blocking operation. I.e., once the message buffering queue becomes full, the `send()` operation would throw an exception (or return an error code via the input reference parameter) -- instead of blocking there. This makes sense, since you might want to call `pollEvents()` later, so that the delivery callbacks could be called for some messages (which could then be removed from the message buffering queue).
```cpp
    kafka::KafkaAsyncProducer producer(props, kafka::KafkaClient::EventsPollingOption::Manual);

    // Prepare "msgsToBeSent"
    std::map<int, std::pair<Key, Value>> msgsToBeSent = ...;

    for (const auto& msg : msgsToBeSent) {
        auto record = kafka::ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);

        std::error_code ec;
        producer.send(ec,
                      record,
                      // Ack callback
                      [&msgsToBeSent](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                          // The message could be identified by `metadata.recordId()`
                          if (ec) {
                              LOG_ERROR("Cannot send out message with recordId={0}", metadata.recordId());
                          } else {
                              msgsToBeSent.erase(metadata.recordId()); // Quite safe here
                          }
                      });
        if (ec) break;
    }

    // Here we call the `MessageDelivery` callbacks
    // Note, we can only do this while the producer was constructed with `EventsPollingOption::Manual`.
    producer.pollEvents();
```
- A `ProducerRecord` could take extra information with `headers`.

- Note, each `header` within `headers` contains a pointer to the memory block of its `value`. The memory block MUST be valid until the `ProducerRecord` is read by `producer.send()`.
```cpp
    kafka::KafkaAsyncProducer producer(props);

    auto record = kafka::ProducerRecord(topic, partition, Key(), Value());

    for (const auto& msg : msgsToBeSent) {
        // Prepare record headers
        std::string session = msg.session;
        std::uint32_t seqno = msg.seqno;
        record.headers() = {
            { "session", { session.c_str(), session.size() } },
            { "seqno",   { &seqno, sizeof(seqno) } }
        };

        record.setKey(msg.key);
        record.setValue(msg.value);

        producer.send(record,
                      // Ack callback
                      [&msg](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                           if (ec) {
                               LOG_ERROR("Cannot send out message: {0}, err: {1}", metadata.toString(), ec.message());
                           }
                      });
    }
```
Once an error occurs during `send()`, `KafkaSyncProducer` and `KafkaAsyncProducer` behave differently:

- `KafkaSyncProducer` exposes the `std::error_code` via exceptions (through the `error()` member function of the caught exception).

- `KafkaAsyncProducer` passes the `std::error_code` to the delivery callback (as a parameter of the callback function).
There are 2 kinds of possible errors:

- Local errors

    - `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` -- The topic doesn't exist.
    - `RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION` -- The partition doesn't exist.
    - `RD_KAFKA_RESP_ERR__INVALID_ARG` -- Invalid topic (the topic is null, or its length is too long (> 512)).
    - `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT` -- No ack received within the time limit.

- Broker errors

    If the cluster is configured with `auto.create.topics.enable=true`, the producer/consumer could trigger the brokers to create a new topic (with `send`, `subscribe`, etc.). Note, a topic created this way may not be what you want (e.g., it would be created with the default `default.replication.factor=1` configuration), thus causing other unexpected problems.

    If an ack fails to arrive within `MESSAGE_TIMEOUT_MS`, an exception would be thrown by a `KafkaSyncProducer`'s `send`, or an error code would be received by the delivery callback of a `KafkaAsyncProducer`.
Enlarging the default `BATCH_NUM_MESSAGES` and `LINGER_MS` might improve message batching, thus enhancing the throughput. On the other hand, `LINGER_MS` highly impacts the latency.

The `QUEUE_BUFFERING_MAX_MESSAGES` and `QUEUE_BUFFERING_MAX_KBYTES` determine the maximum number of in-flight requests (some materials about Kafka call it this way). If the buffering queue is full, the `send` operation would be blocked. Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help to improve throughput as well, while also meaning more messages buffered locally.
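As a sketch, a throughput-oriented configuration might look like the following. The property names are the string forms of the `ProducerConfig` constants above (matching librdkafka's property names); the concrete values here are illustrative assumptions, not recommendations -- tune them against your own latency budget:

```cpp
    kafka::Properties props({
        {"bootstrap.servers",            brokers},
        // Batch up to 10000 messages, waiting up to 5 ms for a batch to fill.
        // Larger values favor throughput; `linger.ms` directly adds latency.
        {"batch.num.messages",           "10000"},
        {"linger.ms",                    "5"},
        // Allow more messages to be buffered locally before `send` blocks.
        {"queue.buffering.max.messages", "200000"},
        {"queue.buffering.max.kbytes",   "2097151"},
    });
```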
- Quick Answer

    - The Kafka cluster should be configured with `min.insync.replicas = 2` at least.
    - Use a `KafkaSyncProducer` (with configuration `{ProducerConfig::ACKS, "all"}`); or use a `KafkaAsyncProducer` (with configuration `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`), together with proper error-handling within the delivery callbacks.
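    The quick-answer settings above could be sketched as follows (assuming `brokers` is defined elsewhere; the property strings correspond to `ProducerConfig::ACKS` and `ProducerConfig::ENABLE_IDEMPOTENCE`):

```cpp
    // Reliable delivery with a KafkaSyncProducer: require acks from all in-sync replicas.
    kafka::Properties syncProps({
        {"bootstrap.servers", brokers},
        {"acks",              "all"},
    });
    kafka::KafkaSyncProducer syncProducer(syncProps);

    // Or with a KafkaAsyncProducer: enable idempotence (which implies `acks=all`),
    // and handle errors within the delivery callbacks.
    kafka::Properties asyncProps({
        {"bootstrap.servers",  brokers},
        {"enable.idempotence", "true"},
    });
    kafka::KafkaAsyncProducer asyncProducer(asyncProps);
```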
- Complete Answer
Excluding the user's main thread, a KafkaSyncProducer would start another (N + 2) threads in the background, while a KafkaAsyncProducer would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS.)

Most of these background threads are started internally by librdkafka. Here is a brief introduction to what they're used for:

- Each broker (in the list of BOOTSTRAP_SERVERS) takes a separate thread to transmit messages towards the Kafka cluster.

- Another 2 background threads handle internal operations, all kinds of timers, etc.

- A `KafkaAsyncProducer` has one more background thread to keep polling for delivery callback events.

E.g., if a KafkaSyncProducer is created with the property `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
The `Producer::Callback` is only available for a `KafkaAsyncProducer`. It will be handled by a background thread, not by the user's thread. Note, be careful if both `KafkaAsyncProducer::send()` and the `Producer::Callback` might access the same container at the same time.