diff --git a/src/pubtools/_marketplacesvm/tasks/push/command.py b/src/pubtools/_marketplacesvm/tasks/push/command.py index cd8b99d..b1015e7 100644 --- a/src/pubtools/_marketplacesvm/tasks/push/command.py +++ b/src/pubtools/_marketplacesvm/tasks/push/command.py @@ -1,5 +1,4 @@ # SPDX-License-Identifier: GPL-3.0-or-later -import asyncio import datetime import json import logging @@ -257,7 +256,7 @@ def _push_pre_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[UPLOAD_R res.append((mapped_item, starmap_query)) return res - async def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[str, Any]]: + def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[str, Any]]: """ Perform the publishing for the the VM images. @@ -268,7 +267,7 @@ async def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[s Dictionary with the resulting operation for the Collector service. """ - async def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: + def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: # Get the push item for the current marketplace pi = mapped_item.get_push_item_for_marketplace(marketplace) @@ -315,15 +314,20 @@ async def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, An # Sequentially publish the uploaded items for each marketplace. # It's recommended to do this operation sequentially since parallel publishing in the # same marketplace may cause errors due to the change set already being applied. - res_output.extend( - await asyncio.gather( - *( - push_function(mapped_item, marketplace, starmap_query) - for mapped_item, starmap_query in upload_result - for marketplace in mapped_item.marketplaces - ) + for mapped_item, starmap_query in upload_result: + to_await = [] + executor = Executors.thread_pool( + name="pubtools-marketplacesvm-push-regions", + max_workers=min(max(len(mapped_item.marketplaces), 1), self._PROCESS_THREADS), ) - ) + + for marketplace in mapped_item.marketplaces: + to_await.append( + executor.submit(push_function, mapped_item, marketplace, starmap_query) + ) + + for f_out in to_await: + res_output.append(f_out.result()) return res_output @@ -418,7 +422,7 @@ def run(self, collect_results: bool = True, allow_empty_targets: bool = False) - upload_result = self._push_pre_publish(upload_result) # 4 - Publish the uploaded images letting the external function to control the threads - result = asyncio.run(self._push_publish(upload_result)) + result = self._push_publish(upload_result) # process result for failures failed = False