Skip to content
This repository has been archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Commit

Permalink
Merge pull request #5 from g4s8/4
Browse files Browse the repository at this point in the history
Handle unchecked exceptions
  • Loading branch information
g4s8 authored May 15, 2020
2 parents c741d13 + 0b3a9ee commit ae2cab1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
42 changes: 41 additions & 1 deletion src/main/java/wtf/g4s8/rio/file/ReadFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,49 @@ public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
wrap.onSubscribe(new ReadSubscription(chan, wrap, this.buffers, queue));
this.exec.submit(
new CloseChanOnExit(
new ReadBusyLoop(queue, wrap, chan),
new ErrorOnException(
new ReadBusyLoop(queue, wrap, chan),
wrap
),
chan
)
);
}

/**
* Handle all exceptions including unchecked and signal error state to
* subscriber.
* @since 0.1
*/
private static final class ErrorOnException implements Runnable {

/**
* Origin runnable.
*/
private final Runnable runnable;

/**
* Subscriber.
*/
private final ReadSubscriberState<?> sub;

/**
* Wrap runnable.
* @param runnable Runnable to wrap
* @param sub Subscriber
*/
ErrorOnException(final Runnable runnable, final ReadSubscriberState<?> sub) {
this.runnable = runnable;
this.sub = sub;
}

@Override
public void run() {
try {
this.runnable.run();
} catch (final Throwable exx) {
this.sub.onError(exx);
}
}
}
}
18 changes: 16 additions & 2 deletions src/main/java/wtf/g4s8/rio/file/ReadRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.jcabi.log.Logger;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

Expand Down Expand Up @@ -104,9 +105,22 @@ void process(final FileChannel channel) {
this.sub.onError(iex);
return;
}
buf.flip();
((Buffer) buf).flip();
if (read >= 0) {
this.sub.onNext(buf);
try {
this.sub.onNext(buf);
} catch (final Exception exx) {
try {
channel.close();
} catch (final IOException cex) {
Logger.warn(
this,
"Failed to close channel on next error: %[exception]s", cex
);
}
this.sub.onError(exx);
return;
}
} else {
try {
channel.close();
Expand Down

0 comments on commit ae2cab1

Please sign in to comment.