-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sourcery refactored main branch #1
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,7 @@ def load_queries(): | |
for path in queries_path.iterdir(): | ||
if path.suffix != ".graphql": | ||
continue | ||
with open(path) as f: | ||
queries[path.stem] = f.read() | ||
queries[path.stem] = Path(path).read_text() | ||
|
||
def generate_payload(query_name, variables): | ||
return { | ||
|
@@ -85,33 +84,33 @@ def __init__(self, token, proxy=None): | |
|
||
def get_next_data(self): | ||
logger.info("Downloading next_data...") | ||
|
||
r = request_with_retries(self.session.get, self.home_url) | ||
json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>' | ||
json_text = re.search(json_regex, r.text).group(1) | ||
json_text = re.search(json_regex, r.text)[1] | ||
next_data = json.loads(json_text) | ||
|
||
self.formkey = next_data["props"]["formkey"] | ||
self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"] | ||
|
||
Comment on lines
-88
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
return next_data | ||
|
||
def get_bots(self): | ||
viewer = self.next_data["props"]["pageProps"]["payload"]["viewer"] | ||
if not "availableBots" in viewer: | ||
if "availableBots" not in viewer: | ||
raise RuntimeError("Invalid token.") | ||
bot_list = viewer["availableBots"] | ||
|
||
bots = {} | ||
for bot in bot_list: | ||
url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{bot["displayName"].lower()}.json' | ||
logger.info("Downloading "+url) | ||
logger.info(f"Downloading {url}") | ||
|
||
r = request_with_retries(self.session.get, url) | ||
|
||
chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"] | ||
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data | ||
|
||
Comment on lines
-101
to
+113
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
return bots | ||
|
||
def get_bot_names(self): | ||
|
@@ -133,20 +132,20 @@ def get_websocket_url(self, channel=None): | |
if channel is None: | ||
channel = self.channel | ||
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' | ||
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query | ||
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates{query}' | ||
Comment on lines
-136
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def send_query(self, query_name, variables): | ||
for i in range(20): | ||
payload = generate_payload(query_name, variables) | ||
r = request_with_retries(self.session.post, self.gql_url, json=payload, headers=self.gql_headers) | ||
data = r.json() | ||
if data["data"] == None: | ||
if data["data"] is None: | ||
logger.warn(f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)') | ||
time.sleep(2) | ||
continue | ||
|
||
return r.json() | ||
|
||
Comment on lines
-143
to
+148
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
raise RuntimeError(f'{query_name} failed too many times.') | ||
|
||
def subscribe(self): | ||
|
@@ -213,8 +212,7 @@ def on_message(self, ws, msg): | |
self.message_queues[key].put(message) | ||
return | ||
|
||
#indicate that the response id is tied to the human message id | ||
elif key != "pending" and value == None and message["state"] != "complete": | ||
elif key != "pending" and value is None and message["state"] != "complete": | ||
Comment on lines
-216
to
+215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
self.active_messages[key] = message["messageId"] | ||
self.message_queues[key].put(message) | ||
|
||
|
@@ -294,7 +292,7 @@ def get_message_history(self, chatbot, count=25, cursor=None): | |
|
||
def delete_message(self, message_ids): | ||
logger.info(f"Deleting messages: {message_ids}") | ||
if not type(message_ids) is list: | ||
if type(message_ids) is not list: | ||
Comment on lines
-297
to
+295
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
message_ids = [int(message_ids)] | ||
|
||
result = self.send_query("DeleteMessageMutation", { | ||
|
@@ -303,8 +301,7 @@ def delete_message(self, message_ids): | |
|
||
def purge_conversation(self, chatbot, count=-1): | ||
logger.info(f"Purging messages from {chatbot}") | ||
last_messages = self.get_message_history(chatbot, count=50)[::-1] | ||
while last_messages: | ||
while last_messages := self.get_message_history(chatbot, count=50)[::-1]: | ||
Comment on lines
-306
to
+304
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
message_ids = [] | ||
for message in last_messages: | ||
if count == 0: | ||
|
@@ -313,10 +310,9 @@ def purge_conversation(self, chatbot, count=-1): | |
message_ids.append(message["node"]["messageId"]) | ||
|
||
self.delete_message(message_ids) | ||
|
||
if count == 0: | ||
return | ||
last_messages = self.get_message_history(chatbot, count=50)[::-1] | ||
logger.info(f"No more messages left to delete.") | ||
logger.info("No more messages left to delete.") | ||
|
||
load_queries() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
load_queries
refactored with the following changes:pathlib
(path-read
)