diff --git a/src/pubtools/_marketplacesvm/tasks/push/command.py b/src/pubtools/_marketplacesvm/tasks/push/command.py index d0a898d..f8806fe 100644 --- a/src/pubtools/_marketplacesvm/tasks/push/command.py +++ b/src/pubtools/_marketplacesvm/tasks/push/command.py @@ -1,5 +1,4 @@ # SPDX-License-Identifier: GPL-3.0-or-later -import asyncio import datetime import json import logging @@ -259,7 +258,7 @@ def _push_pre_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[UPLOAD_R res.append((mapped_item, starmap_query)) return res - async def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[str, Any]]: + def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[str, Any]]: """ Perform the publishing for the the VM images. @@ -270,7 +269,7 @@ async def _push_publish(self, upload_result: List[UPLOAD_RESULT]) -> List[Dict[s Dictionary with the resulting operation for the Collector service. """ - async def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: + def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: # Get the push item for the current marketplace pi = mapped_item.get_push_item_for_marketplace(marketplace) @@ -304,17 +303,22 @@ async def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, An res_output = [] - # This publishes in parallel since there shouldn't be collision between the - # differing marketplaces. - res_output.extend( - await asyncio.gather( - *( - push_function(mapped_item, marketplace, starmap_query) - for mapped_item, starmap_query in upload_result - for marketplace in mapped_item.marketplaces - ) + # Sequentially publish the uploaded items for each marketplace. + # It's recommended to do this operation sequentially since parallel publishing in the + # same marketplace may cause errors due to the change set already being applied. + for mapped_item, starmap_query in upload_result: + to_await = [] + executor = Executors.thread_pool( + name="pubtools-marketplacesvm-push-regions", + max_workers=min(max(len(mapped_item.marketplaces), 1), self._PROCESS_THREADS), ) - ) + for marketplace in mapped_item.marketplaces: + to_await.append( + executor.submit(push_function, mapped_item, marketplace, starmap_query) + ) + + for f_out in to_await: + res_output.append(f_out.result()) return res_output @@ -411,7 +415,7 @@ def run(self, collect_results: bool = True, allow_empty_targets: bool = False) - upload_result = self._push_pre_publish(upload_result) # 4 - Publish the uploaded images letting the external function to control the threads - result = asyncio.run(self._push_publish(upload_result)) + result = self._push_publish(upload_result) # process result for failures failed = False