-
Notifications
You must be signed in to change notification settings - Fork 626
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persistent message map #1996
base: master
Are you sure you want to change the base?
Persistent message map #1996
Changes from all commits
0527f01
c0f5d0c
eaa8907
2845740
60219a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |||||||
"github.com/d5/tengo/v2/stdlib" | ||||||||
lru "github.com/hashicorp/golang-lru" | ||||||||
"github.com/kyokomi/emoji/v2" | ||||||||
"github.com/philippgille/gokv" | ||||||||
"github.com/sirupsen/logrus" | ||||||||
) | ||||||||
|
||||||||
|
@@ -29,21 +30,24 @@ type Gateway struct { | |||||||
Message chan config.Message | ||||||||
Name string | ||||||||
Messages *lru.Cache | ||||||||
MessageStore gokv.Store | ||||||||
CanonicalStore gokv.Store | ||||||||
|
||||||||
logger *logrus.Entry | ||||||||
} | ||||||||
|
||||||||
type BrMsgID struct { | ||||||||
br *bridge.Bridge | ||||||||
ID string | ||||||||
Protocol string | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why removing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are directly storing these values in our persistent store, storing a reference to br, would just be garbage data when we read it back. matterbridge/gateway/gateway.go Lines 267 to 269 in 8587fa8
For this persistent feature we are removing this reference and storing the direct values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I understand not storing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the in memory cache yes. I could be misunderstanding something about how references work in golang, correct me if I'm wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you actually store There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh ok I see. Yes, that would make more sense and would make its functionality more clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually maybe I'm misunderstanding type BrMsgID struct {
Protocol bridge.Bridge.Protocol
DestName bridge.Bridge.Name
ChannelID string
ID string
} causes build errors:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BrMsgID stays the same with br *bridge.Bridge, where you need the protocol just use br.Protocol There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might also be misunderstanding what you mean, but this change is most important in this code snippet When we get the |
||||||||
DestName string | ||||||||
ChannelID string | ||||||||
ID string | ||||||||
} | ||||||||
|
||||||||
const apiProtocol = "api" | ||||||||
|
||||||||
// New creates a new Gateway object associated with the specified router and | ||||||||
// following the given configuration. | ||||||||
func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { | ||||||||
func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) (*Gateway, error) { | ||||||||
logger := rootLogger.WithFields(logrus.Fields{"prefix": "gateway"}) | ||||||||
|
||||||||
cache, _ := lru.New(5000) | ||||||||
|
@@ -59,12 +63,55 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { | |||||||
if err := gw.AddConfig(cfg); err != nil { | ||||||||
logger.Errorf("Failed to add configuration to gateway: %#v", err) | ||||||||
} | ||||||||
return gw | ||||||||
|
||||||||
persistentMessageStorePath := gw.BridgeValues().General.PersistentMessageStorePath | ||||||||
usePersistent := persistentMessageStorePath != "" | ||||||||
|
||||||||
if usePersistent { | ||||||||
rootPath := fmt.Sprintf("%s/%s", persistentMessageStorePath, gw.Name) | ||||||||
err := os.MkdirAll(rootPath, os.ModePerm) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
|
||||||||
MessageStore, err := gw.getMessageMapStore(fmt.Sprintf("%s/Messages", rootPath)) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
gw.MessageStore = MessageStore | ||||||||
|
||||||||
CanonicalStore, err := gw.getMessageMapStore(fmt.Sprintf("%s/Canonical", rootPath)) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
gw.CanonicalStore = CanonicalStore | ||||||||
} | ||||||||
|
||||||||
return gw, nil | ||||||||
} | ||||||||
|
||||||||
func (gw *Gateway) SetMessageMap(canonicalMsgID string, msgIDs []*BrMsgID) { | ||||||||
usePersistent := gw.BridgeValues().General.PersistentMessageStorePath != "" | ||||||||
if usePersistent { | ||||||||
gw.setDestMessagesToStore(canonicalMsgID, msgIDs) | ||||||||
} else { | ||||||||
gw.Messages.Add(canonicalMsgID, msgIDs) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// FindCanonicalMsgID returns the ID under which a message was stored in the cache. | ||||||||
func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { | ||||||||
ID := protocol + " " + mID | ||||||||
|
||||||||
usePersistent := gw.BridgeValues().General.PersistentMessageStorePath != "" | ||||||||
if usePersistent { | ||||||||
return gw.getCanonicalMessageFromStore(ID) | ||||||||
} else { | ||||||||
return gw.getCanonicalMessageFromMemCache(ID) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
func (gw *Gateway) getCanonicalMessageFromMemCache(ID string) string { | ||||||||
if gw.Messages.Contains(ID) { | ||||||||
return ID | ||||||||
} | ||||||||
|
@@ -259,13 +306,26 @@ func (gw *Gateway) getDestChannel(msg *config.Message, dest bridge.Bridge) []con | |||||||
} | ||||||||
|
||||||||
func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string { | ||||||||
var destID string | ||||||||
|
||||||||
usePersistent := gw.BridgeValues().General.PersistentMessageStorePath != "" | ||||||||
if usePersistent { | ||||||||
destID = gw.getDestMessagesFromStore(msgID, dest, channel) | ||||||||
} else { | ||||||||
destID = gw.getDestMessageFromMemCache(msgID, dest, channel) | ||||||||
} | ||||||||
|
||||||||
return strings.Replace(destID, dest.Protocol+" ", "", 1) | ||||||||
} | ||||||||
|
||||||||
func (gw *Gateway) getDestMessageFromMemCache(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string { | ||||||||
if res, ok := gw.Messages.Get(msgID); ok { | ||||||||
IDs := res.([]*BrMsgID) | ||||||||
for _, id := range IDs { | ||||||||
// check protocol, bridge name and channelname | ||||||||
// for people that reuse the same bridge multiple times. see #342 | ||||||||
if dest.Protocol == id.br.Protocol && dest.Name == id.br.Name && channel.ID == id.ChannelID { | ||||||||
return strings.Replace(id.ID, dest.Protocol+" ", "", 1) | ||||||||
if dest.Protocol == id.Protocol && dest.Name == id.DestName && channel.ID == id.ChannelID { | ||||||||
return id.ID | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason it can't be in 1 file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in order to add this feature without significant refactor we are still following the whole concept of a "canonical" message.
We need two separate mappings one for message->canonical (
CanonicalStore
) and another for canonical->message[] (MessageStore
)This is all highly related to this another PR:
#1991 (comment)
#1991 (comment)
On the file system the directories look like this:
Each bridge gets its own subdirectory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but it's a k/v store, why can't you just put it in 1 file and use a "canonical" and "messages" prefix for the keys?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we can I suppose, I don't really see how that's beneficial.
I feel the code is more readable to have distinct mappings for their functionality?
Plus, removes the risk of breaking old message stores by needing to change a hypothetical prefix.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like I didn't see that you said a subdirectory per bridge.
In my opinion it's much nicer to have everything in 1 file instead of a lot of files. So I'm even proposing to just have one database containing everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Particularly for the subdirectories per bridge, I prefer that solution as its more flexible.
Flexible in the manner, that each folder is as if its its own table.
If a user needs to rename a bridge and don't want to lose their historical data they can just rename the directory.
Additionally Badger db does not support buckets/tables for each keystore so sub directories is really the only way to do it.
for example what I did in another PR with bbolt:
https://github.com/yousefmansy1/matterbridge/blob/5353b32c1a21d2655f8e12e76f81628373acd6f5/gateway/persistent.go#L21-L34
The way I'd like to treat it closer to a set of tables in a DB rather than one big KV store where we dump things in and "filter" with prefixes.
For the non technical user they can just treat the root directory as the whole db and ignore it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way unless we're always closing and opening the DB/KV handler with each Read/Write we do need individual stores for each one as each KV store actually holds a lock over the store. No other stores will be able to open it until it closes it's connection.
Its probably a little stupid but we'll never run into this blocking if each gateway has its own store it keeps for itself.