-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Sync producer
Due to librdkafka's async nature, it is not trivial to implement a synchronic produce interface in the library that can be used in parallel to the standard async interface.
The best solution at this time is to provide a sync interface in your application, which fortunately is a trivial thing to do.
Note: The code has data race, this is just a sample.
This function produces a message, provides a local stack variable as the message opaque, and waits for the stack variable to change value by calling rd_kafka_poll()
that in turn will call the delivery report callback sync_produce_dr_cb()
.
rd_kafka_resp_err_t sync_produce (rd_kafka_topic_t *rkt, int32_t partition,
void *payload, size_t len,
const void *key, size_t keylen) {
rd_kafka_resp_err_t err = -12345;
if (rd_kafka_produce(rkt, partition, 0, payload, len,
key, keylen, &err) == -1)
return rd_kafka_errno2err(errno);
while (err == -12345)
rd_kafka_poll(rk, 1000);
return err;
}
Simply stores the result of the produce request in the provided errp
pointer, this errp
pointer is pointing to the err
variable in msg_delivered()
.
static void msg_delivered (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->_private) {
rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *)rkmessage->_private;
*errp = rkmessage->err;
}
}
Application's initialization code for librdkafka must set up the delivery report callback.
void my_init_code (void) {
char errstr[512];
rd_kafka_conf_t *rk_conf = rd_kafka_conf_new();
/* Set delivery report callback */
rd_kafka_conf_set_dr_msg_cb(rk_conf, msg_delivered);
/* Minimize wait-for-larger-batch delay (since there will be no batching) */
rd_kafka_conf_set(rk_conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr));
/* Minimize wait-for-socket delay (otherwise you will lose 100ms per message instead just the RTT) */
rd_kafka_conf_set(rk_conf, "socket.blocking.max.ms", "1", errstr, sizeof(errstr));
rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
if (!rk)
ERROR(errstr);
/* create topics, etc.. */
}