Skip to content

Commit

Permalink
Fixed roomcode endpoint, added capture bot timeouts as ENV
Browse files Browse the repository at this point in the history
  • Loading branch information
denverquane committed Dec 4, 2020
1 parent 2ad58ca commit f288e19
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 64 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ storage of temporary tokens, and, crucially, communication between the Capture c
* `SHARD_ID`: Probably just use 0
* `MAX_REQ_5_SEC`: How many Discord API mute/deafens should be issued per token per 5 second window. Defaults to 7 (ratelimits
returned by Discord are anywhere from [5-10]/5sec, so 7 is a decent heuristic)
* `ACK_TIMEOUT_MS`: How many milliseconds after a Mute task is received before it times out, if no capture bot picks up the task
* `TASK_TIMEOUT_MS`: After a mute task is issued to a capture bot, how long before it times out and another method is used
95 changes: 38 additions & 57 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
Expand All @@ -24,6 +23,15 @@ func activeGamesCode() string {
return "automuteus:games"
}

type GameLobby struct {
LobbyCode string `json:"LobbyCode"`
Region int `json:"Region"`
}

func roomCodesForConnCodeKey(connCode string) string {
return "automuteus:roomcode:" + connCode
}

type Broker struct {
client *redis.Client

Expand Down Expand Up @@ -81,7 +89,7 @@ func (broker *Broker) TasksListener(server *socketio.Server, connectCode string,
broker.taskChannelsLock.Unlock()
taskID = task.TaskID

go broker.TaskCompletionListener(task.TaskID, taskChan, subKillChan)
go broker.TaskCompletionListener(taskID, taskChan, subKillChan)

log.Println("Broadcasting " + t.Payload + " to room " + connectCode)
server.BroadcastToRoom("/", connectCode, "modify", t.Payload)
Expand Down Expand Up @@ -187,17 +195,28 @@ func (broker *Broker) Start(port string) {

server.OnEvent("/", "lobby", func(s socketio.Conn, msg string) {
log.Println("lobby:", msg)
//TODO validation

broker.connectionsLock.RLock()
if cCode, ok := broker.connections[s.ID()]; ok {
err := PushJob(ctx, broker.client, cCode, Lobby, msg)
if err != nil {
log.Println(err)
//validation
var lobby GameLobby
err := json.Unmarshal([]byte(msg), &lobby)
if err != nil {
log.Println(err)
} else {
broker.connectionsLock.RLock()
if cCode, ok := broker.connections[s.ID()]; ok {
err := PushJob(ctx, broker.client, cCode, Lobby, msg)
if err != nil {
log.Println(err)
}
err = broker.client.Set(context.Background(), roomCodesForConnCodeKey(cCode), lobby.LobbyCode, time.Minute*15).Err()
if err != nil {
log.Println(err)
} else {
log.Printf("Updated room code %s for connect code %s in Redis", lobby.LobbyCode, cCode)
}
}
broker.connectionsLock.RUnlock()
}
broker.connectionsLock.RUnlock()

})
server.OnEvent("/", "state", func(s socketio.Conn, msg string) {
log.Println("phase received from capture: ", msg)
Expand Down Expand Up @@ -297,64 +316,26 @@ func (broker *Broker) Start(port string) {
vars := mux.Vars(r)
conncode := vars["connectCode"]

if conncode == "" || len(conncode) != 8 {
if conncode == "" || len(conncode) != ConnectCodeLength {
errorResponse(w)
return
}
cursor := uint64(0)
keys := []string{}
for len(keys) == 0 {
keys, cursor, err = broker.client.Scan(context.Background(), cursor, "automuteus:discord:*:"+conncode, 10).Result()
if cursor == 0 {
break
}
i := 0
for _, v := range keys {
if !strings.Contains(v, ":pointer:") {
keys[i] = v
i++
}
}
}

if err != nil || len(keys) == 0 {
log.Println(err)
key, err := broker.client.Get(context.Background(), roomCodesForConnCodeKey(conncode)).Result()
if err == redis.Nil {
w.WriteHeader(http.StatusNotFound)
return
}
key := keys[0]
res, err := broker.client.Get(context.Background(), key).Result()
resp := Resp{Result: key}
jbytes, err := json.Marshal(resp)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusNotFound)
return
w.WriteHeader(http.StatusInternalServerError)
} else {
jsonVars := map[string]interface{}{}
err := json.Unmarshal([]byte(res), &jsonVars)
if err != nil || jsonVars["amongUsData"] == nil {
errorResponse(w)
return
}

//this is some ugly casting
auData := jsonVars["amongUsData"].(map[string]interface{})

if auData["room"] == nil {
errorResponse(w)
return
}

r := Resp{Result: auData["room"].(string)}
jbytes, err := json.Marshal(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
w.Write(jbytes)
}
return
w.WriteHeader(http.StatusOK)
w.Write(jbytes)
}
return
})
log.Printf("Message broker is running on port %s...\n", port)
log.Fatal(http.ListenAndServe(":"+port, router))
Expand Down
31 changes: 24 additions & 7 deletions galactus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"time"
)

const BroadcastToClientCapturesTimeout = time.Millisecond * 500
const AckFromClientCapturesTimeout = time.Second
const DefaultBroadcastToClientBotTimeout = time.Second
const DefaultAckFromClientBotTimeout = time.Second * 2

var DefaultIdentifyThresholds = discord.IdentifyThresholds{
HardWindow: time.Hour * 24,
Expand Down Expand Up @@ -182,7 +182,7 @@ func (tokenProvider *TokenProvider) IncrAndTestGuildTokenComboLock(guildID, hash
}

usable := i < tokenProvider.maxRequests5Seconds
log.Printf("Token %s on guild %s is at count %d. Skipping: %v", hashToken, guildID, i, usable)
log.Printf("Token %s on guild %s is at count %d. Using: %v", hashToken, guildID, i, usable)

return usable
}
Expand All @@ -196,6 +196,21 @@ var UnresponsiveCaptureBlacklistDuration = time.Minute * time.Duration(1)
func (tokenProvider *TokenProvider) Run(port string) {
r := mux.NewRouter()

ackTimeoutms := DefaultAckFromClientBotTimeout
taskTimeoutms := DefaultBroadcastToClientBotTimeout

ackTimeoutmsStr := os.Getenv("ACK_TIMEOUT_MS")
num, err := strconv.ParseInt(ackTimeoutmsStr, 10, 64)
if err == nil {
ackTimeoutms = time.Millisecond * time.Duration(num)
}

taskTimeoutmsStr := os.Getenv("TASK_TIMEOUT_MS")
num, err = strconv.ParseInt(taskTimeoutmsStr, 10, 64)
if err == nil {
taskTimeoutms = time.Millisecond * time.Duration(num)
}

r.HandleFunc("/modify/{guildID}/{connectCode}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
guildID := vars["guildID"]
Expand Down Expand Up @@ -275,7 +290,7 @@ func (tokenProvider *TokenProvider) Run(port string) {
} else {
acked := make(chan bool)
pubsub := tokenProvider.client.Subscribe(context.Background(), discord.BroadcastTaskAckKey(task.TaskID))
go tokenProvider.waitForAck(pubsub, BroadcastToClientCapturesTimeout, acked)
go tokenProvider.waitForAck(pubsub, ackTimeoutms, acked)

err := tokenProvider.client.Publish(context.Background(), discord.TasksSubscribeKey(connectCode), jBytes).Err()
if err != nil {
Expand All @@ -292,7 +307,7 @@ func (tokenProvider *TokenProvider) Run(port string) {
} else {
acked := make(chan bool)
pubsub := tokenProvider.client.Subscribe(context.Background(), discord.CompleteTaskAckKey(task.TaskID))
go tokenProvider.waitForAck(pubsub, AckFromClientCapturesTimeout, acked)
go tokenProvider.waitForAck(pubsub, taskTimeoutms, acked)
res := <-acked
if res {
log.Println("Successful mute/deafen using client capture bot!")
Expand Down Expand Up @@ -419,9 +434,11 @@ func (tokenProvider *TokenProvider) waitForAck(pubsub *redis.PubSub, waitTime ti
select {
case <-t.C:
result <- false
t.Stop()
return
case t := <-pubsub.Channel():
result <- t.Payload == "true"
case val := <-pubsub.Channel():
result <- val.Payload == "true"
t.Stop()
return
}
}
Expand Down

0 comments on commit f288e19

Please sign in to comment.