diff --git a/poe-api/src/poe.py b/poe-api/src/poe.py index 20910cf..a8b15d9 100644 --- a/poe-api/src/poe.py +++ b/poe-api/src/poe.py @@ -16,8 +16,7 @@ def load_queries(): for path in queries_path.iterdir(): if path.suffix != ".graphql": continue - with open(path) as f: - queries[path.stem] = f.read() + queries[path.stem] = Path(path).read_text() def generate_payload(query_name, variables): return { @@ -85,33 +84,33 @@ def __init__(self, token, proxy=None): def get_next_data(self): logger.info("Downloading next_data...") - + r = request_with_retries(self.session.get, self.home_url) json_regex = r'' - json_text = re.search(json_regex, r.text).group(1) + json_text = re.search(json_regex, r.text)[1] next_data = json.loads(json_text) self.formkey = next_data["props"]["formkey"] self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"] - + return next_data def get_bots(self): viewer = self.next_data["props"]["pageProps"]["payload"]["viewer"] - if not "availableBots" in viewer: + if "availableBots" not in viewer: raise RuntimeError("Invalid token.") bot_list = viewer["availableBots"] bots = {} for bot in bot_list: url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{bot["displayName"].lower()}.json' - logger.info("Downloading "+url) - + logger.info(f"Downloading {url}") + r = request_with_retries(self.session.get, url) chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"] bots[chat_data["defaultBotObject"]["nickname"]] = chat_data - + return bots def get_bot_names(self): @@ -133,20 +132,20 @@ def get_websocket_url(self, channel=None): if channel is None: channel = self.channel query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' - return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query + return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates{query}' def send_query(self, query_name, variables): for i in range(20): payload = generate_payload(query_name, variables) r = request_with_retries(self.session.post, self.gql_url, json=payload, headers=self.gql_headers) data = r.json() - if data["data"] == None: + if data["data"] is None: logger.warn(f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)') time.sleep(2) continue return r.json() - + raise RuntimeError(f'{query_name} failed too many times.') def subscribe(self): @@ -213,8 +212,7 @@ def on_message(self, ws, msg): self.message_queues[key].put(message) return - #indicate that the response id is tied to the human message id - elif key != "pending" and value == None and message["state"] != "complete": + elif key != "pending" and value is None and message["state"] != "complete": self.active_messages[key] = message["messageId"] self.message_queues[key].put(message) @@ -294,7 +292,7 @@ def get_message_history(self, chatbot, count=25, cursor=None): def delete_message(self, message_ids): logger.info(f"Deleting messages: {message_ids}") - if not type(message_ids) is list: + if type(message_ids) is not list: message_ids = [int(message_ids)] result = self.send_query("DeleteMessageMutation", { @@ -303,8 +301,7 @@ def delete_message(self, message_ids): def purge_conversation(self, chatbot, count=-1): logger.info(f"Purging messages from {chatbot}") - last_messages = self.get_message_history(chatbot, count=50)[::-1] - while last_messages: + while last_messages := self.get_message_history(chatbot, count=50)[::-1]: message_ids = [] for message in last_messages: if count == 0: @@ -313,10 +310,9 @@ def purge_conversation(self, chatbot, count=-1): message_ids.append(message["node"]["messageId"]) self.delete_message(message_ids) - + if count == 0: return - last_messages = self.get_message_history(chatbot, count=50)[::-1] - logger.info(f"No more messages left to delete.") + logger.info("No more messages left to delete.") load_queries() \ No newline at end of file