From 0b3a9ee56d728576e39b88865636fa2ae91617f9 Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 15 May 2020 17:54:19 +0300 Subject: [PATCH] #4 - handle unchecked exceptions --- src/main/java/wtf/g4s8/rio/file/ReadFlow.java | 42 ++++++++++++++++++- .../java/wtf/g4s8/rio/file/ReadRequest.java | 18 +++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/main/java/wtf/g4s8/rio/file/ReadFlow.java b/src/main/java/wtf/g4s8/rio/file/ReadFlow.java index bf61dad..20b49d6 100644 --- a/src/main/java/wtf/g4s8/rio/file/ReadFlow.java +++ b/src/main/java/wtf/g4s8/rio/file/ReadFlow.java @@ -101,9 +101,49 @@ public void subscribe(final Subscriber subscriber) { wrap.onSubscribe(new ReadSubscription(chan, wrap, this.buffers, queue)); this.exec.submit( new CloseChanOnExit( - new ReadBusyLoop(queue, wrap, chan), + new ErrorOnException( + new ReadBusyLoop(queue, wrap, chan), + wrap + ), chan ) ); } + + /** + * Handle all exceptions including unchecked and signal error state to + * subscriber. + * @since 0.1 + */ + private static final class ErrorOnException implements Runnable { + + /** + * Origin runnable. + */ + private final Runnable runnable; + + /** + * Subscriber. + */ + private final ReadSubscriberState sub; + + /** + * Wrap runnable. + * @param runnable Runnable to wrap + * @param sub Subscriber + */ + ErrorOnException(final Runnable runnable, final ReadSubscriberState sub) { + this.runnable = runnable; + this.sub = sub; + } + + @Override + public void run() { + try { + this.runnable.run(); + } catch (final Throwable exx) { + this.sub.onError(exx); + } + } + } } diff --git a/src/main/java/wtf/g4s8/rio/file/ReadRequest.java b/src/main/java/wtf/g4s8/rio/file/ReadRequest.java index 4f246e0..70a3fab 100644 --- a/src/main/java/wtf/g4s8/rio/file/ReadRequest.java +++ b/src/main/java/wtf/g4s8/rio/file/ReadRequest.java @@ -26,6 +26,7 @@ import com.jcabi.log.Logger; import java.io.IOException; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -104,9 +105,22 @@ void process(final FileChannel channel) { this.sub.onError(iex); return; } - buf.flip(); + ((Buffer) buf).flip(); if (read >= 0) { - this.sub.onNext(buf); + try { + this.sub.onNext(buf); + } catch (final Exception exx) { + try { + channel.close(); + } catch (final IOException cex) { + Logger.warn( + this, + "Failed to close channel on next error: %[exception]s", cex + ); + } + this.sub.onError(exx); + return; + } } else { try { channel.close();