From 819f73bec5e5aed4b0d100f4d78de8f763ca6203 Mon Sep 17 00:00:00 2001 From: Yuta Okamoto Date: Sat, 31 May 2014 03:52:34 +0900 Subject: [PATCH] fix the sample code --- README.md | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index fb8046f..83141da 100644 --- a/README.md +++ b/README.md @@ -29,25 +29,27 @@ val bootstrap = Kafka.newRichClient("[host]:[port]") // create a client for the leader of specific topic partition val metadata = bootstrap.metadata("topic") -val client = for { - metadata <- metadata - leader <- Future.value(metadata.head.partitions(0).leader.get) - client <- Future.value(Kafka.newRichClient(s"${leader.host}:${leader.port}")) -} yield client - -// get offset -val offset = client.map(_.offset("topic", 0, -1).get.offsets(0)).get - -// produce a message -client.foreach(_.produce("topic", 0, "foobar")) - -// fetch messages -val msgs = client.flatMap { - _.fetch("topic", 0, offset).map { - _.messages.map(_.message.value.toString("UTF-8")) +val client = metadata.map { + _.head.partitions(0).leader.map { l => + Kafka.newRichClient(s"${l.host}:${l.port}") + }.get +} + +client.foreach { cli => + // get offset + val offset = cli.offset("topic", 0, -1).map(_.offsets(0)) + + // produce a message + val produced = offset.unit before cli.produce("topic", 0, "foobar").unit + + // fetch messages + val msgs = produced before offset.flatMap { offset => + cli.fetch("topic", 0, offset).map { + _.messages.map(_.message.value.toString("UTF-8")) + } } + msgs.foreach(_.foreach(println)) } -msgs.foreach(_.foreach(println)) ``` ## Build