Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
joestubbs committed Feb 12, 2021
2 parents 8bae8c6 + b187d80 commit cc82b01
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 187 deletions.
239 changes: 146 additions & 93 deletions actors/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,37 @@

from config import Config

class WorkerChannel(Channel):
"""Channel for communication with a worker. Pass the id of the worker to communicate with an
existing worker.
"""
@classmethod
def get_name(cls, worker_id):
"""Return the name of the channel that would be used for this worker_id."""
return 'worker_{}'.format(worker_id)

def __init__(self, worker_id=None):
self.uri = Config.get('rabbit', 'uri')
ch_name = None
if worker_id:
ch_name = WorkerChannel.get_name(worker_id)
super().__init__(name=ch_name,
connection_type=RabbitConnection,
uri=self.uri)


class SpawnerWorkerChannel(Channel):
"""Channel facilitating communication between a spawner and a worker during startup. Pass the name of the worker to communicate with an
existing worker.
"""
def __init__(self, worker_id=None):
self.uri = Config.get('rabbit', 'uri')
ch_name = None
if worker_id:
ch_name = 'spawner_worker_{}'.format(worker_id)
super().__init__(name=ch_name,
connection_type=RabbitConnection,
uri=self.uri)
# class WorkerChannel(Channel):
# """Channel for communication with a worker. Pass the id of the worker to communicate with an
# existing worker.
# """
# @classmethod
# def get_name(cls, worker_id):
# """Return the name of the channel that would be used for this worker_id."""
# return 'worker_{}'.format(worker_id)
#
# def __init__(self, worker_id=None):
# self.uri = Config.get('rabbit', 'uri')
# ch_name = None
# if worker_id:
# ch_name = WorkerChannel.get_name(worker_id)
# super().__init__(name=ch_name,
# connection_type=RabbitConnection,
# uri=self.uri)


# class SpawnerWorkerChannel(Channel):
# """Channel facilitating communication between a spawner and a worker during startup. Pass the name of the worker to communicate with an
# existing worker.
# """
# def __init__(self, worker_id=None):
# self.uri = Config.get('rabbit', 'uri')
# ch_name = None
# if worker_id:
# ch_name = 'spawner_worker_{}'.format(worker_id)
# super().__init__(name=ch_name,
# connection_type=RabbitConnection,
# uri=self.uri)


class ClientsChannel(Channel):
Expand Down Expand Up @@ -70,50 +70,51 @@ def request_delete_client(self, tenant, actor_id, worker_id, client_id, secret):
return self.put_sync(msg, timeout=60)


class CommandChannel(Channel):
"""Work with commands on the command channel."""

def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
valid_queues = queues_list.split(',')
if name not in valid_queues:
raise Exception('Invalid Queue name.')


super().__init__(name='command_channel_{}'.format(name),
connection_type=RabbitConnection,
uri=self.uri)

def put_cmd(self, actor_id, worker_id, image, tenant, stop_existing=True):
"""Put a new command on the command channel."""
msg = {'actor_id': actor_id,
'worker_id': worker_id,
'image': image,
'tenant': tenant,
'stop_existing': stop_existing}

self.put(msg)


class EventsChannel(Channel):
"""Work with events on the events channel."""

event_queue_names = ('default',
)

def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
if name not in EventsChannel.event_queue_names:
raise Exception('Invalid Events Channel Queue name.')

super().__init__(name='events_channel_{}'.format(name),
connection_type=RabbitConnection,
uri=self.uri)

def put_event(self, json_data):
"""Put a new event on the events channel."""
self.put(json_data)
# class CommandChannel(Channel):
# """Work with commands on the command channel."""
#
# def __init__(self, name='default'):
# self.uri = Config.get('rabbit', 'uri')
# queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
# valid_queues = queues_list.split(',')
# if name not in valid_queues:
# raise Exception('Invalid Queue name.')
#
#
# super().__init__(name='command_channel_{}'.format(name),
# connection_type=RabbitConnection,
# uri=self.uri)
#
# def put_cmd(self, actor_id, worker_id, image, revision, tenant, stop_existing=True):
# """Put a new command on the command channel."""
# msg = {'actor_id': actor_id,
# 'worker_id': worker_id,
# 'image': image,
# 'revision': revision,
# 'tenant': tenant,
# 'stop_existing': stop_existing}
#
# self.put(msg)


# class EventsChannel(Channel):
# """Work with events on the events channel."""
#
# event_queue_names = ('default',
# )
#
# def __init__(self, name='default'):
# self.uri = Config.get('rabbit', 'uri')
# if name not in EventsChannel.event_queue_names:
# raise Exception('Invalid Events Channel Queue name.')
#
# super().__init__(name='events_channel_{}'.format(name),
# connection_type=RabbitConnection,
# uri=self.uri)
#
# def put_event(self, json_data):
# """Put a new event on the events channel."""
# self.put(json_data)


class BinaryChannel(BasicChannel):
Expand Down Expand Up @@ -154,31 +155,83 @@ def get_one(self):
from queues import BinaryTaskQueue


class ActorMsgChannel(BinaryTaskQueue):
def __init__(self, actor_id):
super().__init__(name='actor_msg_{}'.format(actor_id))
class EventsChannel(BinaryTaskQueue):
"""Work with events on the events channel."""

def put_msg(self, message, d={}, **kwargs):
d['message'] = message
for k, v in kwargs:
d[k] = v
self.put(d)
event_queue_names = ('default',
)

def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
if name not in EventsChannel.event_queue_names:
raise Exception('Invalid Events Channel Queue name.')

super().__init__(name='events_channel_{}'.format(name))

def put_event(self, json_data):
"""Put a new event on the events channel."""
self.put(json_data)


class ActorMSSgChannel(BinaryChannel):
"""Work with messages sent to a specific actor.
class CommandChannel(BinaryTaskQueue):
"""Work with commands on the command channel."""

def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
valid_queues = queues_list.split(',')
if name not in valid_queues:
raise Exception('Invalid Queue name.')


super().__init__(name='command_channel_{}'.format(name))

def put_cmd(self, actor_id, worker_id, image, revision, tenant, stop_existing=True):
"""Put a new command on the command channel."""
msg = {'actor_id': actor_id,
'worker_id': worker_id,
'image': image,
'revision': revision,
'tenant': tenant,
'stop_existing': stop_existing}

self.put(msg)


class SpawnerWorkerChannel(BinaryTaskQueue):
"""Channel facilitating communication between a spawner and a worker during startup. Pass the name of the worker to communicate with an
existing worker.
"""
def __init__(self, actor_id):
def __init__(self, worker_id=None):
self.uri = Config.get('rabbit', 'uri')
super().__init__(name='actor_msg_{}'.format(actor_id),
connection_type=RabbitConnection,
uri=self.uri)
ch_name = None
if worker_id:
ch_name = 'spawner_worker_{}'.format(worker_id)
super().__init__(name=ch_name)


class WorkerChannel(BinaryTaskQueue):
"""Channel for communication with a worker. Pass the id of the worker to communicate with an
existing worker.
"""
@classmethod
def get_name(cls, worker_id):
"""Return the name of the channel that would be used for this worker_id."""
return 'worker_{}'.format(worker_id)

def __init__(self, worker_id=None):
self.uri = Config.get('rabbit', 'uri')
ch_name = None
if worker_id:
ch_name = WorkerChannel.get_name(worker_id)
super().__init__(name=ch_name)


class ActorMsgChannel(BinaryTaskQueue):
def __init__(self, actor_id):
super().__init__(name='actor_msg_{}'.format(actor_id))

def put_msg(self, message, d={}, **kwargs):
"""Pass a message to an actor's inbox, thereby invoking it. `message` is the request
body msg parameter; `d` is a dictionary built from the request query parameters;
additional metadata (e.g. jwt, username) can be passed through kwargs.
"""
d['message'] = message
for k, v in kwargs:
d[k] = v
Expand Down
Loading

0 comments on commit cc82b01

Please sign in to comment.