diff --git a/salt/minion.py b/salt/minion.py index 2c2585637420..c2afbbf1f594 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1663,20 +1663,28 @@ def _send_req_async(self, load, timeout): ) raise salt.ext.tornado.gen.Return(ret) - def _fire_master( - self, - data=None, - tag=None, - events=None, - pretag=None, - timeout=60, - sync=True, - timeout_handler=None, - include_startup_grains=False, - ): + @salt.ext.tornado.gen.coroutine + def _send_req_async_main(self, load, timeout): """ - Fire an event on the master, or drop message if unable to send. + Send a request to the master's request server. To be called from the + top level process in the main thread only. Worker threads and + processess should call _send_req_sync or _send_req_async as nessecery. """ + if self.opts["minion_sign_messages"]: + log.trace("Signing event to be published onto the bus.") + minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") + sig = salt.crypt.sign_message( + minion_privkey_path, salt.serializers.msgpack.serialize(load) + ) + load["sig"] = sig + ret = yield self.req_channel.send( + load, timeout=timeout, tries=self.opts["return_retry_tries"] + ) + raise salt.ext.tornado.gen.Return(ret) + + def _fire_master_prepare( + self, data, tag, events, pretag, include_startup_grains=False + ): load = { "id": self.opts["id"], "cmd": "_minion_event", @@ -1701,7 +1709,52 @@ def _fire_master( if k in self.opts["start_event_grains"] } load["grains"] = grains_to_add + return load + + @salt.ext.tornado.gen.coroutine + def _fire_master_main( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + timeout_handler=None, + include_startup_grains=False, + ): + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) + if timeout_handler is None: + + def handle_timeout(*_): + log.info( + "fire_master failed: master could not be contacted. Request" + " timed out." + ) + return True + + timeout_handler = handle_timeout + + yield self._send_req_async_main(load, timeout) + def _fire_master( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + sync=True, + timeout_handler=None, + include_startup_grains=False, + ): + """ + Fire an event on the master, or drop message if unable to send. + """ + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) if sync: try: self._send_req_sync(load, timeout) @@ -1726,10 +1779,8 @@ def handle_timeout(*_): timeout_handler = handle_timeout - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - self._send_req_async(load, timeout, callback=lambda f: None) - # pylint: enable=unexpected-keyword-arg + # Returning a coroutine, should be awaited + return self._send_req_async(load, timeout) return True @salt.ext.tornado.gen.coroutine @@ -2306,12 +2357,7 @@ def timeout_handler(*_): timeout_handler() return "" else: - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - ret_val = self._send_req_async( - load, timeout=timeout, callback=lambda f: None - ) - # pylint: enable=unexpected-keyword-arg + ret_val = self._send_req_async(load, timeout=timeout) log.trace("ret_val = %s", ret_val) # pylint: disable=no-member return ret_val @@ -2792,12 +2838,11 @@ def handle_event(self, package): elif tag.startswith("fire_master"): if self.connected: log.debug("Forwarding master event tag=%s", data["tag"]) - self._fire_master( + yield self._fire_master_main( data["data"], data["tag"], data["events"], data["pretag"], - sync=False, ) elif tag.startswith(master_event(type="disconnected")) or tag.startswith( master_event(type="failback") @@ -2954,11 +2999,11 @@ def handle_event(self, package): 1 ], ) - self._return_pub(data, ret_cmd="_return", sync=False) + yield self._return_pub(data, ret_cmd="_return", sync=False) elif tag.startswith("_salt_error"): if self.connected: log.debug("Forwarding salt error event tag=%s", tag) - self._fire_master(data, tag, sync=False) + yield self._fire_master_main(data, tag) elif tag.startswith("salt/auth/creds"): key = tuple(data["key"]) log.debug( @@ -2971,7 +3016,7 @@ def handle_event(self, package): elif tag.startswith("__beacons_return"): if self.connected: log.debug("Firing beacons to master") - self._fire_master(events=data["beacons"]) + yield self._fire_master_main(events=data["beacons"]) def cleanup_subprocesses(self): """ @@ -3373,12 +3418,12 @@ def fire_master_syndic_start(self): self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), "syndic_start", - sync=False, + sync=True, # sync needs to be false unless called from coroutine. ) self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), tagify([self.opts["id"], "start"], "syndic"), - sync=False, + sync=True, # sync needs to be false unless called from coroutine. ) # TODO: clean up docs @@ -3769,7 +3814,7 @@ def _forward_events(self): "events": events, "pretag": tagify(self.opts["id"], base="syndic"), "timeout": self._return_retry_timer(), - "sync": False, + "sync": True, # Sync needs to be true unless being called from a coroutine }, ) if self.delayed: diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index f0048ff19102..911088a3c29b 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -50,7 +50,7 @@ def __init__( close_methods=None, loop_kwarg=None, ): - self.io_loop = salt.ext.tornado.ioloop.IOLoop() + self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False) if args is None: args = [] if kwargs is None: