From 42660b0a60cdc982760dbe7a0f885e2080f0c488 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Sat, 11 Jan 2025 20:27:03 +0100 Subject: [PATCH 1/2] [IMPROVED] Documentation of jetstream options and Fetch clarification Signed-off-by: Piotr Piotrowski --- jetstream/README.md | 71 +++++++++++++++++----------------- jetstream/consumer.go | 12 ++++-- jetstream/jetstream_options.go | 27 +++++++++++-- jetstream/pull.go | 2 +- micro/README.md | 2 +- 5 files changed, 70 insertions(+), 44 deletions(-) diff --git a/jetstream/README.md b/jetstream/README.md index ed2162d51..28926a842 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -3,38 +3,37 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client. -- [JetStream Simplified Client](#jetstream-simplified-client) - - [Overview](#overview) - - [Basic usage](#basic-usage) - - [Streams](#streams) - - [Stream management (CRUD)](#stream-management-crud) - - [Listing streams and stream names](#listing-streams-and-stream-names) - - [Stream-specific operations](#stream-specific-operations) - - [Consumers](#consumers) - - [Consumers management](#consumers-management) - - [Listing consumers and consumer - names](#listing-consumers-and-consumer-names) - - [Ordered consumers](#ordered-consumers) - - [Receiving messages from the - consumer](#receiving-messages-from-the-consumer) - - [Single fetch](#single-fetch) - - [Continuous polling](#continuous-polling) - - [Using `Consume()` receive messages in a - callback](#using-consume-receive-messages-in-a-callback) - - [Using `Messages()` to iterate over incoming - messages](#using-messages-to-iterate-over-incoming-messages) - - [Publishing on stream](#publishing-on-stream) - - [Synchronous publish](#synchronous-publish) - - [Async publish](#async-publish) - - [KeyValue Store](#keyvalue-store) - - [Basic usage of KV bucket](#basic-usage-of-kv-bucket) - - [Watching for changes on a bucket](#watching-for-changes-on-a-bucket) - - [Additional operations on a bucket](#additional-operations-on-a-bucket) - - [Object Store](#object-store) - - [Basic usage of Object Store](#basic-usage-of-object-store) - - [Watching for changes on a store](#watching-for-changes-on-a-store) - - [Additional operations on a store](#additional-operations-on-a-store) - - [Examples](#examples) +- [Overview](#overview) +- [Basic usage](#basic-usage) +- [Streams](#streams) +- [Stream management (CRUD)](#stream-management-crud) +- [Listing streams and stream names](#listing-streams-and-stream-names) +- [Stream-specific operations](#stream-specific-operations) +- [Consumers](#consumers) +- [Consumers management](#consumers-management) +- [Listing consumers and consumer + names](#listing-consumers-and-consumer-names) +- [Ordered consumers](#ordered-consumers) +- [Receiving messages from the + consumer](#receiving-messages-from-the-consumer) + - [Single fetch](#single-fetch) + - [Continuous polling](#continuous-polling) + - [Using `Consume()` receive messages in a + callback](#using-consume-receive-messages-in-a-callback) + - [Using `Messages()` to iterate over incoming + messages](#using-messages-to-iterate-over-incoming-messages) +- [Publishing on stream](#publishing-on-stream) +- [Synchronous publish](#synchronous-publish) +- [Async publish](#async-publish) +- [KeyValue Store](#keyvalue-store) +- [Basic usage of KV bucket](#basic-usage-of-kv-bucket) +- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket) +- [Additional operations on a bucket](#additional-operations-on-a-bucket) +- [Object Store](#object-store) +- [Basic usage of Object Store](#basic-usage-of-object-store) +- [Watching for changes on a store](#watching-for-changes-on-a-store) +- [Additional operations on a store](#additional-operations-on-a-store) +- [Examples](#examples) ## Overview @@ -118,15 +117,15 @@ func main() { if err != nil { // handle error } - + for msg := range msgs.Messages() { msg.Ack() fmt.Printf("Received a JetStream message via fetch: %s\n", string(msg.Data())) messageCounter++ } - + fmt.Printf("received %d messages\n", messageCounter) - + if msgs.Error() != nil { fmt.Println("Error during Fetch(): ", msgs.Error()) } @@ -400,7 +399,7 @@ of messages/bytes. By default, `Fetch()` will wait 30 seconds before timing out // receive up to 10 messages from the stream msgs, err := c.Fetch(10) if err != nil { - // handle error + // handle error } for msg := range msgs.Messages() { diff --git a/jetstream/consumer.go b/jetstream/consumer.go index ee48a1ec3..2a36c3ef6 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -63,7 +63,9 @@ type ( // for delivered messages. // // Messages channel is always closed, thus it is safe to range over it - // without additional checks. + // without additional checks. After the channel is closed, + // MessageBatch.Error() should be checked to see if there was an error + // during message delivery (e.g. missing heartbeat). Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) // FetchBytes is used to retrieve up to a provided bytes from the @@ -82,7 +84,9 @@ type ( // for delivered messages. // // Messages channel is always closed, thus it is safe to range over it - // without additional checks. + // without additional checks. After the channel is closed, + // MessageBatch.Error() should be checked to see if there was an error + // during message delivery (e.g. missing heartbeat). FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) // FetchNoWait is used to retrieve up to a provided number of messages @@ -94,7 +98,9 @@ type ( // channel for delivered messages. // // Messages channel is always closed, thus it is safe to range over it - // without additional checks. + // without additional checks. After the channel is closed, + // MessageBatch.Error() should be checked to see if there was an error + // during message delivery (e.g. missing heartbeat). FetchNoWait(batch int) (MessageBatch, error) // Consume will continuously receive messages and handle them diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index a08d203fb..57aaadb43 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 The NATS Authors +// Copyright 2022-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -104,6 +104,9 @@ func WithGetMsgSubject(subject string) GetMsgOpt { // PullMaxMessages limits the number of messages to be buffered in the client. // If not provided, a default of 500 messages will be used. // This option is exclusive with PullMaxBytes. +// +// PullMaxMessages implements PullConsumeOpt and PullMessagesOpt and thus can +// be used to configure both Consumer.Consume and Consumer.Messages. type PullMaxMessages int func (max PullMaxMessages) configureConsume(opts *consumeOpts) error { @@ -125,6 +128,9 @@ func (max PullMaxMessages) configureMessages(opts *consumeOpts) error { // PullExpiry sets timeout on a single pull request, waiting until at least one // message is available. // If not provided, a default of 30 seconds will be used. +// +// PullExpiry implements PullConsumeOpt and PullMessagesOpt and thus can +// be used to configure both Consumer.Consume and Consumer.Messages. type PullExpiry time.Duration func (exp PullExpiry) configureConsume(opts *consumeOpts) error { @@ -148,6 +154,9 @@ func (exp PullExpiry) configureMessages(opts *consumeOpts) error { // PullMaxBytes limits the number of bytes to be buffered in the client. // If not provided, the limit is not set (max messages will be used instead). // This option is exclusive with PullMaxMessages. +// +// PullMaxBytes implements PullConsumeOpt and PullMessagesOpt and thus can +// be used to configure both Consumer.Consume and Consumer.Messages. type PullMaxBytes int func (max PullMaxBytes) configureConsume(opts *consumeOpts) error { @@ -166,8 +175,11 @@ func (max PullMaxBytes) configureMessages(opts *consumeOpts) error { return nil } -// PullThresholdMessages sets the message count on which Consume will trigger +// PullThresholdMessages sets the message count on which consuming will trigger // new pull request to the server. Defaults to 50% of MaxMessages. +// +// PullThresholdMessages implements PullConsumeOpt and PullMessagesOpt and thus +// can be used to configure both Consumer.Consume and Consumer.Messages. type PullThresholdMessages int func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error { @@ -180,8 +192,11 @@ func (t PullThresholdMessages) configureMessages(opts *consumeOpts) error { return nil } -// PullThresholdBytes sets the byte count on which Consume will trigger +// PullThresholdBytes sets the byte count on which consuming will trigger // new pull request to the server. Defaults to 50% of MaxBytes (if set). +// +// PullThresholdBytes implements PullConsumeOpt and PullMessagesOpt and thus +// can be used to configure both Consumer.Consume and Consumer.Messages. type PullThresholdBytes int func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error { @@ -199,6 +214,9 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error { // than the idle heartbeat setting, the subscription will be removed // and error will be passed to the message handler. // If not provided, a default PullExpiry / 2 will be used (capped at 30 seconds) +// +// PullHeartbeat implements PullConsumeOpt and PullMessagesOpt and thus can +// be used to configure both Consumer.Consume and Consumer.Messages. type PullHeartbeat time.Duration func (hb PullHeartbeat) configureConsume(opts *consumeOpts) error { @@ -221,6 +239,9 @@ func (hb PullHeartbeat) configureMessages(opts *consumeOpts) error { // StopAfter sets the number of messages after which the consumer is // automatically stopped and no more messages are pulled from the server. +// +// StopAfter implements PullConsumeOpt and PullMessagesOpt and thus can +// be used to configure both Consumer.Consume and Consumer.Messages. type StopAfter int func (nMsgs StopAfter) configureConsume(opts *consumeOpts) error { diff --git a/jetstream/pull.go b/jetstream/pull.go index 764bf2a1d..a97d75e7a 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 The NATS Authors +// Copyright 2022-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/micro/README.md b/micro/README.md index 99949eb8b..3ca09a562 100644 --- a/micro/README.md +++ b/micro/README.md @@ -1,4 +1,4 @@ -# NATS micro +# NATS micro [![GoDoc](https://pkg.go.dev/badge/github.com/nats-io/nats.go/micro.svg)](https://pkg.go.dev/github.com/nats-io/nats.go/micro) - [Overview](#overview) - [Basic usage](#basic-usage) From aa516e05dc254169082aecf5a427f5996ec908ca Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 14 Jan 2025 17:49:31 +0100 Subject: [PATCH 2/2] Rephrase jetstream options docs Signed-off-by: Piotr Piotrowski --- jetstream/jetstream_options.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index 57aaadb43..60878d87e 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -105,8 +105,8 @@ func WithGetMsgSubject(subject string) GetMsgOpt { // If not provided, a default of 500 messages will be used. // This option is exclusive with PullMaxBytes. // -// PullMaxMessages implements PullConsumeOpt and PullMessagesOpt and thus can -// be used to configure both Consumer.Consume and Consumer.Messages. +// PullMaxMessages implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. type PullMaxMessages int func (max PullMaxMessages) configureConsume(opts *consumeOpts) error { @@ -129,8 +129,8 @@ func (max PullMaxMessages) configureMessages(opts *consumeOpts) error { // message is available. // If not provided, a default of 30 seconds will be used. // -// PullExpiry implements PullConsumeOpt and PullMessagesOpt and thus can -// be used to configure both Consumer.Consume and Consumer.Messages. +// PullExpiry implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. type PullExpiry time.Duration func (exp PullExpiry) configureConsume(opts *consumeOpts) error { @@ -155,8 +155,8 @@ func (exp PullExpiry) configureMessages(opts *consumeOpts) error { // If not provided, the limit is not set (max messages will be used instead). // This option is exclusive with PullMaxMessages. // -// PullMaxBytes implements PullConsumeOpt and PullMessagesOpt and thus can -// be used to configure both Consumer.Consume and Consumer.Messages. +// PullMaxBytes implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. type PullMaxBytes int func (max PullMaxBytes) configureConsume(opts *consumeOpts) error { @@ -178,8 +178,8 @@ func (max PullMaxBytes) configureMessages(opts *consumeOpts) error { // PullThresholdMessages sets the message count on which consuming will trigger // new pull request to the server. Defaults to 50% of MaxMessages. // -// PullThresholdMessages implements PullConsumeOpt and PullMessagesOpt and thus -// can be used to configure both Consumer.Consume and Consumer.Messages. +// PullThresholdMessages implements both PullConsumeOpt and PullMessagesOpt, +// allowing it to configure Consumer.Consume and Consumer.Messages. type PullThresholdMessages int func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error { @@ -195,8 +195,8 @@ func (t PullThresholdMessages) configureMessages(opts *consumeOpts) error { // PullThresholdBytes sets the byte count on which consuming will trigger // new pull request to the server. Defaults to 50% of MaxBytes (if set). // -// PullThresholdBytes implements PullConsumeOpt and PullMessagesOpt and thus -// can be used to configure both Consumer.Consume and Consumer.Messages. +// PullThresholdBytes implements both PullConsumeOpt and PullMessagesOpt, +// allowing it to configure Consumer.Consume and Consumer.Messages. type PullThresholdBytes int func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error { @@ -215,8 +215,8 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error { // and error will be passed to the message handler. // If not provided, a default PullExpiry / 2 will be used (capped at 30 seconds) // -// PullHeartbeat implements PullConsumeOpt and PullMessagesOpt and thus can -// be used to configure both Consumer.Consume and Consumer.Messages. +// PullHeartbeat implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. type PullHeartbeat time.Duration func (hb PullHeartbeat) configureConsume(opts *consumeOpts) error { @@ -240,8 +240,8 @@ func (hb PullHeartbeat) configureMessages(opts *consumeOpts) error { // StopAfter sets the number of messages after which the consumer is // automatically stopped and no more messages are pulled from the server. // -// StopAfter implements PullConsumeOpt and PullMessagesOpt and thus can -// be used to configure both Consumer.Consume and Consumer.Messages. +// StopAfter implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. type StopAfter int func (nMsgs StopAfter) configureConsume(opts *consumeOpts) error {