-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy path2b-fetch-first-ip-address-response.py
42 lines (29 loc) · 1.08 KB
/
2b-fetch-first-ip-address-response.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
# Lightweight record describing one public "what is my IP" endpoint:
#   name    - label used in log/result messages
#   url     - endpoint that answers with a JSON document
#   ip_attr - key in that JSON document under which the IP is stored
Service = namedtuple('Service', 'name url ip_attr')

# Endpoints that are raced against each other.
SERVICES = (
    Service(name='ipify', url='https://api.ipify.org?format=json', ip_attr='ip'),
    Service(name='ip-api', url='http://ip-api.com/json', ip_attr='query'),
)
async def aiohttp_get_json(url):
    """Issue a GET request to *url* and return the decoded JSON body.

    A fresh ``aiohttp.ClientSession`` is opened for the request and both the
    session and the response are released when the coroutine finishes.

    Args:
        url: Address to request.

    Returns:
        Whatever ``response.json()`` yields for the response body.
    """
    async with aiohttp.ClientSession() as http, http.get(url) as reply:
        payload = await reply.json()
        return payload
async def fetch_ip(service):
    """Ask *service* for the caller's IP and describe the outcome.

    Prints a 'Fetching IP from <name>' line before the request starts.

    Args:
        service: A ``Service`` record; its ``name``, ``url`` and ``ip_attr``
            fields are read.

    Returns:
        A string of the form
        '<name> finished with result: <ip>, took: <seconds> seconds'.

    Raises:
        Any error raised by ``aiohttp_get_json`` or by looking up
        ``service.ip_attr`` in the decoded response propagates unchanged.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments (NTP, DST) the way time.time() can.
    start = time.perf_counter()
    print('Fetching IP from {}'.format(service.name))

    json_response = await aiohttp_get_json(service.url)
    ip = json_response[service.ip_attr]

    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.perf_counter() - start)
async def main():
    """Race every entry of SERVICES and print the first finished result.

    One ``fetch_ip`` task is started per service. As soon as any of them
    completes, its result is printed (or its exception is raised) and the
    remaining tasks are cancelled.
    """
    # asyncio.wait() requires Task/Future objects: passing bare coroutines was
    # deprecated in Python 3.8 and raises TypeError from 3.11 onwards.
    tasks = [asyncio.ensure_future(fetch_ip(service)) for service in SERVICES]

    done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)

    try:
        print(done.pop().result())
    finally:
        # Cancel the losers and wait for the cancellations to be processed so
        # no task is still pending (with an open HTTP session) when the event
        # loop is closed.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
# Run main() on an explicitly created event loop. asyncio.get_event_loop()
# is deprecated when no loop is running and fails on recent Python versions,
# whereas new_event_loop() works on every version that supports async/await.
ioloop = asyncio.new_event_loop()
try:
    ioloop.run_until_complete(main())
finally:
    # Close the loop even when main() raises so its resources are released.
    ioloop.close()