From d4caa87c9843958fcdd2dcd039a102407e9340b7 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 3 Jan 2025 16:52:59 +0100 Subject: [PATCH] AMQP-0.9.1 consumer (#57) --- README.md | 3 +- cmd/root.go | 76 +++++++++++++- main_test.go | 16 +-- pkg/amqp091/consumer.go | 215 +++++++++++++++++++++++++++++++++++++++ pkg/amqp091/publisher.go | 11 +- pkg/amqp10/consumer.go | 31 +----- pkg/common/common.go | 6 ++ pkg/config/config.go | 2 +- pkg/mgmt/mgmt.go | 6 +- pkg/stomp/consumer.go | 15 ++- 10 files changed, 334 insertions(+), 47 deletions(-) create mode 100644 pkg/amqp091/consumer.go diff --git a/README.md b/README.md index 31caabf..00f00aa 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ ## omq -`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0 -and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers +`omq` is a messaging system client for testing purposes. It currently supports AMQP 1.0, AMQP 0.9.1, STOMP and MQTT 3.1/3.1.1/5.0. It is developed mostly for RabbitMQ but might be useful for other brokers as well (some tests against ActiveMQ were performed). `omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical, diff --git a/cmd/root.go b/cmd/root.go index 2fea272..eceecbd 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,13 +33,16 @@ import ( var cfg config.Config var ( amqp_amqp = &cobra.Command{} + amqp_amqp091 = &cobra.Command{} amqp_stomp = &cobra.Command{} amqp_mqtt = &cobra.Command{} stomp_stomp = &cobra.Command{} stomp_amqp = &cobra.Command{} + stomp_amqp091 = &cobra.Command{} stomp_mqtt = &cobra.Command{} mqtt_mqtt = &cobra.Command{} mqtt_amqp = &cobra.Command{} + mqtt_amqp091 = &cobra.Command{} mqtt_stomp = &cobra.Command{} amqp091_amqp091 = &cobra.Command{} amqp091_amqp = &cobra.Command{} @@ -53,6 +56,7 @@ var ( amqpAppProperties []string amqpAppPropertyFilters []string amqpPropertyFilters []string + streamOffset string ) var ( @@ -114,6 +118,9 @@ func RootCmd() *cobra.Command { amqpConsumerFlags.StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=&p:prefix") + amqp091PublisherFlags := pflag.NewFlagSet("amqp091-publisher", pflag.ContinueOnError) + amqp091ConsumerFlags := pflag.NewFlagSet("amqp091-consumer", pflag.ContinueOnError) + amqp_amqp = &cobra.Command{ Use: "amqp-amqp", Aliases: []string{"amqp"}, @@ -126,6 +133,17 @@ func RootCmd() *cobra.Command { amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags) amqp_amqp.Flags().AddFlagSet(amqpConsumerFlags) + amqp_amqp091 = &cobra.Command{ + Use: "amqp-amqp091", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.AMQP + cfg.ConsumerProto = config.AMQP091 + start(cfg) + }, + } + amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags) + amqp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags) + amqp_stomp = &cobra.Command{ Use: "amqp-stomp", Run: func(cmd *cobra.Command, args []string) { @@ -167,6 +185,16 @@ func RootCmd() *cobra.Command { } stomp_amqp.Flags().AddFlagSet(amqpConsumerFlags) + stomp_amqp091 = &cobra.Command{ + Use: "stomp-amqp091", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.STOMP + cfg.ConsumerProto = config.AMQP091 + start(cfg) + }, + } + stomp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags) + stomp_mqtt = &cobra.Command{ Use: "stomp-mqtt", Run: func(cmd *cobra.Command, args []string) { @@ -200,6 +228,17 @@ func RootCmd() *cobra.Command { mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags) mqtt_amqp.Flags().AddFlagSet(amqpConsumerFlags) + mqtt_amqp091 = &cobra.Command{ + Use: "mqtt-amqp091", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.MQTT + cfg.ConsumerProto = config.AMQP091 + start(cfg) + }, + } + mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags) + mqtt_amqp.Flags().AddFlagSet(amqp091ConsumerFlags) + mqtt_stomp = &cobra.Command{ Use: "mqtt-stomp", Run: func(cmd *cobra.Command, args []string) { @@ -219,6 +258,8 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp091_amqp091.Flags().AddFlagSet(amqp091PublisherFlags) + amqp091_amqp091.Flags().AddFlagSet(amqp091ConsumerFlags) amqp091_amqp = &cobra.Command{ Use: "amqp091-amqp", @@ -228,6 +269,8 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp091_amqp.Flags().AddFlagSet(amqp091PublisherFlags) + amqp091_amqp.Flags().AddFlagSet(amqpConsumerFlags) amqp091_mqtt = &cobra.Command{ Use: "amqp091-mqtt", @@ -237,6 +280,7 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp091_mqtt.Flags().AddFlagSet(amqp091PublisherFlags) amqp091_stomp = &cobra.Command{ Use: "amqp091-stomp", @@ -246,6 +290,7 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp091_stomp.Flags().AddFlagSet(amqp091PublisherFlags) versionCmd = &cobra.Command{ Use: "version", @@ -304,7 +349,7 @@ func RootCmd() *cobra.Command { "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)") rootCmd.PersistentFlags().StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d", "Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)") - rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", + rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)") rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority") rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, @@ -353,13 +398,16 @@ func RootCmd() *cobra.Command { "The DNS name that will return members to synchronize with") rootCmd.AddCommand(amqp_amqp) + rootCmd.AddCommand(amqp_amqp091) rootCmd.AddCommand(amqp_stomp) rootCmd.AddCommand(amqp_mqtt) rootCmd.AddCommand(stomp_stomp) rootCmd.AddCommand(stomp_amqp) + rootCmd.AddCommand(stomp_amqp091) rootCmd.AddCommand(stomp_mqtt) rootCmd.AddCommand(mqtt_mqtt) rootCmd.AddCommand(mqtt_amqp) + rootCmd.AddCommand(mqtt_amqp091) rootCmd.AddCommand(mqtt_stomp) rootCmd.AddCommand(amqp091_amqp091) rootCmd.AddCommand(amqp091_amqp) @@ -645,6 +693,12 @@ func sanitizeConfig(cfg *config.Config) error { } } + offset, err := parseStreamOffset(streamOffset) + if err != nil { + return fmt.Errorf("invalid stream offset value") + } + cfg.StreamOffset = offset + // AMQP application properties cfg.Amqp.AppProperties = make(map[string][]string) for _, val := range amqpAppProperties { @@ -688,6 +742,26 @@ func sanitizeConfig(cfg *config.Config) error { return nil } +func parseStreamOffset(offset string) (any, error) { + switch offset { + case "": + return "", nil + case "next", "first", "last": + return offset, nil + default: + // check if streamOffset can be parsed as unsigned integer (chunkID) + if chunkID, err := strconv.ParseInt(offset, 10, 64); err == nil { + return chunkID, nil + } + // check if streamOffset can be parsed as an ISO 8601 timestamp + if timestamp, err := time.Parse(time.RFC3339, offset); err == nil { + return timestamp, nil + } + } + // return "", fmt.Errorf("invalid stream offset: %s", offset) + return offset, nil //, fmt.Errorf("invalid stream offset: %s", offset) +} + func handleInterupt(ctx context.Context, cancel context.CancelFunc) { go func() { c := make(chan os.Signal, 1) diff --git a/main_test.go b/main_test.go index 406d3f7..b47fa1e 100644 --- a/main_test.go +++ b/main_test.go @@ -109,17 +109,21 @@ var _ = Describe("OMQ CLI", func() { Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`)) }, Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"), - Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"), - Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"), + Entry("amqp -> amqp091", "amqp", "/queues/", "amqp", "/queues/"), Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"), Entry("amqp -> mqtt", "amqp", "/exchanges/amq.topic/", "mqtt", "/topic/"), - Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"), - Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"), - Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"), - Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"), Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"), + Entry("amqp091 -> amqp091", "amqp091", "/queues/", "amqp", "/queues/"), Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"), Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"), + Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"), + Entry("mqtt -> amqp091", "mqtt", "/topic/", "amqp", "/queues/"), + Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"), + Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"), + Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"), + Entry("stomp -> amqp091", "stomp", "/topic/", "amqp", "/queues/"), + Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"), + Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"), ) DescribeTable("supports message priorities for AMQP and STOMP", diff --git a/pkg/amqp091/consumer.go b/pkg/amqp091/consumer.go new file mode 100644 index 0000000..11d33b1 --- /dev/null +++ b/pkg/amqp091/consumer.go @@ -0,0 +1,215 @@ +package amqp091 + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/rabbitmq/omq/pkg/config" + "github.com/rabbitmq/omq/pkg/log" + "github.com/rabbitmq/omq/pkg/utils" + + "github.com/rabbitmq/omq/pkg/metrics" + + amqp091 "github.com/rabbitmq/amqp091-go" +) + +type Amqp091Consumer struct { + Id int + Connection *amqp091.Connection + Channel *amqp091.Channel + Terminus string + Messages <-chan amqp091.Delivery + Config config.Config + whichUri int + ctx context.Context +} + +func NewConsumer(ctx context.Context, cfg config.Config, id int) *Amqp091Consumer { + consumer := &Amqp091Consumer{ + Id: id, + Connection: nil, + Channel: nil, + Terminus: utils.InjectId(cfg.ConsumeFrom, id), + Config: cfg, + whichUri: 0, + ctx: ctx, + } + + if cfg.SpreadConnections { + consumer.whichUri = (id - 1) % len(cfg.ConsumerUri) + } + + consumer.Connect() + + return consumer +} + +func (c *Amqp091Consumer) Connect() { + if c.Channel != nil { + _ = c.Channel.Close() + } + if c.Connection != nil { + _ = c.Connection.Close() + } + c.Channel = nil + c.Connection = nil + + for c.Connection == nil { + if c.whichUri >= len(c.Config.ConsumerUri) { + c.whichUri = 0 + } + uri := c.Config.ConsumerUri[c.whichUri] + c.whichUri++ + config := amqp091.Config{ + Properties: amqp091.Table{ + "connection_name": fmt.Sprintf("omq-consumer-%d", c.Id), + }, + } + conn, err := amqp091.DialConfig(uri, config) + if err != nil { + log.Error("consumer connection failed", "id", c.Id, "error", err.Error()) + select { + case <-c.ctx.Done(): + return + case <-time.After(1 * time.Second): + continue + } + } else { + log.Debug("consumer connected", "id", c.Id, "uri", uri) + c.Connection = conn + } + } + + for c.Channel == nil { + channel, err := c.Connection.Channel() + if err != nil { + if err == context.Canceled { + return + } else { + log.Error("consumer failed to create a channel", "id", c.Id, "error", err.Error()) + time.Sleep(1 * time.Second) + } + } else { + c.Channel = channel + } + } +} + +func (c *Amqp091Consumer) Subscribe() { + if c.Connection != nil { + _ = c.Channel.Qos(c.Config.ConsumerCredits, 0, false) + // TODO add auto-ack and exclusive options + consumeArgs := amqp091.Table{} + if c.Config.StreamOffset != "" { + consumeArgs["x-stream-offset"] = c.Config.StreamOffset + } + sub, err := c.Channel.Consume(strings.TrimPrefix(c.Terminus, "/queues/"), "", false, false, false, false, consumeArgs) + if err != nil { + log.Error("subscription failed", "id", c.Id, "queue", c.Terminus, "error", err.Error()) + return + } + c.Messages = sub + } +} + +func (c *Amqp091Consumer) Start(consumerReady chan bool) { + c.Subscribe() + close(consumerReady) + log.Info("consumer started", "id", c.Id, "terminus", c.Terminus) + previousMessageTimeSent := time.Unix(0, 0) + + for i := 1; i <= c.Config.ConsumeCount; { + for c.Messages == nil { + select { + case <-c.ctx.Done(): + c.Stop("context cancelled") + return + default: + c.Subscribe() + } + } + + select { + case <-c.ctx.Done(): + c.Stop("context cancelled") + return + case msg := <-c.Messages: + payload := msg.Body + priority := int(msg.Priority) + timeSent, latency := utils.CalculateEndToEndLatency(&payload) + metrics.EndToEndLatency.UpdateDuration(timeSent) + + if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) { + metrics.MessagesConsumedOutOfOrderMetric(priority).Inc() + log.Info("out of order message received. This message was sent before the previous message", + "this messsage", timeSent, + "previous message", previousMessageTimeSent) + } + previousMessageTimeSent = timeSent + + log.Debug("message received", + "id", c.Id, + "terminus", c.Terminus, + "size", len(payload), + "priority", priority, + "latency", latency) + + if c.Config.ConsumerLatency > 0 { + log.Debug("consumer latency", "id", c.Id, "latency", c.Config.ConsumerLatency) + time.Sleep(c.Config.ConsumerLatency) + } + + outcome, err := c.outcome(msg.DeliveryTag) + if err != nil { + if err == context.Canceled { + c.Stop("context canceled") + return + } + log.Error("failed to "+outcome+" message", "id", c.Id, "terminus", c.Terminus, "error", err) + } else { + metrics.MessagesConsumedMetric(priority).Inc() + i++ + log.Debug("message "+pastTense(outcome), "id", c.Id, "terminus", c.Terminus) + } + } + } + + c.Stop("--cmessages value reached") + log.Debug("consumer finished", "id", c.Id) +} + +func (c *Amqp091Consumer) outcome(tag uint64) (string, error) { + // don't generate random numbers if not necessary + if c.Config.Amqp.ReleaseRate == 0 && c.Config.Amqp.RejectRate == 0 { + return "acknowledge", c.Channel.Ack(tag, false) + } + // TODO implement NACKing + log.Error("AMQP 0.9.1 doesn't support release/reject rates yet") + os.Exit(1) + return "", nil +} + +func pastTense(outcome string) string { + switch outcome { + case "accept": + return "accepted" + case "release": + return "released" + case "reject": + return "rejected" + } + return "huh?" +} + +func (c *Amqp091Consumer) Stop(reason string) { + if c.Channel != nil { + _ = c.Channel.Close() + } + if c.Connection != nil { + _ = c.Connection.Close() + } + log.Debug("consumer stopped", "id", c.Id, "reason", reason) +} diff --git a/pkg/amqp091/publisher.go b/pkg/amqp091/publisher.go index 9908ee4..741b2c3 100644 --- a/pkg/amqp091/publisher.go +++ b/pkg/amqp091/publisher.go @@ -2,6 +2,7 @@ package amqp091 import ( "context" + "fmt" "math/rand" "strings" "sync" @@ -53,7 +54,6 @@ func NewPublisher(ctx context.Context, cfg config.Config, id int) *Amqp091Publis } func (p *Amqp091Publisher) Connect() { - var conn *amqp091.Connection var err error if p.Connection != nil { @@ -67,9 +67,14 @@ func (p *Amqp091Publisher) Connect() { } uri := p.Config.PublisherUri[p.whichUri] p.whichUri++ - conn, err = amqp091.Dial(uri) + config := amqp091.Config{ + Properties: amqp091.Table{ + "connection_name": fmt.Sprintf("omq-publisher-%d", p.Id), + }, + } + conn, err := amqp091.DialConfig(uri, config) if err != nil { - log.Error("connection failed", "id", p.Id, "error", err.Error()) + log.Error("publisher connection failed", "id", p.Id, "error", err.Error()) select { case <-time.After(1 * time.Second): continue diff --git a/pkg/amqp10/consumer.go b/pkg/amqp10/consumer.go index d68b920..ac9a38f 100644 --- a/pkg/amqp10/consumer.go +++ b/pkg/amqp10/consumer.go @@ -3,16 +3,12 @@ package amqp10 import ( "context" "crypto/tls" - "fmt" "math/rand" - "os" - "strconv" "time" "github.com/rabbitmq/omq/pkg/config" "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/utils" - "github.com/relvacode/iso8601" "github.com/rabbitmq/omq/pkg/metrics" @@ -268,13 +264,7 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter { var filters []amqp.LinkFilter if cfg.StreamOffset != "" { - // parse stream offset - offset, err := parseStreamOffset(cfg.StreamOffset) - if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s\n", err) - os.Exit(1) - } - filters = append(filters, amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, offset)) + filters = append(filters, amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, cfg.StreamOffset)) } if cfg.StreamFilterValues != "" { @@ -299,22 +289,3 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter { } return filters } - -func parseStreamOffset(offset string) (any, error) { - switch offset { - case "": - return nil, nil - case "next", "first", "last": - return offset, nil - default: - // check if streamOffset can be parsed as unsigned integer (chunkID) - if chunkID, err := strconv.ParseUint(offset, 10, 64); err == nil { - return chunkID, nil - } - // check if streamOffset can be parsed as an ISO 8601 timestamp - if timestamp, err := iso8601.ParseString(offset); err == nil { - return timestamp, nil - } - } - return nil, fmt.Errorf("invalid stream offset: %s", offset) -} diff --git a/pkg/common/common.go b/pkg/common/common.go index fb16be4..ae16b19 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -58,6 +58,12 @@ func NewConsumer(ctx context.Context, protocol config.Protocol, cfg config.Confi return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer") } return c, nil + case config.AMQP091: + c := amqp091.NewConsumer(ctx, cfg, id) + if c == nil { + return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer") + } + return c, nil case config.STOMP: c := stomp.NewConsumer(ctx, cfg, id) if c == nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index e79aeb0..718bea4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -97,7 +97,7 @@ type Config struct { MessageDurability bool MessagePriority string // to allow for "unset" value and STOMP takes strings anyway MessageTTL time.Duration - StreamOffset string + StreamOffset any StreamFilterValues string StreamFilterValueSet string ConsumerPriority int32 diff --git a/pkg/mgmt/mgmt.go b/pkg/mgmt/mgmt.go index 3b4b190..c384692 100644 --- a/pkg/mgmt/mgmt.go +++ b/pkg/mgmt/mgmt.go @@ -73,15 +73,15 @@ func (m *Mgmt) connection() rmq.IConnection { func (m *Mgmt) DeclareQueues(cfg config.Config) { log.Info("Declaring queues...") - // declare queues for AMQP publishers + // declare queues for AMQP 1.0 and 0.9.1 publishers if (cfg.PublisherProto == config.AMQP || cfg.PublisherProto == config.AMQP091) && strings.HasPrefix(cfg.PublishTo, "/queues/") { queueName := strings.TrimPrefix(cfg.PublishTo, "/queues/") for i := 1; i <= cfg.Publishers; i++ { m.DeclareAndBind(cfg, utils.InjectId(queueName, i), i) } } - // declare queues for AMQP consumers - if cfg.ConsumerProto == config.AMQP { + // declare queues for AMQP 1.0 and 0.9.1 consumers + if cfg.ConsumerProto == config.AMQP || cfg.ConsumerProto == config.AMQP091 { if strings.HasPrefix(cfg.ConsumeFrom, "/queues/") { for i := 1; i <= cfg.Consumers; i++ { queueName := strings.TrimPrefix(cfg.ConsumeFrom, "/queues/") diff --git a/pkg/stomp/consumer.go b/pkg/stomp/consumer.go index cecb48b..555a703 100644 --- a/pkg/stomp/consumer.go +++ b/pkg/stomp/consumer.go @@ -181,7 +181,7 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error { var subscribeOpts []func(*frame.Frame) error subscribeOpts = append(subscribeOpts, - stomp.SubscribeOpt.Header("x-stream-offset", cfg.StreamOffset), + stomp.SubscribeOpt.Header("x-stream-offset", offsetToString(cfg.StreamOffset)), stomp.SubscribeOpt.Header("prefetch-count", strconv.Itoa(cfg.ConsumerCredits))) if cfg.ConsumerPriority != 0 { @@ -202,3 +202,16 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error { } return subscribeOpts } + +func offsetToString(offset any) string { + if s, ok := offset.(string); ok { + return s + } + if t, ok := offset.(time.Time); ok { + return "timestamp=" + strconv.FormatInt(t.Unix(), 10) + } + if i, ok := offset.(int); ok { + return "offset=" + strconv.Itoa(i) + } + return "next" +}