Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example usage of the SubscribeAssociationChanges API #415

Open
wants to merge 1 commit into
base: 12-31-start_publishing_events
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions examples/identity-update-stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Identity Update Stream Example

## Background

XMTP nodes expose a streaming API that provides callers with all identity association changes on the network in real-time.

An identity association change event is created any time a blockchain account (wallet address) is associated with an Inbox ID or revoked from an Inbox ID.

This API method is not supported in our web or mobile SDKs, since the intended use is to call it from a backend service.

## Use-cases

- An application may want to keep an up-to-date registry of all account address -> inbox ID mappings to power a search API.
- An application may want to notify users when a new wallet association is added to their InboxID

## Usage

This API can be called from any environment that supports GRPC and Protobuf. You can generate a client in your language of choice following the instructions [here](https://protobuf.dev/reference/). The .proto can be found [here](https://github.com/xmtp/proto/blob/main/proto/identity/api/v1/identity.proto).

This [example](./main.go) uses a Golang client generated via Buf and connects to a local instance of our node software. To connect to the `dev` or `production` network you would need to replace `localhost:50051` with `grpc.dev.xmtp.network:443` or `grpc.production.xmtp.network:443` and configure the client to use TLS.
62 changes: 62 additions & 0 deletions examples/identity-update-stream/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"log"
"time"

identityProto "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func main() {
ctx := context.Background()

// Connect to the identity service
// To connect to the dev or production environment, you will need to use secure credentials
conn, err := grpc.Dial("localhost:5556", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()

// Create identity service client
client := identityProto.NewIdentityApiClient(conn)

// Subscribe to association changes
stream, err := client.SubscribeAssociationChanges(ctx, &identityProto.SubscribeAssociationChangesRequest{})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}

// Read from the stream
for {
msg, err := stream.Recv()
if err != nil {
log.Fatalf("Failed to receive message: %v", err)
}
dateString := nsToDate(msg.TimestampNs)

if msg.GetAccountAddressAssociation() != nil {
accountAddressAssociation := msg.GetAccountAddressAssociation()
log.Printf("Wallet %s was associated with inbox %s at %s",
accountAddressAssociation.AccountAddress,
accountAddressAssociation.InboxId,
dateString,
)
}
if msg.GetAccountAddressRevocation() != nil {
accountAddressRevocation := msg.GetAccountAddressRevocation()
log.Printf("Wallet %s was revoked from inbox %s at %s",
accountAddressRevocation.AccountAddress,
accountAddressRevocation.InboxId,
dateString,
)
}
}
}

func nsToDate(ns uint64) string {
return time.Unix(0, int64(ns)).Format(time.RFC3339)
}
7 changes: 6 additions & 1 deletion pkg/identity/api/v1/identity_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ func (s *Service) SubscribeAssociationChanges(req *identity.SubscribeAssociation
_ = sub.Unsubscribe()
}()

return nil
select {
case <-stream.Context().Done():
return nil
case <-s.ctx.Done():
return nil
}
}

func (s *Service) PublishAssociationChangesEvent(ctx context.Context, identityUpdateResult *identityTypes.PublishIdentityUpdateResult) error {
Expand Down
Loading