Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

godoc: Minor API doc edits. Timestamps are Unix milliseconds. #1111

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,26 +995,30 @@ type AlterUserScramCredentialsResult struct {
Errors map[string]Error
}

// OffsetSpec specifies desired offsets while using ListOffsets.
// OffsetSpec specifies the desired offsets for ListOffsets. Use one of the
// defined constants or NewOffsetSpecForTimestamp.
type OffsetSpec int64

const (
// MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side.
MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
// MaxTimestampOffsetSpec retrieves the offset with the largest Timestamp.
// This may be different then LatestOffsetSpec as Timestamps can be set client side.
MaxTimestampOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP
// EarliestOffsetSpec retrieves the earliest offset for the TopicPartition.
EarliestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_EARLIEST
// LatestOffsetSpec retrieves the latest offset for the TopicPartition.
LatestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_LATEST
)

// NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp.
func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec {
return OffsetSpec(timestamp)
// NewOffsetSpecForTimestamp creates an OffsetSpec to retrieve the earliest offset whose
// timestamp is greater than or equal to the timestamp in Unix milliseconds.
func NewOffsetSpecForTimestamp(timestamp_ms int64) OffsetSpec {
return OffsetSpec(timestamp_ms)
Comment on lines +1014 to +1015
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewOffsetSpecForTimestamp(timestamp_ms int64) OffsetSpec {
return OffsetSpec(timestamp_ms)
func NewOffsetSpecForTimestamp(timestampMs int64) OffsetSpec {
return OffsetSpec(timestampMs)

(nit)

}

// ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition.
// ListOffsetsResultInfo describes the result of a ListOffsets request for one Topic Partition.
type ListOffsetsResultInfo struct {
Offset Offset
Offset Offset
// Timestamp is in Unix milliseconds.
Timestamp int64
LeaderEpoch *int32
Error Error
Expand Down Expand Up @@ -3218,15 +3222,16 @@ func (a *AdminClient) DescribeUserScramCredentials(
return result, nil
}

// ListOffsets describe offsets for the
// specified TopicPartiton based on an OffsetSpec.
// ListOffsets returns offsets for a set of
// TopicPartitions based on an OffsetSpec.
//
// Parameters:
//
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it
// holds either the OffsetSpec enum value or timestamp. Must not be nil.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec. An
// OffsetSpec is either a defined enum value or a timestamp created by
// NewOffsetSpecForTimestamp. Must not be nil.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
// `offsets` list. Timestamps are in Unix milliseconds.
//
// The function will block for at most timeoutMs milliseconds.
//
Expand Down
9 changes: 5 additions & 4 deletions kafka/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
"unsafe"
)

// Error provides a Kafka-specific error container
// Error provides a Kafka-specific error container. Check that Code() != ErrorNoError
// when embedded in result structs.
type Error struct {
code ErrorCode
str string
Expand Down Expand Up @@ -85,8 +86,8 @@ func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error {
return newErrorFromCError(cError)
}

// Error returns a human readable representation of an Error
// Same as Error.String()
// Error returns a human readable representation of an Error.
// Same as Error.String().
func (e Error) Error() string {
return e.String()
}
Expand All @@ -107,7 +108,7 @@ func (e Error) String() string {
return errstr
}

// Code returns the ErrorCode of an Error
// Code returns the ErrorCode of this Error.
func (e Error) Code() ErrorCode {
return e.code
}
Expand Down
3 changes: 2 additions & 1 deletion kafka/error_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func WriteErrorCodes(f *os.File) {
*/
import "C"

// ErrorCode is the integer representation of local and broker error codes
// ErrorCode is the integer representation of local and broker error codes.
// ErrNoError (0) represents success.
type ErrorCode int

// String returns a human readable representation of an error code
Expand Down
2 changes: 1 addition & 1 deletion kafka/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func OffsetTail(relativeOffset Offset) Offset {
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
// `offsets` list. Timestamps are in Unix milliseconds.
//
// The function will block for at most timeoutMs milliseconds.
//
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutM
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
// `offsets` list. Timestamps are in Unix milliseconds.
//
// The function will block for at most timeoutMs milliseconds.
//
Expand Down