Skip to content

Commit

Permalink
Actually allow the worker thread to use the configured number of thre…
Browse files Browse the repository at this point in the history
…ads.

This also changes the PostDownloadHandler to be run in the main thread
-- this should avoid some CME issues.

We also run the actual downloads in the plugin worker threads, instead of
in the MainApplication.worker thread.

We also have better canceling of tasks in the queue.

Signed-off-by: Taylor Smock <[email protected]>
  • Loading branch information
tsmock committed Sep 6, 2022
1 parent 476d84b commit acb6aec
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ private static Collection<Bounds> getExisting(Class<?> klass) {
*/
private static void download(Collection<Bounds> bboxes, Class<?> klass) {
for (Bounds bbox : bboxes) {
// This returns a task that has been started on a worker thread.
// if DownloadOsmTask2, it is on DownloadPlugin.worker
// Otherwise, it is on MainApplication.worker
AbstractDownloadTask<?> task = getDownloadTask(klass);

ProgressMonitor monitor = null;
Expand All @@ -157,7 +160,9 @@ private static void download(Collection<Bounds> bboxes, Class<?> klass) {
}

Future<?> future = task.download(new DownloadParams(), bbox, monitor);
DownloadPlugin.worker.execute(new PostDownloadHandler(task, future));
// Run the PostDownloadHandler on the main worker thread.
// This should tend to be the bit where we may run into concurrent modification exceptions.
MainApplication.worker.execute(new PostDownloadHandler(task, future));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public Future<?> download(OsmServerReader reader, DownloadParams settings, Bound
return download(new DownloadTask2(settings, reader, progressMonitor), downloadArea);
}

@Override
protected Future<?> download(DownloadTask downloadTask, Bounds downloadArea) {
// This method needs to be overridden to avoid using JOSM's MainApplication.worker for downloads
this.downloadTask = downloadTask;
this.currentBounds = new Bounds(downloadArea);
// We need submit instead of execute so we can wait for it to finish and get the error
// message if necessary. If no one calls getErrorMessage() it just behaves like execute.
return DownloadPlugin.worker.submit(downloadTask);
}

protected class DownloadTask2 extends DownloadTask {
public DownloadTask2(DownloadParams settings, OsmServerReader reader,
ProgressMonitor progressMonitor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,12 +26,14 @@

import org.openstreetmap.josm.actions.JosmAction;
import org.openstreetmap.josm.data.Bounds;
import org.openstreetmap.josm.data.preferences.IntegerProperty;
import org.openstreetmap.josm.gui.MainApplication;
import org.openstreetmap.josm.gui.MainMenu;
import org.openstreetmap.josm.gui.MapView;
import org.openstreetmap.josm.gui.NavigatableComponent;
import org.openstreetmap.josm.gui.NavigatableComponent.ZoomChangeListener;
import org.openstreetmap.josm.gui.Notification;
import org.openstreetmap.josm.gui.PleaseWaitRunnable;
import org.openstreetmap.josm.gui.preferences.PreferenceSetting;
import org.openstreetmap.josm.gui.util.GuiHelper;
import org.openstreetmap.josm.io.OsmApiException;
Expand All @@ -45,20 +48,27 @@
* The POJO class for Continuous Download
*/
public class DownloadPlugin extends Plugin implements ZoomChangeListener, Destroyable {
private static final IntegerProperty maxThreads = new IntegerProperty("plugin.continuos_download.max_threads", 2);

private static final List<Consumer<Exception>> exceptionConsumers = new ArrayList<>();

/**
* The worker that runs all our downloads, it have more threads than
* The worker that runs all our downloads, it has more threads than
* {@link MainApplication#worker}.
*/
public static final ExecutorService worker = new ThreadPoolExecutor(1,
Config.getPref().getInt("plugin.continuos_download.max_threads", 2), 1, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
public static final ExecutorService worker = new ThreadPoolExecutor(
/*
* maximumPoolSize only matters when the queue is full. Which should never happen (Integer.MAX_VALUE).
* We will set core size to maxThreads and allow them to time out
*/
maxThreads.get(), maxThreads.get(), 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory());
private static final HashMap<String, AbstractDownloadStrategy> strats = new HashMap<>();
static {
registerStrat(new SimpleStrategy());
registerStrat(new BoxStrategy());
// This ensures that threads will be destroyed when not used.
((ThreadPoolExecutor) worker).allowCoreThreadTimeOut(true);
}
private Timer timer;
private TimerTask task;
Expand All @@ -84,6 +94,8 @@ public DownloadPlugin(PluginInformation info) {
menuItem = MainMenu.addWithCheckbox(MainApplication.getMenu().fileMenu, toggle,
MainMenu.WINDOW_MENU_GROUP.ALWAYS);
menuItem.setState(active);
// If the user re-enables continuousDownload, reset the zoom disabled level.
menuItem.addChangeListener(l -> this.zoomDisabled = null);
toggle.addButtonModel(menuItem.getModel());
exceptionConsumers.add(this::handleException);
}
Expand Down Expand Up @@ -146,7 +158,17 @@ public AbstractDownloadStrategy getStrat() {
private void handleException(final Exception exception) {
if (exception instanceof OsmApiException && ((OsmApiException) exception).getErrorHeader().contains("requested too many")) {
this.active = false;
this.menuItem.setSelected(false);
GuiHelper.runInEDT(() -> this.menuItem.setSelected(false));
final ThreadPoolExecutor executor = (ThreadPoolExecutor) worker;
// Remove anything that is currently in the queue. There are going to be a lot of PostDownloadHandler objects, which
// does not have cancel functionality. Unfortunately.
new ArrayList<>(executor.getQueue()).forEach(runnable -> {
executor.remove(runnable);
// DownloadTask is a subclass of PleaseWaitRunnable
if (runnable instanceof PleaseWaitRunnable) {
((PleaseWaitRunnable) runnable).operationCanceled();
}
});
this.zoomDisabled = Optional.ofNullable(MainApplication.getMap()).map(map -> map.mapView)
.map(NavigatableComponent::getScale).orElse(null);
GuiHelper.runInEDT(() -> {
Expand Down

0 comments on commit acb6aec

Please sign in to comment.