Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Documentation of jetstream options and Fetch clarification #1770

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 35 additions & 36 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,37 @@

This doc covers the basic usage of the `jetstream` package in `nats.go` client.

- [JetStream Simplified Client](#jetstream-simplified-client)
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)

## Overview

Expand Down Expand Up @@ -118,15 +117,15 @@ func main() {
if err != nil {
// handle error
}

for msg := range msgs.Messages() {
msg.Ack()
fmt.Printf("Received a JetStream message via fetch: %s\n", string(msg.Data()))
messageCounter++
}

fmt.Printf("received %d messages\n", messageCounter)

if msgs.Error() != nil {
fmt.Println("Error during Fetch(): ", msgs.Error())
}
Expand Down Expand Up @@ -400,7 +399,7 @@ of messages/bytes. By default, `Fetch()` will wait 30 seconds before timing out
// receive up to 10 messages from the stream
msgs, err := c.Fetch(10)
if err != nil {
// handle error
// handle error
}

for msg := range msgs.Messages() {
Expand Down
12 changes: 9 additions & 3 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ type (
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)

// FetchBytes is used to retrieve up to a provided bytes from the
Expand All @@ -82,7 +84,9 @@ type (
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)

// FetchNoWait is used to retrieve up to a provided number of messages
Expand All @@ -94,7 +98,9 @@ type (
// channel for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
FetchNoWait(batch int) (MessageBatch, error)

// Consume will continuously receive messages and handle them
Expand Down
27 changes: 24 additions & 3 deletions jetstream/jetstream_options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 The NATS Authors
// Copyright 2022-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -104,6 +104,9 @@ func WithGetMsgSubject(subject string) GetMsgOpt {
// PullMaxMessages limits the number of messages to be buffered in the client.
// If not provided, a default of 500 messages will be used.
// This option is exclusive with PullMaxBytes.
//
// PullMaxMessages implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullMaxMessages int

func (max PullMaxMessages) configureConsume(opts *consumeOpts) error {
Expand All @@ -125,6 +128,9 @@ func (max PullMaxMessages) configureMessages(opts *consumeOpts) error {
// PullExpiry sets timeout on a single pull request, waiting until at least one
// message is available.
// If not provided, a default of 30 seconds will be used.
//
// PullExpiry implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullExpiry time.Duration

func (exp PullExpiry) configureConsume(opts *consumeOpts) error {
Expand All @@ -148,6 +154,9 @@ func (exp PullExpiry) configureMessages(opts *consumeOpts) error {
// PullMaxBytes limits the number of bytes to be buffered in the client.
// If not provided, the limit is not set (max messages will be used instead).
// This option is exclusive with PullMaxMessages.
//
// PullMaxBytes implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullMaxBytes int

func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
Expand All @@ -166,8 +175,11 @@ func (max PullMaxBytes) configureMessages(opts *consumeOpts) error {
return nil
}

// PullThresholdMessages sets the message count on which Consume will trigger
// PullThresholdMessages sets the message count on which consuming will trigger
// new pull request to the server. Defaults to 50% of MaxMessages.
//
// PullThresholdMessages implements both PullConsumeOpt and PullMessagesOpt,
// allowing it to configure Consumer.Consume and Consumer.Messages.
type PullThresholdMessages int

func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error {
Expand All @@ -180,8 +192,11 @@ func (t PullThresholdMessages) configureMessages(opts *consumeOpts) error {
return nil
}

// PullThresholdBytes sets the byte count on which Consume will trigger
// PullThresholdBytes sets the byte count on which consuming will trigger
// new pull request to the server. Defaults to 50% of MaxBytes (if set).
//
// PullThresholdBytes implements both PullConsumeOpt and PullMessagesOpt,
// allowing it to configure Consumer.Consume and Consumer.Messages.
type PullThresholdBytes int

func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error {
Expand All @@ -199,6 +214,9 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error {
// than the idle heartbeat setting, the subscription will be removed
// and error will be passed to the message handler.
// If not provided, a default PullExpiry / 2 will be used (capped at 30 seconds)
//
// PullHeartbeat implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullHeartbeat time.Duration

func (hb PullHeartbeat) configureConsume(opts *consumeOpts) error {
Expand All @@ -221,6 +239,9 @@ func (hb PullHeartbeat) configureMessages(opts *consumeOpts) error {

// StopAfter sets the number of messages after which the consumer is
// automatically stopped and no more messages are pulled from the server.
//
// StopAfter implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type StopAfter int

func (nMsgs StopAfter) configureConsume(opts *consumeOpts) error {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/pull.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 The NATS Authors
// Copyright 2022-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion micro/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# NATS micro
# NATS micro [![GoDoc](https://pkg.go.dev/badge/github.com/nats-io/nats.go/micro.svg)](https://pkg.go.dev/github.com/nats-io/nats.go/micro)

- [Overview](#overview)
- [Basic usage](#basic-usage)
Expand Down
Loading