How to pull from a subject? #1079
-
Hi, I'm testing pull based subscriptions and I noticed a very strange behavior. I have a stream called var nc *nats.Conn
nc, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
_, err = js.AddConsumer("new", &nats.ConsumerConfig{
Durable: "new_consumer",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
log.Fatalln(err)
}
go func() {
for {
select {
case <-signals:
return
default:
}
sub, err := js.PullSubscribe(
"sub",
"new_consumer",
nats.Bind("new", "new_consumer"),
)
if err != nil {
log.Println(err)
continue
}
messages, err := sub.Fetch(1)
if err != nil {
log.Println(err)
continue
}
msg := messages[0]
log.Printf("consuming message %s\n" msg.Data)
msg.Ack()
time.Sleep(time.Second)
}
}()
//... rest of the code When subscription replaced with var nc *nats.Conn
nc, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
_, err = js.AddConsumer("new", &nats.ConsumerConfig{
Durable: "new_consumer",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
log.Fatalln(err)
}
js.Subscribe("sub", func(msg *nats.Msg) {
log.Printf("consuming message %s\n" msg.Data)
msg.Ack()
})
//... rest of the code Side Question
|
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Hi, this is because when you are creating the consumer via |
Beta Was this translation helpful? Give feedback.
Hi, this is because when you are creating the consumer via
AddConsumer
it is being created without a FilterSubject so it would match all the messages, and then laterjs.PullSubscribe("sub", "new_consumer", nats.Bind("new", "new_consumer")
theBind
will reuse the created consumer without having to use thesub
for the lookup to find the consumer so it becomes ignored.If you remove AddConsumer and then do
js.PullSubscribe("sub", "new_consumer")
without Bind, then the consumer would be auto created withsub
as a filter subject to narrow down the messages.