diff --git a/.gitignore b/.gitignore index 59737d3c..7df3c18b 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,5 @@ local-dev.conf.bak passwords.yml *.key -*.log \ No newline at end of file +*.log +abaco.log \ No newline at end of file diff --git a/Dockerfile-test b/Dockerfile-test index 9798e9f5..1ede3300 100644 --- a/Dockerfile-test +++ b/Dockerfile-test @@ -1,13 +1,14 @@ # Test suite for abaco project. -# Image: jstubbs/abaco_testsuite +# Image: abaco/testsuite from alpine:3.8 -RUN apk add --update musl python3 && rm /var/cache/apk/* +RUN apk add --update musl python3 && rm -f /var/cache/apk/* RUN apk add --update bash && rm -f /var/cache/apk/* RUN apk add --update git && rm -f /var/cache/apk/* -RUN apk add --update g++ -f /var/cache/apk/* -RUN apk add --update python3-dev -f /var/cache/apk/* +RUN apk add --update g++ && rm -f /var/cache/apk/* +RUN apk add --update python3-dev && rm -f /var/cache/apk/* +RUN apk add --update linux-headers && rm -f /var/cache/apk/* ADD actors/requirements.txt /requirements.txt RUN pip3 install -r /requirements.txt RUN pip3 install pytest ipython locustio diff --git a/actors/auth.py b/actors/auth.py index 974075ab..1223598e 100644 --- a/actors/auth.py +++ b/actors/auth.py @@ -256,6 +256,11 @@ def check_privileged(): if data.get('mem_limit') or data.get('memLimit'): logger.debug("User is trying to set mem limit") raise PermissionsException("Not authorized -- only admins and privileged users can set mem limit.") + if data.get('queue'): + logger.debug("User is trying to set queue") + raise PermissionsException("Not authorized -- only admins and privileged users can set queue.") + + else: logger.debug("user allowed to set privileged.") diff --git a/actors/channels.py b/actors/channels.py index 2e0693b0..f8a39a85 100644 --- a/actors/channels.py +++ b/actors/channels.py @@ -73,9 +73,15 @@ def request_delete_client(self, tenant, actor_id, worker_id, client_id, secret): class CommandChannel(Channel): """Work with commands on the command channel.""" - def __init__(self): + def __init__(self, name='default'): self.uri = Config.get('rabbit', 'uri') - super().__init__(name='command', + queues_list = Config.get('spawner', 'host_queues').replace(' ', '') + valid_queues = queues_list.split(',') + if name not in valid_queues: + raise Exception('Invalid Queue name.') + + + super().__init__(name='command_channel_{}'.format(name), connection_type=RabbitConnection, uri=self.uri) diff --git a/actors/clients.py b/actors/clients.py index 0c54e3a2..8be230e3 100644 --- a/actors/clients.py +++ b/actors/clients.py @@ -23,7 +23,16 @@ class ClientGenerator(object): def __init__(self): self.secret = os.environ.get('_abaco_secret') - self.ch = ClientsChannel() + ready = False + i = 0 + while not ready: + try: + self.ch = ClientsChannel() + ready = True + except RuntimeError as e: + i = i + 1 + if i > 10: + raise e self.credentials = {} for tenant in get_tenants(): self.credentials[tenant] = {'username': os.environ.get('_abaco_{}_username'.format(tenant), ''), diff --git a/actors/controllers.py b/actors/controllers.py index 1f3198a3..ec4aa5f4 100644 --- a/actors/controllers.py +++ b/actors/controllers.py @@ -32,12 +32,12 @@ message_gauges = {} rate_gauges = {} last_metric = {} -command_gauge = Gauge('message_count_for_command_channel', - 'Number of messages currently in the Command Channel') clients_gauge = Gauge('clients_count_for_clients_store', 'Number of clients currently in the clients_store') + + try: ACTOR_MAX_WORKERS = Config.get("spawner", "max_workers_per_actor") except: 
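The `queue` attribute introduced above threads through several files in this diff: auth.py blocks non-privileged users from setting it, channels.py validates it and maps each name to its own RabbitMQ channel, and the controllers check it against the spawner config on POST and PUT. A minimal standalone sketch of that shared lookup logic (illustration only; the real code reads `host_queues` through the `Config` object used throughout `actors/`):

```python
def valid_queues(host_queues='default, special'):
    # mirrors the parsing of Config.get('spawner', 'host_queues')
    return host_queues.replace(' ', '').split(',')

def command_channel_name(queue='default'):
    # CommandChannel.__init__ rejects names not in the host_queues list...
    if queue not in valid_queues():
        raise Exception('Invalid Queue name.')
    # ...and otherwise binds to a per-queue RabbitMQ channel
    return 'command_channel_{}'.format(queue)
```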
@@ -50,6 +50,7 @@ except: num_init_workers = 1 + class MetricsResource(Resource): def get(self): actor_ids = self.get_metrics() @@ -66,12 +67,6 @@ def get_metrics(self): in actors_store.items() if actor.get('stateless') and not actor.get('status') == 'ERROR' ] - ch = CommandChannel() - command_gauge.set(len(ch._queue._queue)) - logger.debug("METRICS COMMAND CHANNEL size: {}".format(command_gauge._value._value)) - ch.close() - logger.debug("ACTOR IDS: {}".format(actor_ids)) - try: if actor_ids: # Create a gauge for each actor id @@ -102,7 +97,7 @@ def check_metrics(self, actor_ids): last_metric.update({actor_id: data}) workers = Worker.get_workers(actor_id) - actor = actor = actors_store[actor_id] + actor = actors_store[actor_id] logger.debug('METRICS: MAX WORKERS TEST {}'.format(actor)) # If this actor has a custom max_workers, use that. Otherwise use default. @@ -126,7 +121,7 @@ def check_metrics(self, actor_ids): # Add a worker if message count reaches a given number try: logger.debug("METRICS current message count: {}".format(current_message_count)) - if metrics_utils.allow_autoscaling(command_gauge._value._value, max_workers, len(workers)): + if metrics_utils.allow_autoscaling(max_workers, len(workers)): if current_message_count >= 1: metrics_utils.scale_up(actor_id) logger.debug("METRICS current message count: {}".format(data[0]['value'][1])) @@ -386,6 +381,12 @@ def validate_post(self): parser = Actor.request_parser() try: args = parser.parse_args() + if args['queue']: + queues_list = Config.get('spawner', 'host_queues').replace(' ', '') + valid_queues = queues_list.split(',') + if args['queue'] not in valid_queues: + raise BadRequest('Invalid queue name.') + except BadRequest as e: msg = 'Unable to process the JSON description.' if hasattr(e, 'data'): @@ -398,6 +399,7 @@ def validate_post(self): def post(self): logger.info("top of POST to register a new actor.") args = self.validate_post() + logger.debug("validate_post() successful") args['tenant'] = g.tenant args['api_server'] = g.api_server @@ -496,6 +498,11 @@ def put(self, actor_id): args = self.validate_put(actor) logger.debug("PUT args validated successfully.") args['tenant'] = g.tenant + if args['queue']: + queues_list = Config.get('spawner', 'host_queues').replace(' ', '') + valid_queues = queues_list.split(',') + if args['queue'] not in valid_queues: + raise BadRequest('Invalid queue name.') # user can force an update by setting the force param: update_image = args.get('force') if not update_image and args['image'] == previous_image: @@ -531,7 +538,8 @@ def put(self, actor_id): logger.info("updated actor {} stored in db.".format(actor_id)) if update_image: worker_ids = [Worker.request_worker(tenant=g.tenant, actor_id=actor.db_id)] - ch = CommandChannel() + # get actor queue name + ch = CommandChannel(name=actor.queue) ch.put_cmd(actor_id=actor.db_id, worker_ids=worker_ids, image=actor.image, tenant=args['tenant']) ch.close() logger.debug("put new command on command channel to update actor.") @@ -1021,9 +1029,9 @@ def validate_post(self): def post(self, actor_id): """Ensure a certain number of workers are running for an actor""" logger.debug("top of POST /actors/{}/workers.".format(actor_id)) - id = g.db_id + dbid = g.db_id try: - actor = Actor.from_db(actors_store[id]) + actor = Actor.from_db(actors_store[dbid]) except KeyError: logger.debug("did not find actor: {}.".format(actor_id)) raise ResourceError("No actor found with id: {}.".format(actor_id), 404) @@ -1033,8 +1041,7 @@ def post(self, actor_id): if not num or num == 
0: logger.debug("did not get a num: {}.".format(actor_id)) num = 1 - logger.debug("ensuring at least {} workers. actor: {}.".format(num, actor_id)) - dbid = Actor.get_dbid(g.tenant, actor_id) + logger.debug("ensuring at least {} workers. actor: {}.".format(num, dbid)) try: workers = Worker.get_workers(dbid) except WorkerException as e: @@ -1052,7 +1059,7 @@ def post(self, actor_id): worker_ids = [Worker.request_worker(tenant=g.tenant, actor_id=dbid)] logger.info("New worker id: {}".format(worker_ids[0])) - ch = CommandChannel() + ch = CommandChannel(name=actor.queue) ch.put_cmd(actor_id=actor.db_id, worker_ids=worker_ids, image=actor.image, diff --git a/actors/docker_utils.py b/actors/docker_utils.py index 35571b85..0889348c 100644 --- a/actors/docker_utils.py +++ b/actors/docker_utils.py @@ -93,22 +93,20 @@ def list_all_containers(): cli = docker.APIClient(base_url=dd, version="auto") # todo -- finish -def container_running(image=None, name=None): - """Check if there is a running container for an image. - image should be fully qualified; e.g. image='jstubbs/abaco_core' - Can pass wildcards in name using * character; e.g. name='abaco_spawner*' +def container_running(name=None): + """Check if there is a running container whose name contains the string, `name`. Note that this function will + return True if any running container has a name which contains the input `name`. + """ logger.debug("top of container_running().") filters = {} if name: filters['name'] = name - if image: - filters['image'] = image cli = docker.APIClient(base_url=dd, version="auto") try: containers = cli.containers(filters=filters) except Exception as e: - msg = "There was an error checking container_running for image: {}. Exception: {}".format(image, e) + msg = "There was an error checking container_running for name: {}. Exception: {}".format(name, e) logger.error(msg) raise DockerError(msg) logger.debug("found containers: {}".format(containers)) @@ -175,6 +173,15 @@ def run_container_with_docker(image, host_config = cli.create_host_config(binds=binds, auto_remove=auto_remove) logger.debug("binds: {}".format(binds)) + # add the container to a specific docker network, if configured + netconf = None + try: + docker_network = Config.get('spawner', 'docker_network') + except Exception: + docker_network = None + if docker_network: + netconf = cli.create_networking_config({docker_network: cli.create_endpoint_config()}) + # create and start the container try: container = cli.create_container(image=image, @@ -182,7 +189,8 @@ def run_container_with_docker(image, volumes=volumes, host_config=host_config, command=command, - name=name) + name=name, + networking_config=netconf) cli.start(container=container.get('Id')) except Exception as e: msg = "Got exception trying to run container from image: {}. Exception: {}".format(image, e) diff --git a/actors/health.py b/actors/health.py index 04f61801..b7280d26 100644 --- a/actors/health.py +++ b/actors/health.py @@ -233,7 +233,7 @@ def check_workers(actor_id, ttl): ch.close() except Exception as e: logger.error("Got an error trying to close the worker channel for dead worker. 
Exception: {}".format(e)) - if not result == 'ok': + if result and not result == 'ok': logger.error("Worker responded unexpectedly: {}, deleting worker.".format(result)) try: rm_container(worker['cid']) @@ -276,6 +276,69 @@ def check_workers(actor_id, ttl): # else: # logger.debug("Worker not in READY status, will postpone.") +def get_host_queues(): + """ + Read host_queues string from config and parse to return a Python list. + :return: list[str] + """ + try: + host_queues_str = Config.get('spawner', 'host_queues') + return [ s.strip() for s in host_queues_str.split(',')] + except Exception as e: + msg = "Got unexpected exception attempting to parse the host_queues config. Exception: {}".format(e) + logger.error(e) + raise e + +def start_spawner(queue, idx='0'): + """ + Start a spawner on this host listening to a queue, `queue`. + :param queue: (str) - the queue the spawner should listen to. + :param idx: (str) - the index to use as a suffix to the spawner container name. + :return: + """ + command = 'python3 -u /actors/spawner.py' + name = 'healthg_{}_spawner_{}'.format(queue, idx) + environment = {'AE_IMAGE': AE_IMAGE.split(':')[0], + 'queue': queue + } + # check logging strategy to determine log file name: + try: + run_container_with_docker(AE_IMAGE, + command, + name=name, + environment=environment, + mounts=[], + log_file=None) + except Exception as e: + logger.critical("Could not restart spawner for queue {}. Exception: {}".format(queue, e)) + +def check_spawner(queue): + """ + Check the health and existence of a spawner on this host for a particular queue. + :param queue: (str) - the queue to check on. + :return: + """ + logger.debug("top of check_spawner for queue: {}".format(queue)) + # spawner container names by convention should have the format __spawner_; for example + # abaco_default_spawner_2. + # so, we look for container names containing a string with that format: + spawner_name_segment = '{}_spawner'.format(queue) + if not container_running(name=spawner_name_segment): + logger.critical("No spawners running for queue {}! Launching new spawner..".format(queue)) + start_spawner(queue) + else: + logger.debug("spawner for queue {} already running.".format(queue)) + +def check_spawners(): + """ + Check health of spawners running on a given host. + :return: + """ + logger.debug("top of check_spawners") + host_queues = get_host_queues() + logger.debug("checking spawners for queues: {}".format(host_queues)) + for queue in host_queues: + check_spawner(queue) def manage_workers(actor_id): """Scale workers for an actor if based on message queue size and policy.""" @@ -300,6 +363,7 @@ def shutdown_all_workers(): def main(): logger.info("Running abaco health checks. Now: {}".format(time.time())) + check_spawners() try: clean_up_ipc_dirs() except Exception as e: @@ -308,19 +372,6 @@ def main(): ttl = Config.get('workers', 'worker_ttl') except Exception as e: logger.error("Could not get worker_ttl config. Exception: {}".format(e)) - if not container_running(name='spawner*'): - logger.critical("No spawners running! Launching new spawner..") - command = 'python3 -u /actors/spawner.py' - # check logging strategy to determine log file name: - try: - run_container_with_docker(AE_IMAGE, - command, - name='abaco_spawner_0', - environment={'AE_IMAGE': AE_IMAGE.split(':')[0]}, - mounts=[], - log_file=None) - except Exception as e: - logger.critical("Could not restart spawner. 
Exception: {}".format(e)) try: ttl = int(ttl) except Exception as e: diff --git a/actors/metrics_utils.py b/actors/metrics_utils.py index 518dcceb..ae770cb3 100644 --- a/actors/metrics_utils.py +++ b/actors/metrics_utils.py @@ -14,28 +14,53 @@ message_gauges = {} worker_gaueges = {} - +cmd_channel_gauges = {} PROMETHEUS_URL = 'http://172.17.0.1:9090' MAX_WORKERS_PER_HOST = Config.get('spawner', 'max_workers_per_host') +command_gauge = Gauge( + 'message_count_for_command_channel', + 'Number of messages currently in this command channel', + ['name']) def create_gauges(actor_ids): logger.debug("METRICS: Made it to create_gauges") for actor_id in actor_ids: - if actor_id not in message_gauges.keys(): - try: - g = Gauge( - 'message_count_for_actor_{}'.format(actor_id.decode("utf-8").replace('-', '_')), - 'Number of messages for actor {}'.format(actor_id.decode("utf-8").replace('-', '_')) - ) - message_gauges.update({actor_id: g}) - logger.debug('Created gauge {}'.format(g)) - except Exception as e: - logger.info("got exception trying to instantiate the Gauge: {}".format(e)) - else: - g = message_gauges[actor_id] + try: + actor = actors_store[actor_id] + + # If the actor doesn't have a gauge, add one + if actor_id not in message_gauges.keys(): + + g = Gauge( + 'message_count_for_actor_{}'.format(actor_id.decode("utf-8").replace('-', '_')), + 'Number of messages for actor {}'.format(actor_id.decode("utf-8").replace('-', '_')) + ) + message_gauges.update({actor_id: g}) + logger.debug('Created gauge {}'.format(g)) + else: + # Otherwise, get this actor's existing gauge + g = message_gauges[actor_id] + + # Update this actor's command channel metric + channel_name = actor.get("queue") + + queues_list = Config.get('spawner', 'host_queues').replace(' ', '') + valid_queues = queues_list.split(',') + + if not channel_name or channel_name not in valid_queues: + channel_name = 'default' + + ch = CommandChannel(name=channel_name) + command_gauge.labels(channel_name).set(len(ch._queue._queue)) + logger.debug("METRICS COMMAND CHANNEL {} size: {}".format(channel_name, command_gauge._value._value)) + ch.close() + except Exception as e: + logger.info("got exception trying to instantiate the Gauge: {}".format(e)) + + # Update this actor's gauge to its current # of messages try: ch = ActorMsgChannel(actor_id=actor_id.decode("utf-8")) except Exception as e: @@ -45,6 +70,8 @@ def create_gauges(actor_ids): ch.close() g.set(result['messages']) logger.debug("METRICS: {} messages found for actor: {}.".format(result['messages'], actor_id)) + + # add a worker gauge for this actor if one does not exist if actor_id not in worker_gaueges.keys(): try: g = Gauge( @@ -56,13 +83,18 @@ def create_gauges(actor_ids): except Exception as e: logger.info("got exception trying to instantiate the Worker Gauge: {}".format(e)) else: + # Otherwise, get the worker gauge that already exists g = worker_gaueges[actor_id] + + # Update this actor's worker IDs workers = Worker.get_workers(actor_id) result = {'workers': len(workers)} g.set(result['workers']) + # Return actor_ids so we don't have to query for them again later return actor_ids + def query_message_count_for_actor(actor_id): query = { 'query': 'message_count_for_actor_{}'.format(actor_id.decode("utf-8").replace('-', '_')), @@ -92,7 +124,7 @@ def calc_change_rate(data, last_metric, actor_id): def allow_autoscaling(cmd_q_len, max_workers, num_workers): - if cmd_q_len > int(MAX_WORKERS_PER_HOST) or cmd_q_len > 5 or int(num_workers) >= int(max_workers): + if int(num_workers) >= 
int(max_workers): logger.debug('METRICS NO AUTOSCALE - criteria not met. {} {} '.format(cmd_q_len, num_workers)) return False @@ -108,7 +140,11 @@ def scale_up(actor_id): actor = Actor.from_db(actors_store[actor_id]) worker_ids = [Worker.request_worker(tenant=tenant, actor_id=aid)] logger.info("New worker id: {}".format(worker_ids[0])) - ch = CommandChannel() + if actor.queue: + channel_name = actor.queue + else: + channel_name = 'default' + ch = CommandChannel(name=channel_name) ch.put_cmd(actor_id=actor.db_id, worker_ids=worker_ids, image=actor.image, diff --git a/actors/models.py b/actors/models.py index 4cf765b4..1d09c0b1 100644 --- a/actors/models.py +++ b/actors/models.py @@ -225,6 +225,7 @@ class Actor(AbacoDAO): ('uid', 'optional', 'uid', str, 'The uid to run the container as. Only used if user_container_uid is false.', None), ('gid', 'optional', 'gid', str, 'The gid to run the container as. Only used if user_container_uid is false.', None), + ('queue', 'optional', 'queue', str, 'The command channel that this actor uses.', 'default'), ('db_id', 'derived', 'db_id', str, 'Primary key in the database for this actor.', None), ('id', 'derived', 'id', str, 'Human readable id for this actor.', None), ] @@ -332,7 +333,7 @@ def ensure_one_worker(self): worker_ids = [worker_id] logger.info("Actor.ensure_one_worker() putting message on command channel for worker_id: {}".format( worker_id)) - ch = CommandChannel() + ch = CommandChannel(name=self.queue) ch.put_cmd(actor_id=self.db_id, worker_ids=worker_ids, image=self.image, @@ -384,6 +385,8 @@ class Alias(AbacoDAO): # the following nouns cannot be used for an alias as they RESERVED_WORDS = ['executions', 'nonces', 'logs', 'messages', 'adapters', 'admin'] + FORBIDDEN_CHAR = [':', '/', '?', '#', '[', ']', '@', '!', '$', '&', "'", '(', ')', '*', '+', ',', ';', '='] + @classmethod def generate_alias_id(cls, tenant, alias): @@ -396,11 +399,20 @@ def check_reserved_words(self): "The following reserved words cannot be used " "for an alias: {}.".format(self.alias, Alias.RESERVED_WORDS)) + def check_forbidden_char(self): + for char in Alias.FORBIDDEN_CHAR: + if char in self.alias: + raise errors.DAOError("'{}' is a forbidden character. " + "The following characters cannot be used " + "for an alias: ['{}'].".format(char, "', '".join(Alias.FORBIDDEN_CHAR))) + def check_and_create_alias(self): """Check to see if an alias is unique and create it if so. If not, raises a DAOError.""" - # first, make sure alias is not a reserved word: + # first, make sure alias is not a reserved word: self.check_reserved_words() + # second, make sure alias is not using a forbidden char: + self.check_forbidden_char() # attempt to create the alias within a transaction obj = alias_store.add_key_val_if_empty(self.alias_id, self) if not obj: @@ -416,7 +428,7 @@ def retrieve_by_alias_id(cls, alias_id): return Alias(**obj) def get_hypermedia(self): - return {'_links': { 'self': '{}/actors/v2/alaises/{}'.format(self.api_server, self.alias), + return {'_links': { 'self': '{}/actors/v2/aliases/{}'.format(self.api_server, self.alias), 'owner': '{}/profiles/v2/{}'.format(self.api_server, self.owner), 'actor': '{}/actors/v2/{}'.format(self.api_server, self.actor_id) }} @@ -446,7 +458,8 @@ class Nonce(AbacoDAO): 'Permission level associated with this nonce. Default is {}.'.format(EXECUTE), EXECUTE.name), ('max_uses', 'optional', 'max_uses', int, 'Maximum number of times this nonce can be redeemed. 
Default is unlimited.', -1), - + ('description', 'optional', 'description', str, 'Description of this nonce', ''), + ('id', 'derived', 'id', str, 'Unique id for this nonce.', None), ('actor_id', 'derived', 'actor_id', str, 'The human readable id for the actor associated with this nonce.', None), @@ -1124,4 +1137,4 @@ def set_permission(user, actor_id, level): except KeyError: # if actor has no permissions, a KeyError will be thrown permissions_store[actor_id] = {user: level.name} - logger.info("Permission set for actor: {}; user: {} at level: {}".format(actor_id, user, level)) \ No newline at end of file + logger.info("Permission set for actor: {}; user: {} at level: {}".format(actor_id, user, level)) diff --git a/actors/requirements.txt b/actors/requirements.txt index 090fb38a..4097ddc5 100644 --- a/actors/requirements.txt +++ b/actors/requirements.txt @@ -14,7 +14,7 @@ pika==0.9.13 docker==2.7.0 pycrypto==2.6.1 PyJWT==0.2.3 -gunicorn==19.3.0 +gunicorn==19.9.0 rabbitpy==1.0.0 pyzmq==14.3.0 pymongo==3.3.0 diff --git a/actors/spawner.py b/actors/spawner.py index 35378f2d..2442e953 100644 --- a/actors/spawner.py +++ b/actors/spawner.py @@ -36,7 +36,8 @@ class Spawner(object): def __init__(self): self.num_workers = int(Config.get('workers', 'init_count')) self.secret = os.environ.get('_abaco_secret') - self.cmd_ch = CommandChannel() + self.queue = os.environ.get('queue', 'default') + self.cmd_ch = CommandChannel(name=self.queue) self.tot_workers = 0 try: self.host_id = Config.get('spawner', 'host_id') @@ -331,7 +332,7 @@ def main(): logger.info("spawner made connection to rabbit, entering main loop") logger.info("spawner using abaco_conf_host_path={}".format(os.environ.get('abaco_conf_host_path'))) sp.run() - except rabbitpy.exceptions.ConnectionException: + except (rabbitpy.exceptions.ConnectionException, RuntimeError): # rabbit seems to take a few seconds to come up time.sleep(5) idx += 1 diff --git a/dashboard/tests.py b/dashboard/tests.py index b2216346..7243af02 100644 --- a/dashboard/tests.py +++ b/dashboard/tests.py @@ -1,32 +1,126 @@ -from django.test import TestCase +# To run tests: docker-compose -f docker-compose-dashboard.yml run django python manage.py test tests +# to build dashboard image: docker build -t abaco/dashboard -f Dockerfile-dashboard . 
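+# The API-facing test below assumes a serialized admin JWT in a file named
+# jwt-abaco-admin in the working directory; the jwt and jwt_header environment
+# variables can override the token and its header name (default
+# X-Jwt-Assertion-DEV-DEVELOP).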
+
+import unittest
+from django.test import TestCase, Client
+from util import base_url
+
+import requests
+import os
+import tempfile
+import json
+
+import pytest
+
+
-# from rest_framework.authtoken.models import Token
-# from rest_framework.test import APIClient
-from local_secrets import *
 
 class DashboardClassCase(TestCase):
-    def setup(self):
-        pass
+    def setUp(self):
+        self.client = Client()
 
-    def test_no_login_if_not_admin(self):
-        pass
+    def login(self, username, password):
+        return self.client.post('/login', data=dict(
+            username=username,
+            password=password,
+        ), follow_redirects=True)
 
-    def test_flushes_session_if_not_admin(self):
-        pass
+
+    def logout(self, client):
+        return self.client.get('/logout', follow_redirects=True)
+
+    def test_login(self):
+        rsp = self.login(username='testshareuser', password='testshareuser')
+        self.assertNotIn("Invalid username or password", rsp.content.decode('utf-8'))
+        self.assertNotIn("You do not have Admin privileges.", rsp.content.decode('utf-8'))
+        self.logout(client=self.client)
+
+    def test_no_login_if_not_admin(self):
+        rsp = self.login(username='testuser', password='testuser')
+        self.assertIn("You do not have Admin privileges.", rsp.content.decode('utf-8'))
+        self.logout(client=self.client)
+
+    def test_no_login_if_invalid(self):
+        rsp = self.login(username='hshs', password='jsjsjs')
+        self.assertIn("Invalid username or password", rsp.content.decode('utf-8'))
+        self.logout(client=self.client)
 
     def test_actors_tab_no_session(self):
-        pass
+        rsp = self.client.get('/actors')
+        self.assertEqual(rsp.status_code, 302)
+        self.assertIn("there was an error", rsp.content.decode('utf-8'))
 
     def test_workers_tab_no_session(self):
-        pass
+        rsp = self.client.get('/workers')
+        self.assertEqual(rsp.status_code, 302)
+        self.assertIn("there was an error", rsp.content.decode('utf-8'))
 
     def test_executions_tab_no_session(self):
-        pass
+        rsp = self.client.get('/executions')
+        self.assertEqual(rsp.status_code, 302)
+        self.assertIn("there was an error", rsp.content.decode('utf-8'))
 
+    def test_logout(self):
+        rsp = self.login(username='testshareuser', password='testshareuser')
+        self.assertEqual(rsp.status_code, 302)
+        self.logout(client=self.client)
+        rsp = self.client.get('/workers')
+        self.assertEqual(rsp.status_code, 302)
+        self.assertIn("there was an error", rsp.content.decode('utf-8'))
+        rspa = self.client.get('/actors')
+        self.assertEqual(rspa.status_code, 302)
+        self.assertIn("there was an error", rspa.content.decode('utf-8'))
+        rspe = self.client.get('/executions')
+        self.assertEqual(rspe.status_code, 302)
+        self.assertIn("there was an error", rspe.content.decode('utf-8'))
 
+    def test_api_deletes_actors_from_dashboard(self):
+        url = '{}/{}'.format(base_url, 'actors')
+        print(base_url)
+        data = {
+            'image': 'jstubbs/abaco_test',
+            'name': 'abaco_test_default',
+            'stateless': False,
+        }
+        file_path = 'jwt-abaco-admin'
+        with open(file_path, 'r') as f:
+            jwt_default = f.read()
+        jwt = os.environ.get('jwt', jwt_default)
+        jwt_header = os.environ.get('jwt_header', 'X-Jwt-Assertion-DEV-DEVELOP')
+        headers = {jwt_header: jwt}
+        rsp = requests.post(url, data=data, headers=headers)
+        data = json.loads(rsp.content.decode('utf-8'))
+        result = data['result']
+        actor_id = result['id']
+        print(actor_id)
+        url_del = '{}/{}/v2/{}'.format(base_url, 'actors', actor_id)
+        self.assertIn(actor_id, rsp.content.decode('utf-8'))
+        del_rsp = requests.delete(url_del, headers=headers)
+        fin_rsp = self.client.get('/actors')
+        self.assertNotIn(actor_id, fin_rsp.content.decode('utf-8'))
+
+
+    def test_delete_button_deletes_workers_from_api(self):
+        pass
+        # pass until caching has been implemented
+        ###########
+        # url = '{}/{}'.format(base_url, '/actors')
+        # data = {
+        #     'image': 'jstubbs/abaco_test',
+        # 
'name': 'abaco_test_default', + # 'stateless': False, + # } + # rsp = requests.post(url, data=data, headers=headers) + # actor_id = rsp['id'] + # print(actor_id) + # drsp = requests.delete(self.url, headers=headers, actorId=self.actor_id) + # # frsp = self.client.get('/actors') this one needs to go to API + # self.assertNotIn(frsp.conent, actor_id) + + diff --git a/docker-compose-local-db.yml b/docker-compose-local-db.yml index 27f78db2..96fd0c1a 100644 --- a/docker-compose-local-db.yml +++ b/docker-compose-local-db.yml @@ -1,26 +1,28 @@ --- -mongo: - image: mongo - ports: - - "27017:27017" -# uncomment to add auth -# command: --auth +version: "2" +services: + mongo: + image: mongo + ports: + - "27017:27017" + # uncomment to add auth + # command: --auth -rabbit: - image: rabbitmq:3.5.3-management - ports: - - "5672:5672" - - "15672:15672" - environment: - RABBITMQ_NODENAME: abaco-rabbit + rabbit: + image: rabbitmq:3.5.3-management + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_NODENAME: abaco-rabbit -redis: - image: redis - ports: - - "6379:6379" -# -- uncomment to add auth -# volumes: -# - ./redis.conf:/etc/redis.conf -# command: redis-server /etc/redis.conf --appendonly yes -# -- \ No newline at end of file + redis: + image: redis + ports: + - "6379:6379" + # -- uncomment to add auth + # volumes: + # - ./redis.conf:/etc/redis.conf + # command: redis-server /etc/redis.conf --appendonly yes + # -- \ No newline at end of file diff --git a/docker-compose-local.yml b/docker-compose-local.yml index d7c7d7d6..c478d722 100644 --- a/docker-compose-local.yml +++ b/docker-compose-local.yml @@ -1,122 +1,127 @@ --- -nginx: - image: abaco/nginx$TAG - volumes: - - ./local-dev.conf:/etc/service.conf - ports: - - "8000:80" +version: "2" -reg: - image: abaco/core$TAG - ports: - - "5000:5000" - volumes: - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - environment: - server: dev - api: reg - server: gunicorn - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: +services: + nginx: + image: abaco/nginx$TAG +# image: nginx + volumes: + - ./local-dev.conf:/etc/service.conf +# - ./images/nginx/nginx.conf:/etc/nginx/nginx.conf +# - ./images/nginx/sites-enabled:/etc/nginx/sites-enabled + ports: + - "8000:80" + restart: always + reg: + image: abaco/core$TAG + ports: + - "5000:5000" + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + api: reg + server: gunicorn + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: -mes: - image: abaco/core$TAG - volumes: - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - ports: - - "5001:5000" - environment: - server: gunicorn - api: mes - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: + mes: + image: abaco/core$TAG + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5001:5000" + environment: + server: gunicorn + api: mes + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: -#metrics: -# image: abaco/core$TAG -# volumes: -# - ./local-dev.conf:/etc/service.conf -# - ./abaco.log:/var/log/service.log -# ports: -# - "5004:5000" -# environment: -# server: dev -# api: metrics -# mongo_password: -# redis_password: -# TAS_ROLE_ACCT: -# TAS_ROLE_PASS: + admin: + image: abaco/core$TAG + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5003:5000" + environment: + server: gunicorn + api: admin + 
mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + spawner: + image: abaco/core$TAG + command: "python3 -u /actors/spawner.py" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + _abaco_secret: 123 + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + queue: default -admin: - image: abaco/core$TAG - volumes: - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - ports: - - "5003:5000" - environment: - server: gunicorn - api: admin - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: + clientg: + image: abaco/core$TAG + command: "python3 -u /actors/clients.py" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + _abaco_secret: 123 + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: -spawner: - image: abaco/core$TAG - command: "python3 -u /actors/spawner.py" - volumes: - - /var/run/docker.sock:/var/run/docker.sock - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - environment: - abaco_conf_host_path: ${abaco_path}/local-dev.conf - _abaco_secret: 123 - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: + # add the following pair of credentials for each tenant wanting client generation + _abaco_DEV-DEVELOP_username: testotheruser + _abaco_DEV-DEVELOP_password: testotheruser + # _abaco_DEV-STAGING_username: abaco1 + # _abaco_DEV-STAGING_password: abaco1 -clientg: - image: abaco/core$TAG - command: "python3 -u /actors/clients.py" - volumes: - - /var/run/docker.sock:/var/run/docker.sock - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - environment: - abaco_conf_host_path: ${abaco_path}/local-dev.conf - _abaco_secret: 123 - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: + metrics: + image: abaco/core$TAG + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5004:5000" + environment: + server: dev + api: metrics + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: - # add the following pair of credentials for each tenant wanting client generation - _abaco_DEV-DEVELOP_username: testotheruser - _abaco_DEV-DEVELOP_password: testotheruser -# _abaco_DEV-STAGING_username: abaco1 -# _abaco_DEV-STAGING_password: abaco1 - -health: - image: abaco/core$TAG - command: /actors/health_check.sh - volumes: - - /:/host - - /var/run/docker.sock:/var/run/docker.sock - - ./local-dev.conf:/etc/service.conf - - ./abaco.log:/var/log/service.log - environment: - abaco_conf_host_path: ${abaco_path}/local-dev.conf - mongo_password: - redis_password: - TAS_ROLE_ACCT: - TAS_ROLE_PASS: + health: + image: abaco/core$TAG + command: /actors/health_check.sh + volumes: + - /:/host + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: diff --git a/docker-compose.yml b/docker-compose.yml index 7020b6c2..60c728a3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,60 +1,96 @@ --- -mongo: - extends: - file: docker-compose-local-db.yml - service: mongo - -redis: - extends: - file: 
docker-compose-local-db.yml - service: redis - -rabbit: - extends: - file: docker-compose-local-db.yml - service: rabbit - - -nginx: - extends: - file: docker-compose-local.yml - service: nginx - -reg: - extends: - file: docker-compose-local.yml - service: reg - -mes: - extends: - file: docker-compose-local.yml - service: mes - -#metrics: -# extends: -# file: docker-compose-local.yml -# service: metrics - -admin: - extends: - file: docker-compose-local.yml - service: admin - -# uncomment to run the client generation -clientg: - extends: - file: docker-compose-local.yml - service: clientg - volumes: - - ./local-dev.conf:/etc/service.conf - -spawner: - extends: - file: docker-compose-local.yml - service: spawner - -health: - extends: - file: docker-compose-local.yml - service: health \ No newline at end of file +version: "2" + +networks: + abaco: + driver: bridge + +services: + mongo: + extends: + file: docker-compose-local-db.yml + service: mongo + networks: + - abaco + + redis: + extends: + file: docker-compose-local-db.yml + service: redis + networks: + - abaco + + rabbit: + extends: + file: docker-compose-local-db.yml + service: rabbit + networks: + - abaco + + nginx: + extends: + file: docker-compose-local.yml + service: nginx + networks: + - abaco + + reg: + extends: + file: docker-compose-local.yml + service: reg + networks: + - abaco + + mes: + extends: + file: docker-compose-local.yml + service: mes + networks: + - abaco + + metrics: + extends: + file: docker-compose-local.yml + service: metrics + networks: + - abaco + + admin: + extends: + file: docker-compose-local.yml + service: admin + networks: + - abaco + + clientg: + extends: + file: docker-compose-local.yml + service: clientg + volumes: + - ./local-dev.conf:/etc/service.conf + networks: + - abaco + + default_spawner: + extends: + file: docker-compose-local.yml + service: spawner + networks: + - abaco + + special_spawner: + extends: + file: docker-compose-local.yml + service: spawner + environment: + queue: special + networks: + - abaco + + health: + extends: + file: docker-compose-local.yml + service: health + networks: + - abaco diff --git a/docs/developer_docs.md b/docs/developer_docs.md index 44ec0138..806fa2c5 100644 --- a/docs/developer_docs.md +++ b/docs/developer_docs.md @@ -134,13 +134,13 @@ $ docker build -f Dockerfile-test -t abaco/testsuite$TAG . 
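Because the compose stack now runs on a user-defined bridge network, the test container must join that network to reach the APIs by service name (`http://nginx`). Compose derives the network name from the checkout directory, so for a directory named `abaco` it is `abaco_abaco`; if unsure, it can be confirmed with something like:

```shell
$ docker network ls --filter name=abaco
```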
To run the functional tests, execute the following: ```shell -$ docker run -e base_url=http://172.17.0.1:8000 -e case=camel -v /:/host -v $(pwd)/local-dev.conf:/etc/service.conf -it --rm abaco/testsuite$TAG +$ docker run --network=abaco_abaco -e base_url=http://nginx -e case=camel -v /:/host -v $(pwd)/local-dev.conf:/etc/service.conf -it --rm abaco/testsuite$TAG ``` Run the unit tests with a command similar to the following, changing the test module as the end as necessary: ```shell -$ docker run -e base_url=http://172.17.0.1:8000 -v $(pwd)/local-dev.conf:/etc/service.conf --entrypoint=py.test -it --rm abaco/testsuite$TAG /tests/test_store.py +$ docker run --network=abaco_abaco -e base_url=http://nginx -v $(pwd)/local-dev.conf:/etc/service.conf --entrypoint=py.test -it --rm abaco/testsuite$TAG /tests/test_store.py ``` Dev, Staging and Master Branches and Environments diff --git a/images/nginx/Dockerfile b/images/nginx/Dockerfile index e4097bfb..472ad2fe 100644 --- a/images/nginx/Dockerfile +++ b/images/nginx/Dockerfile @@ -1,5 +1,5 @@ # Image: abaco/nginx -FROM alpine:3.1 +FROM alpine:3.8 ENV NGINX_VERSION nginx-1.7.11 diff --git a/images/nginx/nginx.conf b/images/nginx/nginx.conf index 5276b9c4..efc49fa0 100644 --- a/images/nginx/nginx.conf +++ b/images/nginx/nginx.conf @@ -16,6 +16,7 @@ events { http { include mime.types; + resolver 127.0.0.11; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' diff --git a/images/nginx/sites-enabled/flask-project b/images/nginx/sites-enabled/flask-project index da493551..f9c8afcc 100644 --- a/images/nginx/sites-enabled/flask-project +++ b/images/nginx/sites-enabled/flask-project @@ -3,36 +3,37 @@ server { listen 80; server_name abaco.org; charset utf-8; + resolver 127.0.0.11; location /docs { root /; } location /metrics { - proxy_pass http://172.17.0.1:5004/metrics; + proxy_pass http://metrics:5000/metrics; } location ~* ^/actors/admin(.*) { - proxy_pass http://172.17.0.1:5003/actors/admin$1$is_args$args; + proxy_pass http://admin:5000/actors/admin$1$is_args$args; } location ~* ^/actors/(.*)/messages(.*) { - proxy_pass http://172.17.0.1:5001/actors/$1/messages$is_args$args; + proxy_pass http://mes:5000/actors/$1/messages$is_args$args; } location ~/actors/(.*)/workers(.*) { - proxy_pass http://172.17.0.1:5003/actors/$1/workers$2$is_args$args; + proxy_pass http://admin:5000/actors/$1/workers$2$is_args$args; } location ~/actors/aliases/(.*)/permissions { - proxy_pass http://172.17.0.1:5003/actors/aliases/$1/permissions$is_args$args; + proxy_pass http://admin:5000/actors/aliases/$1/permissions$is_args$args; } location ~/actors/(.*)/permissions { - proxy_pass http://172.17.0.1:5003/actors/$1/permissions$is_args$args; + proxy_pass http://admin:5000/actors/$1/permissions$is_args$args; } location ~/actors(.*) { - proxy_pass http://172.17.0.1:5000/actors$1$is_args$args; + proxy_pass http://reg:5000/actors$1$is_args$args; } -} \ No newline at end of file +} diff --git a/local-dev.conf b/local-dev.conf index 2329c0b9..3b303ba6 100644 --- a/local-dev.conf +++ b/local-dev.conf @@ -16,7 +16,7 @@ level = DEBUG [store] # url for the mongo instance -mongo_host: 172.17.0.1 +mongo_host: mongo # port for the mongo instance mongo_port: 27017 @@ -26,7 +26,7 @@ mongo_port: 27017 #mongo_password: the_mongo_password # url for the redis instance -redis_host: 172.17.0.1 +redis_host: redis # port for the redis instance redis_port: 6379 @@ -37,7 +37,7 @@ redis_port: 6379 [rabbit] # url and port for the rabbitmq instance 
-uri: amqp://172.17.0.1:5672 +uri: amqp://rabbit:5672 [spawner] @@ -46,6 +46,9 @@ uri: amqp://172.17.0.1:5672 # host_id should be the unique for each worker host. host_id: 0 +# list of queues that spawners on this host will subscribe to. +host_queues: default, special + # An addressable IP for the spawner's host. This config is not currently used but could # be at a future date to support a conatiner scheduler like swarm or mesos. host_ip: 172.17.0.1 @@ -66,6 +69,15 @@ max_workers_per_host: 75 # an actor's worker pool beyond this number. max_workers_per_actor: 6 +# name of the docker network on which the Abaco agents should start spawner and worker containers. +# when using the local development stack, by default this network will be named {{ repo_name }}_abaco, where +# repo_name is the name of the directory in which this repository was checked out. +# this setting is required when using the local-dev.conf file included here so that spawners and workers +# can communicate with the database containers. + +# when this setting is not configured, spawners and workers started programmatically will be added to the +# default docker network for the host. +docker_network: abaco_abaco [docker] # url to use for docker daemon by spawners and workers. Currently only the unix socket is @@ -202,4 +214,7 @@ case: camel # The maximum content length, in bytes, allowed for raw (binary) data messages. # Below we set it to 500M: -max_content_length: 500000000 \ No newline at end of file +max_content_length: 500000000 + +# list of all allowable queues +all_queues: default, special \ No newline at end of file diff --git a/samples/binary_message_classifier/Dockerfile b/samples/binary_message_classifier/Dockerfile new file mode 100644 index 00000000..90d31f83 --- /dev/null +++ b/samples/binary_message_classifier/Dockerfile @@ -0,0 +1,16 @@ +# image: abacosamples/binary_message_classifier + +FROM tensorflow/tensorflow:1.5.0-py3 + +# Requirements +RUN apt-get update && apt-get install -y wget +RUN pip install --upgrade h5py +RUN pip install --no-cache-dir agavepy + +# Image +ADD classify_image.py /classify_image.py +ADD entry.sh /entry.sh +RUN chmod +x /entry.sh + +ENTRYPOINT ["python", "/classify_image.py", "--bin_message"] +# ENTRYPOINT ["/entry.sh"] diff --git a/samples/binary_message_classifier/README.rst b/samples/binary_message_classifier/README.rst new file mode 100644 index 00000000..800646ef --- /dev/null +++ b/samples/binary_message_classifier/README.rst @@ -0,0 +1,83 @@ +Image: abacosamples/binary_message_classifier +--------------------------------------------- + +`Originally from TACCster tutorial. +`_ + +Directory contains a "self-contained" image classifier based on the TensorFlow library. +Modified to take two additional inputs, binary messages, and binary image data. + +Binary image data can be used as an input to use the code locally, while binary message +input allows a connection to Abaco's FIFO pipeline and allows for binary message data to +be read in. + +Building image +~~~~~~~~~~~~~~ + +Image creation can be done with: + +.. code-block:: bash + + docker build . + +Executing the actor with Python and AgavePy +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Setting up an ``AgavePy`` object with token and API address information: + +.. 
code-block:: python
+
+    from agavepy.agave import Agave
+    ag = Agave(api_server='https://api.tacc.utexas.edu',
+               username='', password='',
+               client_name='JPEG_classifier',
+               api_key='',
+               api_secret='')
+
+    ag.get_access_token()
+    ag = Agave(api_server='https://api.tacc.utexas.edu/', token=ag.token)
+
+Creating an actor with the TensorFlow image classifier Docker image:
+
+.. code-block:: python
+
+    my_actor = {'image': 'abacosamples/binary_message_classifier',
+                'name': 'JPEG_classifier',
+                'description': 'Labels a read-in binary image'}
+    actor_data = ag.actors.add(body=my_actor)
+
+The following creates a binary message from a JPEG image file:
+
+.. code-block:: python
+
+    with open('', 'rb') as file:
+        binary_image = file.read()
+
+Sending the binary JPEG file to the actor as a message with the ``application/octet-stream`` header:
+
+.. code-block:: python
+
+    result = ag.actors.sendMessage(actorId=actor_data['id'],
+                                   body={'binary': binary_image},
+                                   headers={'Content-Type': 'application/octet-stream'})
+
+The following returns information pertaining to the execution:
+
+.. code-block:: python
+
+    execution = ag.actors.getExecution(actorId=actor_data['id'],
+                                       executionId=result['executionId'])
+
+Once the execution has completed, the logs can be retrieved with the following:
+
+.. code-block:: python
+
+    exec_info = requests.get('{}/actors/v2/{}/executions/{}'.format(url, actor_id, exec_id),
+                             headers={'Authorization': 'Bearer {}'.format(token)})
+
+Extra info
+~~~~~~~~~~
+
+There is an unused entry.sh file in this folder; you can use it, along with
+uncommenting the final line of the Dockerfile, in order to use image URLs as
+input. The classify_image.py file also takes more inputs from the command line!
diff --git a/samples/binary_message_classifier/classify_image.py b/samples/binary_message_classifier/classify_image.py
new file mode 100755
index 00000000..3079c679
--- /dev/null
+++ b/samples/binary_message_classifier/classify_image.py
@@ -0,0 +1,260 @@
+# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""Simple image classification with Inception.
+
+Run image classification with Inception trained on ImageNet 2012 Challenge data
+set.
+
+This program creates a graph from a saved GraphDef protocol buffer,
+and runs inference on an input JPEG image. It outputs human readable
+strings of the top 5 predictions along with their probabilities.
+
+Change the --image_file argument to any jpg image to compute a
+classification of that image.
+
+Please see the tutorial and website for a detailed description of how
+to use this script to perform image recognition. 
+ +https://tensorflow.org/tutorials/image_recognition/ +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import os.path +import re +import sys +import tarfile + +import numpy as np +from six.moves import urllib +import tensorflow as tf + +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' +tf.logging.set_verbosity(tf.logging.ERROR) + +FLAGS = None + +# pylint: disable=line-too-long +DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz' +# pylint: enable=line-too-long + + +class NodeLookup(object): + """Converts integer node ID's to human readable labels.""" + + def __init__(self, + label_lookup_path=None, + uid_lookup_path=None): + if not label_lookup_path: + label_lookup_path = os.path.join( + FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt') + if not uid_lookup_path: + uid_lookup_path = os.path.join( + FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt') + self.node_lookup = self.load(label_lookup_path, uid_lookup_path) + + def load(self, label_lookup_path, uid_lookup_path): + """Loads a human readable English name for each softmax node. + + Args: + label_lookup_path: string UID to integer node ID. + uid_lookup_path: string UID to human-readable string. + + Returns: + dict from integer node ID to human-readable string. + """ + if not tf.gfile.Exists(uid_lookup_path): + tf.logging.fatal('File does not exist %s', uid_lookup_path) + if not tf.gfile.Exists(label_lookup_path): + tf.logging.fatal('File does not exist %s', label_lookup_path) + + # Loads mapping from string UID to human-readable string + proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines() + uid_to_human = {} + p = re.compile(r'[n\d]*[ \S,]*') + for line in proto_as_ascii_lines: + parsed_items = p.findall(line) + uid = parsed_items[0] + human_string = parsed_items[2] + uid_to_human[uid] = human_string + + # Loads mapping from string UID to integer node ID. + node_id_to_uid = {} + proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines() + for line in proto_as_ascii: + if line.startswith(' target_class:'): + target_class = int(line.split(': ')[1]) + if line.startswith(' target_class_string:'): + target_class_string = line.split(': ')[1] + node_id_to_uid[target_class] = target_class_string[1:-2] + + # Loads the final mapping of integer node ID to human-readable string + node_id_to_name = {} + for key, val in node_id_to_uid.items(): + if val not in uid_to_human: + tf.logging.fatal('Failed to locate: %s', val) + name = uid_to_human[val] + node_id_to_name[key] = name + + return node_id_to_name + + def id_to_string(self, node_id): + if node_id not in self.node_lookup: + return '' + return self.node_lookup[node_id] + + +def create_graph(): + """Creates a graph from saved GraphDef file and returns a saver.""" + # Creates graph from saved graph_def.pb. + with tf.gfile.FastGFile(os.path.join( + FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f: + graph_def = tf.GraphDef() + graph_def.ParseFromString(f.read()) + _ = tf.import_graph_def(graph_def, name='') + + +def run_inference_on_image(image, bin_flag=False): + """Runs inference on an image. + + Args: + image: Image file name or binary image. + bin_flag: Whether or not data entered is binary image. 
+ + Returns: + Nothing + """ + if bin_flag: + image_data = image + else: + if not tf.gfile.Exists(image): + tf.logging.fatal('File does not exist %s', image) + image_data = tf.gfile.FastGFile(image, 'rb').read() + + # Creates graph from saved GraphDef. + create_graph() + + with tf.Session() as sess: + # Some useful tensors: + # 'softmax:0': A tensor containing the normalized prediction across + # 1000 labels. + # 'pool_3:0': A tensor containing the next-to-last layer containing 2048 + # float description of the image. + # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG + # encoding of the image. + # Runs the softmax tensor by feeding the image_data as input to the graph. + softmax_tensor = sess.graph.get_tensor_by_name('softmax:0') + predictions = sess.run(softmax_tensor, + {'DecodeJpeg/contents:0': image_data}) + predictions = np.squeeze(predictions) + + # Creates node ID --> English string lookup. + node_lookup = NodeLookup() + + top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1] + for node_id in top_k: + human_string = node_lookup.id_to_string(node_id) + score = predictions[node_id] + print('%s (score = %.5f)' % (human_string, score)) + + +def maybe_download_and_extract(): + """Download and extract model tar file.""" + dest_directory = FLAGS.model_dir + if not os.path.exists(dest_directory): + os.makedirs(dest_directory) + filename = DATA_URL.split('/')[-1] + filepath = os.path.join(dest_directory, filename) + if not os.path.exists(filepath): +# def _progress(count, block_size, total_size): +# sys.stdout.write('\r>> Downloading %s %.1f%%' % ( +# filename, float(count * block_size) / float(total_size) * 100.0)) +# sys.stdout.flush() + filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath)#,_progress) +# print() + statinfo = os.stat(filepath) +# print('Successfully downloaded', filename, statinfo.st_size, 'bytes.') + tarfile.open(filepath, 'r:gz').extractall(dest_directory) + +def binary_get(): + from agavepy.actors import get_binary_message + return get_binary_message() + +def main(_): + maybe_download_and_extract() + + if FLAGS.bin_message: + image = binary_get() + bin_flag = True + elif FLAGS.bin_data: + image = FLAGS.bin_data + bin_flag = True + elif FLAGS.image_file: + image = FLAGS.image_file + bin_flag = False + else: + image = os.path.join(FLAGS.model_dir, 'cropped_panda.jpg') + bin_flag = False + + run_inference_on_image(image, bin_flag) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + # classify_image_graph_def.pb: + # Binary representation of the GraphDef protocol buffer. + # imagenet_synset_to_human_label_map.txt: + # Map from synset ID to a human readable string. + # imagenet_2012_challenge_label_map_proto.pbtxt: + # Text representation of a protocol buffer mapping a label to synset ID. + parser.add_argument( + '--model_dir', + type=str, + default='/tmp/imagenet', + help="""\ + Path to classify_image_graph_def.pb, + imagenet_synset_to_human_label_map.txt, and + imagenet_2012_challenge_label_map_proto.pbtxt.\ + """ + ) + parser.add_argument( + '--image_file', + type=str, + default='', + help='Absolute path to image file.' + ) + parser.add_argument( + '--num_top_predictions', + type=int, + default=5, + help='Display this many predictions.' + ) + parser.add_argument( + '--bin_data', + type=str, + default='', + help='Inputted binary image data.' + ) + parser.add_argument( + '--bin_message', + action="store_true", + help='Use to show that binary data should be read from stream.' 
+ ) + FLAGS, unparsed = parser.parse_known_args() + tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) diff --git a/samples/binary_message_classifier/entry.sh b/samples/binary_message_classifier/entry.sh new file mode 100755 index 00000000..dcb1c3ea --- /dev/null +++ b/samples/binary_message_classifier/entry.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +cd / +wget $URL -O image.jpg +python /classify_image.py --image_file /image.jpg \ No newline at end of file diff --git a/tests/entry.sh b/tests/entry.sh index bd6eab0d..fe56cb69 100644 --- a/tests/entry.sh +++ b/tests/entry.sh @@ -5,13 +5,20 @@ # # Parameter to the entrypoint. -TEST=$1 +TEST="$1" # if nothing passed, run the full suite if [ -z $TEST ]; then - py.test /tests/test_abaco_core.py + pytest /tests/test_abaco_core.py +elif [ "$#" -eq 2 ]; then + TEST="$1 $2" + echo $TEST + pytest $TEST +elif [ "$#" -eq 3 ]; then + TEST="$1 $2 $3" + pytest $TEST else - py.test $TEST + pytest $TEST fi \ No newline at end of file diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 00000000..9f75a7db --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +markers = + queuetest: used for testing custom actor queues + aliastest: used for testing the actor alias functionality diff --git a/tests/test_abaco_core.py b/tests/test_abaco_core.py index d2a9ce58..b5c01c8c 100644 --- a/tests/test_abaco_core.py +++ b/tests/test_abaco_core.py @@ -41,9 +41,10 @@ import pytest import requests import json +import pytest -from actors import health, models, codes, stores - +from actors import health, models, codes, stores, spawner +from channels import ActorMsgChannel, CommandChannel from util import headers, base_url, case, \ response_format, basic_response_checks, get_actor_id, check_execution_details, \ execute_actor, get_tenant, priv_headers, limited_headers @@ -53,6 +54,7 @@ # registration API # ################# +@pytest.mark.regapi def test_dict_to_camel(): dic = {"_links": {"messages": "http://localhost:8000/actors/v2/ca39fac2-60a7-11e6-af60-0242ac110009-059/messages", "owner": "http://localhost:8000/profiles/v2/anonymous", @@ -65,37 +67,52 @@ def test_dict_to_camel(): assert 'executionId' in dcamel assert dcamel['executionId'] == "458ab16c-60a8-11e6-8547-0242ac110008-053" + +@pytest.mark.regapi def test_permission_NONE_READ(): assert codes.NONE < codes.READ +@pytest.mark.regapi def test_permission_NONE_EXECUTE(): assert codes.NONE < codes.EXECUTE +@pytest.mark.regapi def test_permission_NONE_UPDATE(): assert codes.NONE < codes.UPDATE + +@pytest.mark.regapi def test_permission_READ_EXECUTE(): assert codes.READ < codes.EXECUTE + +@pytest.mark.regapi def test_permission_READ_UPDATE(): assert codes.READ < codes.UPDATE + +@pytest.mark.regapi def test_permission_EXECUTE_UPDATE(): assert codes.EXECUTE < codes.UPDATE +@pytest.mark.regapi def test_list_actors(headers): url = '{}/{}'.format(base_url, '/actors') rsp = requests.get(url, headers=headers) result = basic_response_checks(rsp) assert len(result) == 0 + +@pytest.mark.regapi def test_invalid_method_list_actors(headers): url = '{}/{}'.format(base_url, '/actors') rsp = requests.put(url, headers=headers) assert rsp.status_code == 405 response_format(rsp) + +@pytest.mark.regapi def test_list_nonexistent_actor(headers): url = '{}/{}'.format(base_url, '/actors/bad_actor_id') rsp = requests.get(url, headers=headers) @@ -103,6 +120,8 @@ def test_list_nonexistent_actor(headers): data = json.loads(rsp.content.decode('utf-8')) assert data['status'] == 'error' + +@pytest.mark.regapi def 
test_cors_list_actors(headers): url = '{}/{}'.format(base_url, '/actors') headers['Origin'] = 'http://example.com' @@ -110,6 +129,8 @@ def test_cors_list_actors(headers): basic_response_checks(rsp) assert 'Access-Control-Allow-Origin' in rsp.headers + +@pytest.mark.regapi def test_cors_options_list_actors(headers): url = '{}/{}'.format(base_url, '/actors') headers['Origin'] = 'http://example.com' @@ -121,6 +142,8 @@ def test_cors_options_list_actors(headers): assert 'Access-Control-Allow-Methods' in rsp.headers assert 'Access-Control-Allow-Headers' in rsp.headers + +@pytest.mark.regapi def test_register_actor(headers): url = '{}/{}'.format(base_url, '/actors') data = {'image': 'jstubbs/abaco_test', 'name': 'abaco_test_suite', 'stateless': False} @@ -133,6 +156,8 @@ def test_register_actor(headers): assert result['name'] == 'abaco_test_suite' assert result['id'] is not None + +@pytest.mark.regapi def test_register_alias_actor(headers): url = '{}/{}'.format(base_url, '/actors') data = {'image': 'jstubbs/abaco_test', 'name': 'abaco_test_suite_alias'} @@ -145,6 +170,8 @@ def test_register_alias_actor(headers): assert result['name'] == 'abaco_test_suite_alias' assert result['id'] is not None + +@pytest.mark.regapi def test_register_stateless_actor(headers): url = '{}/{}'.format(base_url, '/actors') # stateless actors are the default now, so stateless tests should pass without specifying "stateless": True @@ -158,6 +185,8 @@ def test_register_stateless_actor(headers): assert result['name'] == 'abaco_test_suite_statelesss' assert result['id'] is not None + +@pytest.mark.regapi def test_register_actor_default_env(headers): url = '{}/{}'.format(base_url, '/actors') data = {'image': 'abacosamples/test', @@ -179,6 +208,8 @@ def test_register_actor_default_env(headers): assert result['name'] == 'abaco_test_suite_default_env' assert result['id'] is not None + +@pytest.mark.regapi def test_register_actor_func(headers): url = '{}/{}'.format(base_url, '/actors') data = {'image': 'abacosamples/py3_func', 'name': 'abaco_test_suite_func'} @@ -191,6 +222,8 @@ def test_register_actor_func(headers): assert result['name'] == 'abaco_test_suite_func' assert result['id'] is not None + +@pytest.mark.regapi def test_invalid_method_get_actor(headers): actor_id = get_actor_id(headers) url = '{}/actors/{}'.format(base_url, actor_id) @@ -199,6 +232,7 @@ def test_invalid_method_get_actor(headers): response_format(rsp) +@pytest.mark.regapi def test_list_actor(headers): actor_id = get_actor_id(headers) url = '{}/actors/{}'.format(base_url, actor_id) @@ -212,6 +246,8 @@ def test_list_actor(headers): assert result['name'] == 'abaco_test_suite' assert result['id'] is not None + +@pytest.mark.regapi def test_list_actor_state(headers): actor_id = get_actor_id(headers) url = '{}/actors/{}/state'.format(base_url, actor_id) @@ -219,6 +255,8 @@ def test_list_actor_state(headers): result = basic_response_checks(rsp) assert 'state' in result + +@pytest.mark.regapi def test_update_actor_state_string(headers): actor_id = get_actor_id(headers) url = '{}/actors/{}/state'.format(base_url, actor_id) @@ -227,6 +265,8 @@ def test_update_actor_state_string(headers): assert 'state' in result assert result['state'] == 'abc' + +@pytest.mark.regapi def test_update_actor_state_dict(headers): actor_id = get_actor_id(headers) url = '{}/actors/{}/state'.format(base_url, actor_id) @@ -240,6 +280,7 @@ def test_update_actor_state_dict(headers): assert result['state'] == {'foo': 'abc', 'bar': 1, 'baz': True} # invalid requests +@pytest.mark.regapi def 
 def test_register_without_image(headers):
     url = '{}/actors'.format(base_url)
     rsp = requests.post(url, headers=headers, data={})
@@ -249,6 +290,8 @@
     message = data['message']
     assert 'image' in message
 
+
+@pytest.mark.regapi
 def test_register_with_invalid_stateless(headers):
     url = '{}/{}'.format(base_url, '/actors')
     data = {'image': 'abacosamples/test',
@@ -262,6 +305,8 @@
     message = data['message']
     assert 'stateless' in message
 
+
+@pytest.mark.regapi
 def test_register_with_invalid_container_uid(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'use_container_uid'
@@ -279,6 +324,7 @@
     message = data['message']
     assert field in message
 
+@pytest.mark.regapi
 def test_register_with_invalid_def_env(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'default_environment'
@@ -296,6 +342,8 @@
     message = data['message']
     assert field in message
 
+
+@pytest.mark.regapi
 def test_cant_register_max_workers_stateful(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'max_workers'
@@ -313,12 +361,16 @@
     message = data['message']
     assert "stateful actors can only have 1 worker" in message
 
+
+@pytest.mark.regapi
 def test_register_with_put(headers):
     url = '{}/actors'.format(base_url)
     rsp = requests.put(url, headers=headers, data={'image': 'abacosamples/test'})
     response_format(rsp)
     assert rsp.status_code not in range(1, 399)
 
+
+@pytest.mark.regapi
 def test_cant_update_stateless_actor_state(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_statelesss')
     url = '{}/actors/{}/state'.format(base_url, actor_id)
@@ -327,6 +379,7 @@
     assert rsp.status_code not in range(1, 399)
 
 # invalid check having to do with authorization
+@pytest.mark.regapi
 def test_cant_set_max_workers_limited(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'max_workers'
@@ -340,6 +393,7 @@
     response_format(rsp)
     assert rsp.status_code not in range(1, 399)
 
+@pytest.mark.regapi
 def test_cant_set_max_cpus_limited(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'max_cpus'
@@ -353,6 +407,8 @@
     response_format(rsp)
     assert rsp.status_code not in range(1, 399)
 
+
+@pytest.mark.regapi
 def test_cant_set_mem_limit_limited(headers):
     url = '{}/{}'.format(base_url, '/actors')
     field = 'mem_limit'
@@ -383,25 +439,38 @@ def check_actor_is_ready(headers, actor_id=None):
         count += 1
     assert False
 
+
+@pytest.mark.regapi
 def test_basic_actor_is_ready(headers):
     check_actor_is_ready(headers)
 
+
+@pytest.mark.regapi
 def test_alias_actor_is_ready(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     check_actor_is_ready(headers, actor_id)
 
+
+@pytest.mark.regapi
 def test_stateless_actor_is_ready(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_statelesss')
     check_actor_is_ready(headers, actor_id)
 
+
+
+@pytest.mark.regapi
 def test_default_env_actor_is_ready(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_default_env')
     check_actor_is_ready(headers, actor_id)
 
+
+@pytest.mark.regapi
 def test_func_actor_is_ready(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_func')
     check_actor_is_ready(headers, actor_id)
 
+
+@pytest.mark.regapi
 def test_executions_empty_list(headers):
     actor_id = get_actor_id(headers)
     url = '{}/actors/{}/executions'.format(base_url, actor_id)
@@ -490,7 +559,6 @@ def test_execute_basic_actor(headers):
     data = {'message': 'testing execution'}
     execute_actor(headers, actor_id, data=data)
 
-
 def test_execute_default_env_actor(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_default_env')
     data = {'message': 'testing execution'}
@@ -561,7 +629,6 @@ def test_invalid_method_get_execution(headers):
     assert rsp.status_code == 405
     response_format(rsp)
 
-
 def test_invalid_method_get_execution_logs(headers):
     actor_id = get_actor_id(headers)
     url = '{}/actors/{}/executions'.format(base_url, actor_id)
@@ -573,7 +640,6 @@
     assert rsp.status_code == 405
     response_format(rsp)
 
-
 def test_list_execution_logs(headers):
     actor_id = get_actor_id(headers)
     # get execution id
@@ -594,13 +660,11 @@
     assert '_abaco_execution_id' in result['logs']
     assert '_abaco_Content_Type' in result['logs']
 
-
 def test_execute_actor_json(headers):
     actor_id = get_actor_id(headers)
     data = {'key1': 'value1', 'key2': 'value2'}
     execute_actor(headers, actor_id=actor_id, json_data=data)
 
-
 def test_update_actor(headers):
     actor_id = get_actor_id(headers)
     url = '{}/actors/{}'.format(base_url, actor_id)
@@ -642,6 +706,133 @@ def test_update_actor_other_user(headers):
     else:
         assert not result['lastUpdateTime'] == orig_actor['lastUpdateTime']
 
+###############
+# actor queue
+# tests
+###############
+
+
+CH_NAME_1 = 'special'
+CH_NAME_2 = 'default'
+
+
+@pytest.mark.queuetest
+def test_create_actor_with_custom_queue_name(headers):
+    url = '{}/actors'.format(base_url)
+    data = {
+        'image': 'jstubbs/abaco_test',
+        'name': 'abaco_test_suite_queue1_actor1',
+        'stateless': False,
+        'queue': CH_NAME_1
+    }
+    rsp = requests.post(url, data=data, headers=headers)
+    result = basic_response_checks(rsp)
+    assert result['queue'] == CH_NAME_1
+
+@pytest.mark.xfail
+@pytest.mark.queuetest
+def test_actor_uses_custom_queue(headers):
+    url = '{}/actors'.format(base_url)
+    # get the actor id of an actor registered on the default queue:
+    default_queue_actor_id = get_actor_id(headers, name='abaco_test_suite_statelesss')
+    # and the actor id for the actor on the special queue:
+    special_queue_actor_id = get_actor_id(headers, name='abaco_test_suite_queue1_actor1')
+
+    # send a request to start a bunch of workers for that actor; this should keep the default
+    # spawner busy for some time:
+    url = '{}/actors/{}/workers'.format(base_url, default_queue_actor_id)
+    data = {'num': '5'}
+    rsp = requests.post(url, data=data, headers=headers)
+
+    # now, try to start a second worker for the abaco_test_suite_queue1_actor1 actor.
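+    # if custom queues work, the 'special' spawner should service this request even while the default spawner is busy.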
+    url = '{}/actors/{}/workers'.format(base_url, special_queue_actor_id)
+    data = {'num': '2'}
+    rsp = requests.post(url, data=data, headers=headers)
+    basic_response_checks(rsp)
+    # ensure that worker is started within a small time window:
+    ch = CommandChannel(name=CH_NAME_1)
+    i = 0
+    while True:
+        time.sleep(2)
+        if len(ch._queue._queue) == 0:
+            break
+        i = i + 1
+        if i > 10:
+            assert False
+    # wait for workers to be ready and then shut them down
+    url = '{}/actors/{}/workers'.format(base_url, default_queue_actor_id)
+    check = True
+    i = 0
+    while check and i < 10:
+        time.sleep(5)
+        rsp = requests.get(url, headers=headers)
+        result = basic_response_checks(rsp)
+        for w in result:
+            if w['status'] == 'REQUESTED':
+                i = i + 1
+                continue
+            check = False
+    # remove all workers -
+    rsp = requests.get(url, headers=headers)
+    result = basic_response_checks(rsp)
+    for w in result:
+        url = '{}/actors/{}/workers/{}'.format(base_url, default_queue_actor_id, w['id'])
+        rsp = requests.delete(url, headers=headers)
+        basic_response_checks(rsp)
+    # check that workers are gone -
+    url = '{}/actors/{}/workers'.format(base_url, default_queue_actor_id)
+    check = True
+    i = 0
+    while check and i < 10:
+        time.sleep(5)
+        rsp = requests.get(url, headers=headers)
+        result = basic_response_checks(rsp)
+        try:
+            if len(result) == 0:
+                check = False
+            else:
+                i = i + 1
+        except Exception:
+            check = False
+
+# @pytest.mark.queuetest
+# def test_custom_actor_queue_with_autoscaling(headers):
+#     url = '{}/{}'.format(base_url, '/actors')
+#     data = {
+#         'image': 'jstubbs/abaco_test',
+#         'name': 'abaco_test_queue3',
+#         'stateless': False,
+#         'queue': CH_NAME_1
+#     }
+#     rsp = requests.post(url, data=data, headers=headers)
+#     result = basic_response_checks(rsp)
+#     assert result['queue'] == CH_NAME_1
+#
+#     actor_id = get_actor_id(headers, name='abaco_test_queue3')
+#     data = {'message': 'testing execution'}
+#     url = '{}/actors/{}/messages'.format(base_url, actor_id)
+#
+#     for i in range(50):
+#         rsp = requests.post(url, data=data, headers=headers)
+#
+#     url = '{}/actors/{}/workers'.format(base_url, actor_id)
+#     rsp = requests.get(url, headers=headers)
+#     # workers collection returns the tenant_id since it is an admin api
+#     result = basic_response_checks(rsp, check_tenant=False)
+#     assert len(result) > 1
+#     # get the first worker
+#     # worker = result[0]
+
+
+@pytest.mark.queuetest
+def test_actor_with_default_queue(headers):
+    pass
+
+
+@pytest.mark.queuetest
+def test_2_actors_with_different_queues(headers):
+    pass
 
 # ##########
 # alias API
 # ##########
@@ -650,6 +841,8 @@ def test_update_actor_other_user(headers):
 ALIAS_1 = 'jane'
 ALIAS_2 = 'doe'
 
+
+@pytest.mark.aliastest
 def test_add_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/aliases'.format(base_url)
@@ -663,6 +856,8 @@
     assert result['alias'] == ALIAS_1
     assert result[field] == actor_id
 
+
+@pytest.mark.aliastest
 def test_add_second_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/aliases'.format(base_url)
@@ -677,6 +872,8 @@
     assert result['alias'] == ALIAS_2
     assert result[field] == actor_id
 
+
+@pytest.mark.aliastest
 def test_cant_add_same_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/aliases'.format(base_url)
@@ -690,6 +887,8 @@
     data = response_format(rsp)
     assert 'already exists' in data['message']
 
+
+@pytest.mark.aliastest
 def test_list_aliases(headers):
     url = '{}/actors/aliases'.format(base_url)
     rsp = requests.get(url, headers=headers)
@@ -701,6 +900,8 @@
     assert 'alias' in alias
     assert field in alias
 
+
+@pytest.mark.aliastest
 def test_list_alias(headers):
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_1)
     rsp = requests.get(url, headers=headers)
@@ -713,6 +914,8 @@
     assert result[field] == actor_id
     assert result['alias'] == ALIAS_1
 
+
+@pytest.mark.aliastest
 def test_list_alias_permission(headers):
     # first, get the alias to determine the owner
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_1)
@@ -727,6 +930,8 @@
     assert owner in result
     assert result[owner] == 'UPDATE'
 
+
+@pytest.mark.aliastest
 def test_other_user_cant_list_alias(headers):
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_1)
     rsp = requests.get(url, headers=priv_headers())
@@ -734,6 +939,8 @@
     assert rsp.status_code == 400
     assert 'you do not have access to this alias' in data['message']
 
+
+@pytest.mark.aliastest
 def test_add_alias_permission(headers):
     user = 'testshareuser'
     data = {'user': user, 'level': 'UPDATE'}
@@ -743,12 +950,16 @@
     assert user in result
     assert result[user] == 'UPDATE'
 
+
+@pytest.mark.aliastest
 def test_other_user_can_now_list_alias(headers):
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_1)
     rsp = requests.get(url, headers=priv_headers())
     result = basic_response_checks(rsp)
     assert 'alias' in result
 
+
+@pytest.mark.aliastest
 def test_other_user_still_cant_list_actor(headers):
     # alias permissions do not confer access to the actor itself -
     url = '{}/actors/{}'.format(base_url, ALIAS_1)
@@ -757,6 +968,8 @@
     data = response_format(rsp)
     assert 'you do not have access to this actor' in data['message']
 
+
+@pytest.mark.aliastest
 def test_get_actor_with_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/{}'.format(base_url, ALIAS_1)
@@ -764,6 +977,8 @@
     result = basic_response_checks(rsp)
     assert result['id'] == actor_id
 
+
+@pytest.mark.aliastest
 def test_get_actor_messages_with_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/{}/messages'.format(base_url, ALIAS_1)
@@ -772,6 +987,8 @@
     assert actor_id in result['_links']['self']
     assert 'messages' in result
 
+
+@pytest.mark.aliastest
 def test_get_actor_executions_with_alias(headers):
     actor_id = get_actor_id(headers, name='abaco_test_suite_alias')
     url = '{}/actors/{}/executions'.format(base_url, ALIAS_1)
@@ -780,6 +997,8 @@
     assert actor_id in result['_links']['self']
     assert 'executions' in result
 
+
+@pytest.mark.aliastest
 def test_owner_can_delete_alias(headers):
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_2)
     rsp = requests.delete(url, headers=headers)
@@ -792,6 +1011,8 @@
     for alias in result:
         assert not alias['alias'] == ALIAS_2
 
+
+@pytest.mark.aliastest
 def test_other_user_can_delete_shared_alias(headers):
     url = '{}/actors/aliases/{}'.format(base_url, ALIAS_1)
     rsp = requests.delete(url, headers=priv_headers())
@@ -1019,8 +1240,8 @@ def test_invalid_method_get_nonce(headers):
 # ################
 
 def check_worker_fields(worker):
-    assert worker.get('image') == 'jstubbs/abaco_test'
     assert worker.get('status') in ['READY', 'BUSY']
+    assert worker.get('image') in ['jstubbs/abaco_test', 'jstubbs/abaco_test2']
     assert worker.get('location')
     assert worker.get('cid')
     assert worker.get('tenant')
@@ -1033,7 +1254,6 @@
     assert 'lastExecutionTime' in worker
     assert 'lastHealthCheckTime' in worker
 
-
 def test_list_workers(headers):
     actor_id = get_actor_id(headers)
     url = '{}/actors/{}/workers'.format(base_url, actor_id)