Skip to content

Commit

Permalink
Merge pull request #14 from automuteus/token-migrate
Browse files Browse the repository at this point in the history
Token migration and worker validation
  • Loading branch information
denverquane authored Mar 30, 2022
2 parents 197eb80 + bdd8272 commit 130aecb
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 142 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
FROM golang:1.17-alpine AS builder
FROM --platform=${BUILDPLATFORM} golang:1.18-alpine AS builder
ARG TARGETOS
ARG TARGETARCH

# Git is required for getting the dependencies.
# hadolint ignore=DL3018
Expand All @@ -19,6 +21,8 @@ COPY ./ ./
RUN export TAG=$(git describe --tags "$(git rev-list --tags --max-count=1)") && \
export COMMIT=$(git rev-parse --short HEAD) && \
CGO_ENABLED=0 \
GOOS=${TARGETOS} \
GOARCH=${TARGETARCH} \
go build -installsuffix 'static' \
-ldflags="-X main.version=${TAG} -X main.commit=${COMMIT}" \
-o /app .
Expand Down
16 changes: 11 additions & 5 deletions galactus/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,31 @@ import (
"time"
)

func (tokenProvider *TokenProvider) attemptOnSecondaryTokens(guildID, userID string, tokens []string, limit int, request task.UserModify) bool {
if tokens != nil && limit > 0 {
sess, hToken := tokenProvider.getAnySession(guildID, tokens, limit)
func (tokenProvider *TokenProvider) attemptOnSecondaryTokens(guildID, userID string, tokenSubset map[string]struct{}, request task.UserModify) string {
if len(tokenProvider.activeSessions) > 0 {
sess, hToken := tokenProvider.getSession(guildID, tokenSubset)
if sess != nil {
err := task.ApplyMuteDeaf(sess, guildID, userID, request.Mute, request.Deaf)
if err != nil {
log.Println("Failed to apply mute to player with error:")
log.Println(err)

// don't attempt this token for this guild for another 5 minutes
err = tokenProvider.BlacklistTokenForDuration(guildID, hToken, UnresponsiveCaptureBlacklistDuration)
if err != nil {
log.Println(err)
}
} else {
log.Printf("Successfully applied mute=%v, deaf=%v to User %d using secondary bot: %s\n", request.Mute, request.Deaf, request.UserID, hToken)
return true
return hToken
}
} else {
log.Println("No secondary bot tokens found. Trying other methods")
}
} else {
log.Println("Guild has no access to secondary bot tokens; skipping")
}
return false
return ""
}

func (tokenProvider *TokenProvider) attemptOnCaptureBot(guildID, connectCode string, gid uint64, timeout time.Duration, request task.UserModify) bool {
Expand Down
204 changes: 92 additions & 112 deletions galactus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"github.com/automuteus/utils/pkg/premium"
"github.com/automuteus/utils/pkg/rediskey"
"github.com/automuteus/utils/pkg/task"
"github.com/automuteus/utils/pkg/token"
"github.com/bwmarrin/discordgo"
"github.com/go-redis/redis/v8"
"github.com/gorilla/mux"
"golang.org/x/exp/constraints"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
)
Expand All @@ -34,8 +33,6 @@ var PremiumBotConstraints = map[premium.Tier]int{

const DefaultCaptureBotTimeout = time.Second

var ctx = context.Background()

type TokenProvider struct {
client *redis.Client
primarySession *discordgo.Session
Expand All @@ -44,6 +41,8 @@ type TokenProvider struct {
activeSessions map[string]*discordgo.Session
maxRequests5Seconds int64
sessionLock sync.RWMutex

botVerificationQueue chan botVerifyTask
}

func NewTokenProvider(botToken, redisAddr, redisUser, redisPass string, maxReq int64) *TokenProvider {
Expand Down Expand Up @@ -79,26 +78,41 @@ func NewTokenProvider(botToken, redisAddr, redisUser, redisPass string, maxReq i
}

return &TokenProvider{
client: rdb,
primarySession: dg,
activeSessions: make(map[string]*discordgo.Session),
maxRequests5Seconds: maxReq,
sessionLock: sync.RWMutex{},
client: rdb,
primarySession: dg,
activeSessions: make(map[string]*discordgo.Session),
maxRequests5Seconds: maxReq,
sessionLock: sync.RWMutex{},
botVerificationQueue: make(chan botVerifyTask),
}
}

func (tokenProvider *TokenProvider) BotVerificationWorker() {
log.Println("Premium bot verification worker started")
for {
verifyTask := <-tokenProvider.botVerificationQueue

if tokenProvider.canRunBotVerification(verifyTask.guildID) {
// always send nil tokens used; we can't populate this info from anywhere anyways
tokenProvider.verifyBotMembership(verifyTask.guildID, verifyTask.limit, nil)

err := tokenProvider.markBotVerificationLockout(verifyTask.guildID)
if err != nil {
log.Println(err)
}

// cheap ratelimiting; only process verifications once per second
time.Sleep(time.Second)
}
}
}

func rateLimitEventCallback(sess *discordgo.Session, rl *discordgo.RateLimit) {
log.Println(rl.Message)
}

func (tokenProvider *TokenProvider) PopulateAndStartSessions() {
keys, err := tokenProvider.client.HGetAll(ctx, rediskey.AllTokensHSet).Result()
if err != nil {
log.Println(err)
return
}

for _, v := range keys {
func (tokenProvider *TokenProvider) PopulateAndStartSessions(tokens []string) {
for _, v := range tokens {
tokenProvider.openAndStartSessionWithToken(v)
}
}
Expand All @@ -123,53 +137,48 @@ func (tokenProvider *TokenProvider) openAndStartSessionWithToken(botToken string
return false
}
// associates the guilds with this token to be used for requests
sess.AddHandler(tokenProvider.newGuild(k))
sess.AddHandler(tokenProvider.newGuild)
log.Println("Opened session on startup for " + k)
tokenProvider.activeSessions[k] = sess
return true
}
return false
}

func (tokenProvider *TokenProvider) getAllTokensForGuild(guildID string) []string {
hTokens, err := tokenProvider.client.SMembers(context.Background(), rediskey.GuildTokensKey(guildID)).Result()
if err != nil {
return nil
}
return hTokens
}

func (tokenProvider *TokenProvider) getAnySession(guildID string, tokens []string, limit int) (*discordgo.Session, string) {
func (tokenProvider *TokenProvider) getSession(guildID string, hTokenSubset map[string]struct{}) (*discordgo.Session, string) {
tokenProvider.sessionLock.RLock()
defer tokenProvider.sessionLock.RUnlock()

for i, hToken := range tokens {
if i == limit {
return nil, ""
}
// if this token isn't potentially rate-limited
if tokenProvider.IncrAndTestGuildTokenComboLock(guildID, hToken) {
sess, ok := tokenProvider.activeSessions[hToken]
if ok {
for hToken, sess := range tokenProvider.activeSessions {
// if we have already used this token successfully, or haven't set any restrictions
if hTokenSubset == nil || mapHasEntry(hTokenSubset, hToken) {
// if this token isn't potentially rate-limited
if tokenProvider.IncrAndTestGuildTokenComboLock(guildID, hToken) {
return sess, hToken
} else {
log.Println("Secondary token is potentially rate-limited. Skipping")
}
// remove this key from our records and keep going
tokenProvider.client.SRem(context.Background(), rediskey.GuildTokensKey(guildID), hToken)
} else {
log.Println("Secondary token is potentially rate-limited. Skipping")
}
}

return nil, ""
}

func mapHasEntry[T constraints.Ordered, K any](dict map[T]K, key T) bool {
if dict == nil {
return false
}
_, ok := dict[key]
return ok
}

func (tokenProvider *TokenProvider) IncrAndTestGuildTokenComboLock(guildID, hashToken string) bool {
i, err := tokenProvider.client.Incr(context.Background(), rediskey.GuildTokenLock(guildID, hashToken)).Result()
if err != nil {
log.Println(err)
}
usable := i < tokenProvider.maxRequests5Seconds
log.Printf("Token %s on guild %s is at count %d. Using: %v", hashToken, guildID, i, usable)
log.Printf("Token/capture %s on guild %s is at count %d. Using?: %v", hashToken, guildID, i, usable)
if !usable {
return false
}
Expand All @@ -182,6 +191,11 @@ func (tokenProvider *TokenProvider) IncrAndTestGuildTokenComboLock(guildID, hash
return true
}

// BlacklistTokenForDuration sets a guild token (or connect code ala capture bot) to the maximum value allowed before
// attempting other non-rate-limited mute/deafen methods.
// NOTE: this will manifest as the capture/token in question appearing like it "has been used <maxnum> times" in logs,
// even if this is not technically accurate. A more accurate approach would probably use a totally separate Redis key,
// as opposed to this approach, which simply uses the ratelimiting counter key(s) to achieve blacklisting
func (tokenProvider *TokenProvider) BlacklistTokenForDuration(guildID, hashToken string, duration time.Duration) error {
return tokenProvider.client.Set(context.Background(), rediskey.GuildTokenLock(guildID, hashToken), tokenProvider.maxRequests5Seconds, duration).Err()
}
Expand Down Expand Up @@ -240,7 +254,6 @@ func (tokenProvider *TokenProvider) Run(port string) {
}

limit := PremiumBotConstraints[userModifications.Premium]
tokens := tokenProvider.getAllTokensForGuild(guildID)

tasksChannel := make(chan task.UserModify, len(userModifications.Users))
wg := sync.WaitGroup{}
Expand All @@ -251,21 +264,37 @@ func (tokenProvider *TokenProvider) Run(port string) {
Official: 0,
RateLimit: 0,
}
uniqueTokensUsed := make(map[string]struct{})
lock := sync.Mutex{}
tokenLock := sync.RWMutex{}

errors := 0
// start a handful of workers to handle the tasks
for i := 0; i < maxWorkers; i++ {
go func() {
for request := range tasksChannel {
userIDStr := strconv.FormatUint(request.UserID, 10)
success := tokenProvider.attemptOnSecondaryTokens(guildID, userIDStr, tokens, limit, request)
if success {
hToken := ""
if limit > 0 {
tokenLock.RLock()
if len(uniqueTokensUsed) >= limit {
hToken = tokenProvider.attemptOnSecondaryTokens(guildID, userIDStr, uniqueTokensUsed, request)
tokenLock.RUnlock()
} else {
tokenLock.RUnlock()
hToken = tokenProvider.attemptOnSecondaryTokens(guildID, userIDStr, nil, request)
}
}
if hToken != "" {
lock.Lock()
mdsc.Worker++
lock.Unlock()

tokenLock.Lock()
uniqueTokensUsed[hToken] = struct{}{}
tokenLock.Unlock()
} else {
success = tokenProvider.attemptOnCaptureBot(guildID, connectCode, gid, taskTimeoutms, request)
success := tokenProvider.attemptOnCaptureBot(guildID, connectCode, gid, taskTimeoutms, request)
if success {
lock.Lock()
mdsc.Capture++
Expand Down Expand Up @@ -313,66 +342,31 @@ func (tokenProvider *TokenProvider) Run(port string) {
log.Println(err)
}
}

// note, this should probably be more systematic on startup, not when a mute/deafen task comes in. But this is a
// context in which we already have the guildID, successful tokens, AND the premium limit...
go tokenProvider.verifyBotMembership(guildID, limit, uniqueTokensUsed)
}).Methods("POST")

r.HandleFunc("/addtoken", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
r.HandleFunc("/verify/{guildID}/{premiumTier}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
guildID := vars["guildID"]
tierStr := vars["premiumTier"]
_, gerr := strconv.ParseUint(guildID, 10, 64)
if gerr != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
w.Write([]byte("Invalid guildID (non-numeric) received. Query should be of the form POST `/verify/<guildID>/<premiumTier>`"))
return
}
defer r.Body.Close()

botToken := string(body)
log.Println(botToken)

k := hashToken(botToken)
log.Println(k)
tokenProvider.sessionLock.RLock()
if _, ok := tokenProvider.activeSessions[k]; ok {
log.Println("Token already exists on the server")
w.WriteHeader(http.StatusAlreadyReported)
w.Write([]byte("Token already exists on the server"))
tokenProvider.sessionLock.RUnlock()
return
}
tokenProvider.sessionLock.RUnlock()

token.WaitForToken(tokenProvider.client, botToken)
token.LockForToken(tokenProvider.client, botToken)
sess, err := discordgo.New("Bot " + botToken)
if err != nil {
tier, perr := strconv.ParseUint(tierStr, 10, 64)
if perr != nil || tier < 0 || tier > 5 {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
w.Write([]byte("Invalid premium tier (not [0,5]) received. Query should be of the form POST `/verify/<guildID>/<premiumTier>`"))
return
}
sess.AddHandler(tokenProvider.newGuild(k))
err = sess.Open()
if err != nil {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(err.Error()))
return
}

tokenProvider.sessionLock.Lock()
tokenProvider.activeSessions[k] = sess
tokenProvider.sessionLock.Unlock()

err = tokenProvider.client.HSet(ctx, rediskey.AllTokensHSet, k, botToken).Err()
if err != nil {
log.Println(err)
}

for _, v := range sess.State.Guilds {
err := tokenProvider.client.SAdd(ctx, rediskey.GuildTokensKey(v.ID), k).Err()
if !errors.Is(err, redis.Nil) && err != nil {
log.Println(strings.ReplaceAll(err.Error(), botToken, "<redacted>"))
} else {
log.Println("Added token for guild " + v.ID)
}
}
limit := PremiumBotConstraints[premium.Tier(tier)]
tokenProvider.enqueueBotMembershipVerifyTask(guildID, limit)
w.WriteHeader(http.StatusOK)
}).Methods("POST")

r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -424,20 +418,6 @@ func (tokenProvider *TokenProvider) Close() {
tokenProvider.primarySession.Close()
}

func (tokenProvider *TokenProvider) newGuild(hashedToken string) func(s *discordgo.Session, m *discordgo.GuildCreate) {
return func(s *discordgo.Session, m *discordgo.GuildCreate) {
tokenProvider.sessionLock.RLock()
for test := range tokenProvider.activeSessions {
if hashedToken == test {
err := tokenProvider.client.SAdd(ctx, rediskey.GuildTokensKey(m.Guild.ID), hashedToken)
if err != nil {
log.Println(err)
} else {
log.Println("Token added for running guild " + m.Guild.ID)
}
}
}

tokenProvider.sessionLock.RUnlock()
}
func (tokenProvider *TokenProvider) newGuild(s *discordgo.Session, m *discordgo.GuildCreate) {
log.Println("added to " + m.ID)
}
Loading

0 comments on commit 130aecb

Please sign in to comment.