diff --git a/docs/index.rst b/docs/index.rst index 80ad94c8d..a4710c253 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ Usage asyncio iproute ndb + plan9 wiset ipset netns diff --git a/docs/plan9.rst b/docs/plan9.rst new file mode 100644 index 000000000..6db37e454 --- /dev/null +++ b/docs/plan9.rst @@ -0,0 +1,20 @@ +.. _plan9: + +.. testsetup:: + + import asyncio + + from pyroute2.plan9.server import Plan9ServerSocket + from pyroute2.plan9.client import Plan9ClientSocket + + +Plan9 9p2000 protocol +===================== + +The library provides basic asynchronous 9p2000 implementation. + +.. autoclass:: pyroute2.plan9.server.Plan9ServerSocket + :members: + +.. autoclass:: pyroute2.plan9.client.Plan9ClientSocket + :members: diff --git a/pyroute2/__init__.py b/pyroute2/__init__.py index c9a74ce3f..31a85ddec 100644 --- a/pyroute2/__init__.py +++ b/pyroute2/__init__.py @@ -54,6 +54,8 @@ from pyroute2.netlink.taskstats import TaskStats from pyroute2.netlink.uevent import UeventSocket from pyroute2.nslink.nspopen import NSPopen +from pyroute2.plan9.client import Plan9ClientSocket +from pyroute2.plan9.server import Plan9ServerSocket from pyroute2.wiset import WiSet modules = [ @@ -91,6 +93,8 @@ NFTSocket, NL80211, NSPopen, + Plan9ClientSocket, + Plan9ServerSocket, ProcEventSocket, RawIPRoute, Server, diff --git a/pyroute2/plan9/__init__.py b/pyroute2/plan9/__init__.py index 25f516953..aa16f88b2 100644 --- a/pyroute2/plan9/__init__.py +++ b/pyroute2/plan9/__init__.py @@ -417,6 +417,8 @@ def parse(self, data, seq=None, callback=None, skip_alien_seq=False): spec = json.loads(msg['ename']) if spec['class'] in dir(builtins): cls = getattr(builtins, spec['class']) + elif spec['class'] == 'Plan9Exit': + cls = Plan9Exit else: cls = Exception if not spec.get('argv'): diff --git a/pyroute2/plan9/client.py b/pyroute2/plan9/client.py index 28f25feee..f79a58916 100644 --- a/pyroute2/plan9/client.py +++ b/pyroute2/plan9/client.py @@ -21,6 +21,11 @@ class Plan9ClientSocket(AsyncCoreSocket): + '''9p2000 client. + + * address -- `('address', port)` to listen on + * use_socket -- alternatively, provide a connected SOCK_STRAM socket + ''' def __init__(self, address=None, use_socket=None): self.spec = CoreSocketSpec( { @@ -60,6 +65,10 @@ async def setup_endpoint(self, loop=None): ) async def start_session(self): + '''Initiate 9p2000 session. + + One must await this routine before running any other requests. + ''' await self.ensure_socket() await self.version() await self.auth() @@ -80,6 +89,8 @@ async def request(self, msg, tag=0): self.addr_pool.free(tag, ban=0xFF) async def version(self): + '''`Tverion` request. No arguments required. + ''' m = msg_tversion() m['header']['tag'] = 0xFFFF m['msize'] = 8192 @@ -89,15 +100,26 @@ async def version(self): async def auth(self): pass - async def attach(self): + async def attach(self, aname=''): + '''`Tattach` request. + + * `aname` (optional) -- aname to attach to + ''' m = msg_tattach() m['fid'] = 0 m['afid'] = 0xFFFFFFFF m['uname'] = pwd.getpwuid(os.getuid()).pw_name - m['aname'] = '' + m['aname'] = aname return await self.request(m) async def walk(self, path, newfid=None, fid=None): + '''`Twalk` request. + + * `path` -- string path to the file + * `newfid` (optional) -- use this fid to store the info + * `fid` (optional) -- use this fid to walk from, otherwise walk + from the current directory for this client session + ''' m = msg_twalk() m['fid'] = self.cwd if fid is None else fid m['newfid'] = newfid if newfid is not None else self.fid_pool.alloc() @@ -106,20 +128,36 @@ async def walk(self, path, newfid=None, fid=None): return await self.request(m) async def fid(self, path): + '''Walk the path and return `fid` to the required file. + + * `path` -- string path to the file + ''' if path not in self.wnames: newfid = self.fid_pool.alloc() await self.walk(path, newfid) self.wnames[path] = newfid return self.wnames[path] - async def read(self, fid): + async def read(self, fid, offset=0, count=8192): + '''`Tread` request. + + * `fid` -- fid of the file to read from + * `offset` (optional, default 0) -- read offset + * `count` (optional, default 8192) -- read count + ''' m = msg_tread() m['fid'] = fid - m['offset'] = 0 - m['count'] = 8192 + m['offset'] = offset + m['count'] = count return await self.request(m) - async def write(self, fid, data): + async def write(self, fid, data, offset=0): + '''`Twrite` request. + + * `fid` -- fid of the file to write to + * `data` -- bytes to write + * `offset` (optional, default 0) -- write offset + ''' m = msg_twrite() m['fid'] = fid m['offset'] = 0 @@ -127,10 +165,26 @@ async def write(self, fid, data): return await self.request(m) async def call( - self, fid, fname='', argv=None, kwarg=None, data=b'', data_arg='data' + self, + fid, + argv=None, + kwarg=None, + data=b'', + data_arg='data', + loader=json.loads, ): + '''`Tcall` request. + + * `fid` -- fid of the file that represents a registered function + * `argv` (optional) -- positional arguments as an iterable + * `kwarg` (optional) -- keyword arguments as a dictionary + * `data` (opional) -- optional binary data + * `data_arg` (optional) -- name of the argument to use with + the binary data + * `loader` (optional, default `json.loads`) -- loader for the + response data + ''' spec = { - 'call': fname, 'argv': argv if argv is not None else [], 'kwarg': kwarg if kwarg is not None else {}, 'data_arg': data_arg, @@ -139,4 +193,5 @@ async def call( m['fid'] = fid m['text'] = json.dumps(spec) m['data'] = data - return await self.request(m) + response = await self.request(m) + return loader(response['data']) diff --git a/pyroute2/plan9/filesystem.py b/pyroute2/plan9/filesystem.py index 2998d56a3..2cfc688d9 100644 --- a/pyroute2/plan9/filesystem.py +++ b/pyroute2/plan9/filesystem.py @@ -35,7 +35,7 @@ def _publish_function_r( inode.data.write(dumper(ret)) inode.data.seek(request['offset']) - response['data'] = dumper(inode.data.read(request['count'])) + response['data'] = inode.data.read(request['count']) return response @@ -120,7 +120,7 @@ def get_child(self, name): return child raise KeyError('file not found') - def publish_function( + def register_function( self, func, loader=json.loads, diff --git a/pyroute2/plan9/server.py b/pyroute2/plan9/server.py index 21c69a6e1..c9aa8858d 100644 --- a/pyroute2/plan9/server.py +++ b/pyroute2/plan9/server.py @@ -218,6 +218,37 @@ def connection_made(self, transport): class Plan9ServerSocket(AsyncCoreSocket): + '''9p2000 server. + + Requires either an IP address to listen on, or an open + `SOCK_STREAM` socket to operate. An IP example, suitable + to establish IPC between processes in one network: + + .. testcode:: + + from pyroute2 import Plan9ClientSocket, Plan9ServerSocket + + address = ('localhost', 8149) + p9server = Plan9ServerSocket(address=address) + p9client = Plan9ClientSocket(address=address) + + Server/client running on a `socketpair()` suitable + for internal API within one process, or between + parent/child processes: + + .. testcode:: + + from socket import socketpair + + from pyroute2 import Plan9ClientSocket, Plan9ServerSocket + + server, client = socketpair() + p9server = Plan9ServerSocket(use_socket=server) + p9client = Plan9ClientSocket(use_socket=client) + + + ''' + def __init__(self, address=None, use_socket=None): self.spec = CoreSocketSpec( { @@ -232,6 +263,101 @@ def __init__(self, address=None, use_socket=None): self.marshal = Marshal9P() super().__init__(use_socket=use_socket) + def register_function( + self, + func, + inode, + loader=json.loads, + dumper=lambda x: json.dumps(x).encode('utf-8'), + ): + '''Register a function to an file. + + The file usage: + + * `write()`: write arguments for the call as a json dictionary of + keyword arguments to the file data. + * `read()`: + 1. if the arguments were written to the data, call the function + and write the result to the file data + 2. read the file data and return to the client + * `call()`: protocol extension, `Tcall` = 80, `Rcall` = 81, make + this in one turn. + + .. testcode:: + :hide: + + from pyroute2.plan9 import Tcall, Rcall + + assert Tcall == 80 + assert Rcall == 81 + + Registering a function: + + .. testcode:: + + def communicate(a, b): + return a + b + + + def example_register(): + fd = p9server.filesystem.create('test_func') + p9server.register_function(communicate, fd) + + Communication using Twrite/Tread: + + .. testcode:: + + import json + + + async def example_write(): + fid = await p9client.fid('test_func') + await p9client.write( + fid, + json.dumps({"a": 17, "b": 25}) + ) + msg = await p9client.read(fid) + response = json.loads(msg['data']) + assert response == 42 + + Same, using a command line 9p client from plan9port:: + + $ echo '{"a": 17, "b": 25}' | 9p -a localhost:8149 write test_func + $ 9p -a localhost:8149 read test_func + 42 + + And using a mounted file system via FUSE client from plan9port:: + + $ 9pfuse localhost:8149 mnt + $ echo '{"a": 17, "b": 25}' >mnt/test_func + $ cat mnt/test_func + 42 + + And the same, but using Tcall: + + .. testcode:: + + async def example_call(): + fid = await p9client.fid('test_func') + response = await p9client.call(fid, argv=(17, 25)) + assert response == 42 + + And finnaly run this code: + + .. testcode:: + + async def main(): + server_task = await p9server.async_run() + example_register() + await p9client.start_session() + await example_write() + await example_call() + server_task.cancel() + + asyncio.run(main()) + ''' + return inode.register_function(func, loader, dumper) + async def setup_endpoint(self, loop=None): if self.endpoint is not None: return @@ -251,6 +377,77 @@ async def setup_endpoint(self, loop=None): ) async def async_run(self): + '''Return the server asyncio task. + + Using this task one can stop the server: + + .. testcode:: + + async def main(): + server = Plan9ServerSocket(address=('localhost', 8149)) + server_task = await server.async_run() + # ... server is running here + server_task.cancel() + # ... server is stopped + + asyncio.run(main()) + + To forcefully close all client connections and stop the server + immediately from a registered function, one can pass this task + to the function, cancel it, and raise `Plan9Exit()` exception: + + .. testcode:: + + import functools + + from pyroute2.plan9 import Plan9Exit + + server_sock, client_sock = socketpair() + + + def test_exit_func(context): + if 'server_task' in context: + context['server_task'].cancel() + raise Plan9Exit('server stopped upon client request') + return 'server starting, please wait' + + + async def server(): + p9server = Plan9ServerSocket(use_socket=server_sock) + context = {} + + inode = p9server.filesystem.create('stop') + p9server.register_function( + functools.partial(test_exit_func, context), + inode + ) + context['server_task'] = await p9server.async_run() + + try: + await context['server_task'] + except asyncio.exceptions.CancelledError: + pass + + assert context['server_task'].cancelled() + + .. testcode:: + :hide: + + async def client(): + p9client = Plan9ClientSocket(use_socket=client_sock) + await p9client.start_session() + fid = await p9client.fid('stop') + try: + await p9client.call(fid) + except Plan9Exit: + pass + + + async def main(): + await asyncio.gather(server(), client()) + + asyncio.run(main()) + ''' await self.setup_endpoint() if self.status['use_socket']: return self.endpoint[1].on_con_lost @@ -258,5 +455,9 @@ async def async_run(self): return asyncio.create_task(self.endpoint.serve_forever()) def run(self): + '''A simple synchronous runner. + + Uses `event_loop.run_forever()`. + ''' self.event_loop.create_task(self.async_run()) self.event_loop.run_forever()