Skip to content

Commit

Permalink
fix the sample code
Browse files Browse the repository at this point in the history
  • Loading branch information
okapies committed May 30, 2014
1 parent 9487db1 commit 819f73b
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,27 @@ val bootstrap = Kafka.newRichClient("[host]:[port]")
// create a client for the leader of specific topic partition
val metadata = bootstrap.metadata("topic")
val client = for {
metadata <- metadata
leader <- Future.value(metadata.head.partitions(0).leader.get)
client <- Future.value(Kafka.newRichClient(s"${leader.host}:${leader.port}"))
} yield client
// get offset
val offset = client.map(_.offset("topic", 0, -1).get.offsets(0)).get
// produce a message
client.foreach(_.produce("topic", 0, "foobar"))
// fetch messages
val msgs = client.flatMap {
_.fetch("topic", 0, offset).map {
_.messages.map(_.message.value.toString("UTF-8"))
val client = metadata.map {
_.head.partitions(0).leader.map { l =>
Kafka.newRichClient(s"${l.host}:${l.port}")
}.get
}
client.foreach { cli =>
// get offset
val offset = cli.offset("topic", 0, -1).map(_.offsets(0))
// produce a message
val produced = offset.unit before cli.produce("topic", 0, "foobar").unit
// fetch messages
val msgs = produced before offset.flatMap { offset =>
cli.fetch("topic", 0, offset).map {
_.messages.map(_.message.value.toString("UTF-8"))
}
}
msgs.foreach(_.foreach(println))
}
msgs.foreach(_.foreach(println))
```

## Build
Expand Down

0 comments on commit 819f73b

Please sign in to comment.