diff --git a/README.md b/README.md index fb8046f..83141da 100644 --- a/README.md +++ b/README.md @@ -29,25 +29,27 @@ val bootstrap = Kafka.newRichClient("[host]:[port]") // create a client for the leader of specific topic partition val metadata = bootstrap.metadata("topic") -val client = for { - metadata <- metadata - leader <- Future.value(metadata.head.partitions(0).leader.get) - client <- Future.value(Kafka.newRichClient(s"${leader.host}:${leader.port}")) -} yield client - -// get offset -val offset = client.map(_.offset("topic", 0, -1).get.offsets(0)).get - -// produce a message -client.foreach(_.produce("topic", 0, "foobar")) - -// fetch messages -val msgs = client.flatMap { - _.fetch("topic", 0, offset).map { - _.messages.map(_.message.value.toString("UTF-8")) +val client = metadata.map { + _.head.partitions(0).leader.map { l => + Kafka.newRichClient(s"${l.host}:${l.port}") + }.get +} + +client.foreach { cli => + // get offset + val offset = cli.offset("topic", 0, -1).map(_.offsets(0)) + + // produce a message + val produced = offset.unit before cli.produce("topic", 0, "foobar").unit + + // fetch messages + val msgs = produced before offset.flatMap { offset => + cli.fetch("topic", 0, offset).map { + _.messages.map(_.message.value.toString("UTF-8")) + } } + msgs.foreach(_.foreach(println)) } -msgs.foreach(_.foreach(println)) ``` ## Build