-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathduneapi.py
107 lines (83 loc) · 3.86 KB
/
duneapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import sys
import time
from typing import Dict, List, Optional

import httpx
"""
This is an example showcasing how to use the Dune API to easily execute a known Query ID.
This code is not meant for production use: error handling is very basic or missing.
Example usage:
$ DUNE_API_KEY='your-key' python duneapi.py 1105134
"""
# Base URL of the Dune API; endpoint paths such as "/v1/query/..." and
# "/v1/execution/..." are appended to it when building request URLs.
URL = "https://api.dune.com/api"
class DuneAPI(object):
    """Small client for the Dune API v1 endpoints, built on one ``httpx.Client``.

    The API key is sent with every request in the ``X-DUNE-API-KEY`` header.
    Each request method returns the decoded JSON body of the response and
    raises ``AssertionError`` when the HTTP response is not a success.

    The instance can be used as a context manager so that the underlying
    HTTP client is closed when the ``with`` block exits.
    """

    # States after which wait_for_execution_end() stops polling.
    _TERMINAL_STATES = (
        "QUERY_STATE_COMPLETED",
        "QUERY_STATE_FAILED",
        "QUERY_STATE_CANCELLED",
    )

    def __init__(self, api_key: str):
        """Store the API key and create the HTTP client that sends it.

        Args:
            api_key: value sent in the ``X-DUNE-API-KEY`` request header.
        """
        self.api_key = api_key
        self.headers = {"X-DUNE-API-KEY": api_key}
        self.client = httpx.Client(headers=self.headers)

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the HTTP client; exceptions are not suppressed."""
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    @staticmethod
    def _json_or_raise(resp) -> Dict:
        """Return the JSON body of ``resp``, or raise if it is not a success.

        An explicit raise is used instead of an ``assert`` statement so the
        check still runs under ``python -O``. ``AssertionError`` is kept as
        the exception type so existing callers keep working.
        """
        if not resp.is_success:
            raise AssertionError(f"HTTP {resp.status_code}: {resp.text}")
        return resp.json()

    def execute_query(self, query_id: int, query_params: Optional[Dict] = None) -> Dict:
        """Request a new execution of ``query_id``.

        Args:
            query_id: ID of the query to execute.
            query_params: optional query parameters; when given and non-empty
                they are sent as ``{"query_parameters": query_params}``.

        Returns:
            The decoded JSON response.

        Raises:
            AssertionError: if the HTTP response is not a success.
        """
        # No JSON body at all is sent when there are no parameters.
        body = {"query_parameters": query_params} if query_params else None
        resp = self.client.post(f"{URL}/v1/query/{query_id}/execute", json=body)
        return self._json_or_raise(resp)

    def get_execution_status(self, execution_id: str) -> Dict:
        """Fetch the status of an execution.

        Raises:
            AssertionError: if the HTTP response is not a success.
        """
        resp = self.client.get(f"{URL}/v1/execution/{execution_id}/status")
        return self._json_or_raise(resp)

    def get_execution_result(self, execution_id: str) -> Dict:
        """Fetch the results of an execution.

        Raises:
            AssertionError: if the HTTP response is not a success.
        """
        resp = self.client.get(f"{URL}/v1/execution/{execution_id}/results")
        return self._json_or_raise(resp)

    def cancel_execution(self, execution_id: str) -> Dict:
        """Send a DELETE request for an execution.

        Raises:
            AssertionError: if the HTTP response is not a success.
        """
        resp = self.client.delete(f"{URL}/v1/execution/{execution_id}")
        return self._json_or_raise(resp)

    def wait_for_execution_end(self, execution_id: str, poll_interval_secs=5.0, max_wait_secs=1800) -> Dict:
        """Poll the execution status until it reaches a terminal state.

        Args:
            execution_id: ID of the execution to wait for.
            poll_interval_secs: seconds to sleep between two status requests.
            max_wait_secs: maximum number of seconds to keep polling.

        Returns:
            The last status response, whose ``"state"`` is completed, failed
            or cancelled.

        Raises:
            TimeoutError: if no terminal state was seen within
                ``max_wait_secs`` (``TimeoutError`` is an ``Exception``
                subclass, so ``except Exception`` callers still catch it).
        """
        # A monotonic clock keeps the deadline correct if the wall clock is
        # adjusted while we are waiting.
        deadline = time.monotonic() + max_wait_secs
        while time.monotonic() < deadline:
            status = self.get_execution_status(execution_id)
            state = status["state"]
            if state in self._TERMINAL_STATES:
                if state == "QUERY_STATE_COMPLETED":
                    # Only used for the log line, so tolerate missing metadata.
                    row_count = status.get("result_metadata", {}).get("total_row_count")
                    print(f"execution_id: {execution_id} completed, rowCount:{row_count}")
                else:
                    print(f"execution_id: {execution_id} ended with state: {state} :-(")
                return status
            print(f"execution_id: {execution_id} not done yet, state: {state}, sleeping {poll_interval_secs} secs")
            time.sleep(poll_interval_secs)
        raise TimeoutError(
            f"wait_for_execution_end() expired, waited for: {max_wait_secs} seconds, execution_id: {execution_id}"
        )
def execute_query_and_get_results(query_id: int, api_key: str) -> List[Dict]:
    """Execute a query, wait for it to finish and return its result rows.

    Args:
        query_id: ID of the query to execute.
        api_key: Dune API key used to build the client.

    Returns:
        The ``rows`` list from the ``result`` section of the results response.

    Raises:
        AssertionError: if the number of rows differs from the
            ``total_row_count`` reported in the result metadata.
        Exception: if the results response is not in the
            ``QUERY_STATE_COMPLETED`` state.
    """
    dune = DuneAPI(api_key)
    try:
        print(f"Requesting new execution of Query: {query_id}")
        execution = dune.execute_query(query_id)
        execution_id = execution['execution_id']

        status = dune.wait_for_execution_end(execution_id)
        print(f"QueryID: {query_id}, finished, status: {status}")

        result = dune.get_execution_result(execution_id)
        if result.get("state") != "QUERY_STATE_COMPLETED":
            # The response is already a decoded dict, so print it directly,
            # and do not assume an "error" key is present.
            error = result.get("error")
            print(f"QueryID: {query_id}, execution_id: {execution_id} full response: {result}")
            raise Exception(f"QueryID: {query_id}, execution_id: {execution_id} result error: {error}")

        rows = result["result"]["rows"]
        metadata = result["result"]["metadata"]
        print(f"QueryID: {query_id}, result metadata: {metadata}")
        # Explicit raise rather than ``assert`` so the check survives ``python -O``.
        if len(rows) != metadata['total_row_count']:
            raise AssertionError(
                f"QueryID: {query_id}, got {len(rows)} rows but metadata reports "
                f"{metadata['total_row_count']}"
            )
        return rows
    finally:
        # Release the HTTP connection pool whatever the outcome.
        dune.client.close()
if __name__ == "__main__":
    # Script entry point: reads the query ID from the first command-line
    # argument and the API key from the DUNE_API_KEY environment variable,
    # then prints the first and last result rows.
    if len(sys.argv) < 2:
        sys.exit(f"usage: DUNE_API_KEY='your-key' python {sys.argv[0]} <query_id>")
    try:
        # execute_query_and_get_results() is annotated to take an int.
        query_id = int(sys.argv[1])
    except ValueError:
        sys.exit(f"query_id must be an integer, got: {sys.argv[1]!r}")

    api_key = os.getenv("DUNE_API_KEY")
    if not api_key:
        sys.exit("DUNE_API_KEY environment variable is not set")

    rows = execute_query_and_get_results(query_id, api_key)
    if rows:
        print(f"row 0:\n\t {rows[0]}\nlast row:\n\t {rows[-1]}")
    else:
        # Indexing rows[0] on an empty result would raise IndexError.
        print("query returned no rows")