diff --git a/README.md b/README.md index a5b769f..48c2fe9 100644 --- a/README.md +++ b/README.md @@ -35,3 +35,5 @@ storage of temporary tokens, and, crucially, communication between the Capture c * `SHARD_ID`: Probably just use 0 * `MAX_REQ_5_SEC`: How many Discord API mute/deafens should be issued per token per 5 second window. Defaults to 7 (ratelimits returned by Discord are anywhere from [5-10]/5sec, so 7 is a decent heuristic) +* `ACK_TIMEOUT_MS`: How many milliseconds after a Mute task is received before it times out, if no capture bot picks up the task +* `TASK_TIMEOUT_MS`: After a mute task is issued to a capture bot, how long before it times out and another method is used diff --git a/broker/broker.go b/broker/broker.go index 39252e9..ff75119 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -11,7 +11,6 @@ import ( "log" "net/http" "strconv" - "strings" "sync" "time" ) @@ -24,6 +23,15 @@ func activeGamesCode() string { return "automuteus:games" } +type GameLobby struct { + LobbyCode string `json:"LobbyCode"` + Region int `json:"Region"` +} + +func roomCodesForConnCodeKey(connCode string) string { + return "automuteus:roomcode:" + connCode +} + type Broker struct { client *redis.Client @@ -81,7 +89,7 @@ func (broker *Broker) TasksListener(server *socketio.Server, connectCode string, broker.taskChannelsLock.Unlock() taskID = task.TaskID - go broker.TaskCompletionListener(task.TaskID, taskChan, subKillChan) + go broker.TaskCompletionListener(taskID, taskChan, subKillChan) log.Println("Broadcasting " + t.Payload + " to room " + connectCode) server.BroadcastToRoom("/", connectCode, "modify", t.Payload) @@ -187,17 +195,28 @@ func (broker *Broker) Start(port string) { server.OnEvent("/", "lobby", func(s socketio.Conn, msg string) { log.Println("lobby:", msg) - //TODO validation - broker.connectionsLock.RLock() - if cCode, ok := broker.connections[s.ID()]; ok { - err := PushJob(ctx, broker.client, cCode, Lobby, msg) - if err != nil { - log.Println(err) + //validation + var lobby GameLobby + err := json.Unmarshal([]byte(msg), &lobby) + if err != nil { + log.Println(err) + } else { + broker.connectionsLock.RLock() + if cCode, ok := broker.connections[s.ID()]; ok { + err := PushJob(ctx, broker.client, cCode, Lobby, msg) + if err != nil { + log.Println(err) + } + err = broker.client.Set(context.Background(), roomCodesForConnCodeKey(cCode), lobby.LobbyCode, time.Minute*15).Err() + if err != nil { + log.Println(err) + } else { + log.Printf("Updated room code %s for connect code %s in Redis", lobby.LobbyCode, cCode) + } } + broker.connectionsLock.RUnlock() } - broker.connectionsLock.RUnlock() - }) server.OnEvent("/", "state", func(s socketio.Conn, msg string) { log.Println("phase received from capture: ", msg) @@ -297,64 +316,26 @@ func (broker *Broker) Start(port string) { vars := mux.Vars(r) conncode := vars["connectCode"] - if conncode == "" || len(conncode) != 8 { + if conncode == "" || len(conncode) != ConnectCodeLength { errorResponse(w) return } - cursor := uint64(0) - keys := []string{} - for len(keys) == 0 { - keys, cursor, err = broker.client.Scan(context.Background(), cursor, "automuteus:discord:*:"+conncode, 10).Result() - if cursor == 0 { - break - } - i := 0 - for _, v := range keys { - if !strings.Contains(v, ":pointer:") { - keys[i] = v - i++ - } - } - } - if err != nil || len(keys) == 0 { - log.Println(err) + key, err := broker.client.Get(context.Background(), roomCodesForConnCodeKey(conncode)).Result() + if err == redis.Nil { w.WriteHeader(http.StatusNotFound) return } - key := keys[0] - res, err := broker.client.Get(context.Background(), key).Result() + resp := Resp{Result: key} + jbytes, err := json.Marshal(resp) if err != nil { log.Println(err) - w.WriteHeader(http.StatusNotFound) - return + w.WriteHeader(http.StatusInternalServerError) } else { - jsonVars := map[string]interface{}{} - err := json.Unmarshal([]byte(res), &jsonVars) - if err != nil || jsonVars["amongUsData"] == nil { - errorResponse(w) - return - } - - //this is some ugly casting - auData := jsonVars["amongUsData"].(map[string]interface{}) - - if auData["room"] == nil { - errorResponse(w) - return - } - - r := Resp{Result: auData["room"].(string)} - jbytes, err := json.Marshal(r) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - w.Write(jbytes) - } - return + w.WriteHeader(http.StatusOK) + w.Write(jbytes) } + return }) log.Printf("Message broker is running on port %s...\n", port) log.Fatal(http.ListenAndServe(":"+port, router)) diff --git a/galactus/server.go b/galactus/server.go index 961b9b2..3a0d22b 100644 --- a/galactus/server.go +++ b/galactus/server.go @@ -19,8 +19,8 @@ import ( "time" ) -const BroadcastToClientCapturesTimeout = time.Millisecond * 500 -const AckFromClientCapturesTimeout = time.Second +const DefaultBroadcastToClientBotTimeout = time.Second +const DefaultAckFromClientBotTimeout = time.Second * 2 var DefaultIdentifyThresholds = discord.IdentifyThresholds{ HardWindow: time.Hour * 24, @@ -182,7 +182,7 @@ func (tokenProvider *TokenProvider) IncrAndTestGuildTokenComboLock(guildID, hash } usable := i < tokenProvider.maxRequests5Seconds - log.Printf("Token %s on guild %s is at count %d. Skipping: %v", hashToken, guildID, i, usable) + log.Printf("Token %s on guild %s is at count %d. Using: %v", hashToken, guildID, i, usable) return usable } @@ -196,6 +196,21 @@ var UnresponsiveCaptureBlacklistDuration = time.Minute * time.Duration(1) func (tokenProvider *TokenProvider) Run(port string) { r := mux.NewRouter() + ackTimeoutms := DefaultAckFromClientBotTimeout + taskTimeoutms := DefaultBroadcastToClientBotTimeout + + ackTimeoutmsStr := os.Getenv("ACK_TIMEOUT_MS") + num, err := strconv.ParseInt(ackTimeoutmsStr, 10, 64) + if err == nil { + ackTimeoutms = time.Millisecond * time.Duration(num) + } + + taskTimeoutmsStr := os.Getenv("TASK_TIMEOUT_MS") + num, err = strconv.ParseInt(taskTimeoutmsStr, 10, 64) + if err == nil { + taskTimeoutms = time.Millisecond * time.Duration(num) + } + r.HandleFunc("/modify/{guildID}/{connectCode}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) guildID := vars["guildID"] @@ -275,7 +290,7 @@ func (tokenProvider *TokenProvider) Run(port string) { } else { acked := make(chan bool) pubsub := tokenProvider.client.Subscribe(context.Background(), discord.BroadcastTaskAckKey(task.TaskID)) - go tokenProvider.waitForAck(pubsub, BroadcastToClientCapturesTimeout, acked) + go tokenProvider.waitForAck(pubsub, ackTimeoutms, acked) err := tokenProvider.client.Publish(context.Background(), discord.TasksSubscribeKey(connectCode), jBytes).Err() if err != nil { @@ -292,7 +307,7 @@ func (tokenProvider *TokenProvider) Run(port string) { } else { acked := make(chan bool) pubsub := tokenProvider.client.Subscribe(context.Background(), discord.CompleteTaskAckKey(task.TaskID)) - go tokenProvider.waitForAck(pubsub, AckFromClientCapturesTimeout, acked) + go tokenProvider.waitForAck(pubsub, taskTimeoutms, acked) res := <-acked if res { log.Println("Successful mute/deafen using client capture bot!") @@ -419,9 +434,11 @@ func (tokenProvider *TokenProvider) waitForAck(pubsub *redis.PubSub, waitTime ti select { case <-t.C: result <- false + t.Stop() return - case t := <-pubsub.Channel(): - result <- t.Payload == "true" + case val := <-pubsub.Channel(): + result <- val.Payload == "true" + t.Stop() return } }