Skip to content

Commit

Permalink
Initial commit of large event fix
Browse files Browse the repository at this point in the history
Clean up un-yielded coroutines
  • Loading branch information
dwoz committed Dec 19, 2024
1 parent 7829add commit c4017fb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 31 deletions.
105 changes: 75 additions & 30 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,20 +1663,28 @@ def _send_req_async(self, load, timeout):
)
raise salt.ext.tornado.gen.Return(ret)

def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
sync=True,
timeout_handler=None,
include_startup_grains=False,
):
@salt.ext.tornado.gen.coroutine
def _send_req_async_main(self, load, timeout):
"""
Fire an event on the master, or drop message if unable to send.
Send a request to the master's request server. To be called from the
top level process in the main thread only. Worker threads and
processess should call _send_req_sync or _send_req_async as nessecery.
"""
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
sig = salt.crypt.sign_message(
minion_privkey_path, salt.serializers.msgpack.serialize(load)
)
load["sig"] = sig
ret = yield self.req_channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
raise salt.ext.tornado.gen.Return(ret)

def _fire_master_prepare(
self, data, tag, events, pretag, include_startup_grains=False
):
load = {
"id": self.opts["id"],
"cmd": "_minion_event",
Expand All @@ -1701,7 +1709,52 @@ def _fire_master(
if k in self.opts["start_event_grains"]
}
load["grains"] = grains_to_add
return load

@salt.ext.tornado.gen.coroutine
def _fire_master_main(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
timeout_handler=None,
include_startup_grains=False,
):
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
if timeout_handler is None:

def handle_timeout(*_):
log.info(
"fire_master failed: master could not be contacted. Request"
" timed out."
)
return True

timeout_handler = handle_timeout

yield self._send_req_async_main(load, timeout)

def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
sync=True,
timeout_handler=None,
include_startup_grains=False,
):
"""
Fire an event on the master, or drop message if unable to send.
"""
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
if sync:
try:
self._send_req_sync(load, timeout)
Expand All @@ -1726,10 +1779,8 @@ def handle_timeout(*_):

timeout_handler = handle_timeout

with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
self._send_req_async(load, timeout, callback=lambda f: None)
# pylint: enable=unexpected-keyword-arg
# Returning a coroutine, should be awaited
return self._send_req_async(load, timeout)
return True

@salt.ext.tornado.gen.coroutine
Expand Down Expand Up @@ -2306,12 +2357,7 @@ def timeout_handler(*_):
timeout_handler()
return ""
else:
with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
ret_val = self._send_req_async(
load, timeout=timeout, callback=lambda f: None
)
# pylint: enable=unexpected-keyword-arg
ret_val = self._send_req_async(load, timeout=timeout)

log.trace("ret_val = %s", ret_val) # pylint: disable=no-member
return ret_val
Expand Down Expand Up @@ -2792,12 +2838,11 @@ def handle_event(self, package):
elif tag.startswith("fire_master"):
if self.connected:
log.debug("Forwarding master event tag=%s", data["tag"])
self._fire_master(
yield self._fire_master_main(
data["data"],
data["tag"],
data["events"],
data["pretag"],
sync=False,
)
elif tag.startswith(master_event(type="disconnected")) or tag.startswith(
master_event(type="failback")
Expand Down Expand Up @@ -2954,11 +2999,11 @@ def handle_event(self, package):
1
],
)
self._return_pub(data, ret_cmd="_return", sync=False)
yield self._return_pub(data, ret_cmd="_return", sync=False)
elif tag.startswith("_salt_error"):
if self.connected:
log.debug("Forwarding salt error event tag=%s", tag)
self._fire_master(data, tag, sync=False)
yield self._fire_master_main(data, tag)
elif tag.startswith("salt/auth/creds"):
key = tuple(data["key"])
log.debug(
Expand All @@ -2971,7 +3016,7 @@ def handle_event(self, package):
elif tag.startswith("__beacons_return"):
if self.connected:
log.debug("Firing beacons to master")
self._fire_master(events=data["beacons"])
yield self._fire_master_main(events=data["beacons"])

def cleanup_subprocesses(self):
"""
Expand Down Expand Up @@ -3373,12 +3418,12 @@ def fire_master_syndic_start(self):
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
"syndic_start",
sync=False,
sync=True, # sync needs to be false unless called from coroutine.
)
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
tagify([self.opts["id"], "start"], "syndic"),
sync=False,
sync=True, # sync needs to be false unless called from coroutine.
)

# TODO: clean up docs
Expand Down Expand Up @@ -3769,7 +3814,7 @@ def _forward_events(self):
"events": events,
"pretag": tagify(self.opts["id"], base="syndic"),
"timeout": self._return_retry_timer(),
"sync": False,
"sync": True, # Sync needs to be true unless being called from a coroutine
},
)
if self.delayed:
Expand Down
2 changes: 1 addition & 1 deletion salt/utils/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
close_methods=None,
loop_kwarg=None,
):
self.io_loop = salt.ext.tornado.ioloop.IOLoop()
self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False)
if args is None:
args = []
if kwargs is None:
Expand Down

0 comments on commit c4017fb

Please sign in to comment.