diff --git a/locust_plugins/__init__.py b/locust_plugins/__init__.py index 8c33754..47f5a43 100644 --- a/locust_plugins/__init__.py +++ b/locust_plugins/__init__.py @@ -1,4 +1,4 @@ -from .wait_time import constant_ips, constant_total_ips +from .wait_time import constant_total_ips import locust from locust.user.task import DefaultTaskSet, TaskSet from locust import events diff --git a/locust_plugins/distributor.py b/locust_plugins/distributor.py index dae0f4a..de8e2c9 100644 --- a/locust_plugins/distributor.py +++ b/locust_plugins/distributor.py @@ -2,6 +2,7 @@ import logging from gevent.event import AsyncResult import greenlet +import gevent from locust.env import Environment from locust.runners import WorkerRunner @@ -23,12 +24,8 @@ def __init__(self, environment: Environment, iterator: Optional[Iterator], name= if self.runner: # received on master def _distributor_request(environment: Environment, msg, **kwargs): - item = next(self.iterator) - self.runner.send_message( - f"_{name}_response", - {"item": item, "gid": msg.data["gid"]}, - client_id=msg.data["client_id"], - ) + # do this in the background to avoid blocking locust's client_listener loop + gevent.spawn(self._master_next_and_send, msg.data["gid"], msg.data["client_id"]) # received on worker def _distributor_response(environment: Environment, msg, **kwargs): @@ -37,6 +34,14 @@ def _distributor_response(environment: Environment, msg, **kwargs): self.runner.register_message(f"_{name}_request", _distributor_request) self.runner.register_message(f"_{name}_response", _distributor_response) + def _master_next_and_send(self, gid, client_id): + item = next(self.iterator) + self.runner.send_message( + f"_{self.name}_response", + {"item": item, "gid": gid}, + client_id=client_id, + ) + def __next__(self): """Get the next data dict from iterator diff --git a/locust_plugins/mongoreader.py b/locust_plugins/mongoreader.py index 87089fb..a2242d2 100644 --- a/locust_plugins/mongoreader.py +++ b/locust_plugins/mongoreader.py @@ -46,7 +46,7 @@ def __next__(self) -> dict: try: with dblock: doc: dict = next(self.cursor) - self.coll.find_one_and_update( + self.coll.update_one( {"_id": doc["_id"]}, {"$set": {self.timestamp_field: datetime.now(tz=timezone.utc)}}, ) diff --git a/locust_plugins/wait_time.py b/locust_plugins/wait_time.py index c7afb71..e7216a8 100644 --- a/locust_plugins/wait_time.py +++ b/locust_plugins/wait_time.py @@ -1,4 +1,3 @@ -import sys import logging import time from collections import deque, namedtuple @@ -57,11 +56,5 @@ def func(user): return func -def constant_ips(_ips): - """this function has been removed, now that it is available in locust.wait_time.constant_throughput""" - logging.error("constant_ips has been removed, now that it is available in locust.wait_time.constant_throughput") - sys.exit(1) - - def constant_total_pacing(seconds: float): return constant_total_ips(1.0 / seconds)