diff --git a/examples/identity-update-stream/README.md b/examples/identity-update-stream/README.md new file mode 100644 index 00000000..91523368 --- /dev/null +++ b/examples/identity-update-stream/README.md @@ -0,0 +1,20 @@ +# Identity Update Stream Example + +## Background + +XMTP nodes expose a streaming API that provides callers with all identity association changes on the network in real-time. + +An identity association change event is created any time a blockchain account (wallet address) is associated with an Inbox ID or revoked from an Inbox ID. + +This API method is not supported in our web or mobile SDKs, since the intended use is to call it from a backend service. + +## Use-cases + +- An application may want to keep an up-to-date registry of all account address -> inbox ID mappings to power a search API. +- An application may want to notify users when a new wallet association is added to their InboxID + +## Usage + +This API can be called from any environment that supports GRPC and Protobuf. You can generate a client in your language of choice following the instructions [here](https://protobuf.dev/reference/). The .proto can be found [here](https://github.com/xmtp/proto/blob/main/proto/identity/api/v1/identity.proto). + +This [example](./main.go) uses a Golang client generated via Buf and connects to a local instance of our node software. To connect to the `dev` or `production` network you would need to replace `localhost:50051` with `grpc.dev.xmtp.network:443` or `grpc.production.xmtp.network:443` and configure the client to use TLS. diff --git a/examples/identity-update-stream/main.go b/examples/identity-update-stream/main.go new file mode 100644 index 00000000..c9cf1b89 --- /dev/null +++ b/examples/identity-update-stream/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "log" + "time" + + identityProto "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + ctx := context.Background() + + // Connect to the identity service + // To connect to the dev or production environment, you will need to use secure credentials + conn, err := grpc.Dial("localhost:5556", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Create identity service client + client := identityProto.NewIdentityApiClient(conn) + + // Subscribe to association changes + stream, err := client.SubscribeAssociationChanges(ctx, &identityProto.SubscribeAssociationChangesRequest{}) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // Read from the stream + for { + msg, err := stream.Recv() + if err != nil { + log.Fatalf("Failed to receive message: %v", err) + } + dateString := nsToDate(msg.TimestampNs) + + if msg.GetAccountAddressAssociation() != nil { + accountAddressAssociation := msg.GetAccountAddressAssociation() + log.Printf("Wallet %s was associated with inbox %s at %s", + accountAddressAssociation.AccountAddress, + accountAddressAssociation.InboxId, + dateString, + ) + } + if msg.GetAccountAddressRevocation() != nil { + accountAddressRevocation := msg.GetAccountAddressRevocation() + log.Printf("Wallet %s was revoked from inbox %s at %s", + accountAddressRevocation.AccountAddress, + accountAddressRevocation.InboxId, + dateString, + ) + } + } +} + +func nsToDate(ns uint64) string { + return time.Unix(0, int64(ns)).Format(time.RFC3339) +} diff --git a/pkg/identity/api/v1/identity_service.go b/pkg/identity/api/v1/identity_service.go index 189fdd6f..d327fc30 100644 --- a/pkg/identity/api/v1/identity_service.go +++ b/pkg/identity/api/v1/identity_service.go @@ -162,7 +162,12 @@ func (s *Service) SubscribeAssociationChanges(req *identity.SubscribeAssociation _ = sub.Unsubscribe() }() - return nil + select { + case <-stream.Context().Done(): + return nil + case <-s.ctx.Done(): + return nil + } } func (s *Service) PublishAssociationChangesEvent(ctx context.Context, identityUpdateResult *identityTypes.PublishIdentityUpdateResult) error {