Skip to content
This repository was archived by the owner on Mar 26, 2023. It is now read-only.

Commit ae2cab1

Browse files
authored
Merge pull request #5 from g4s8/4
Handle unchecked exceptions
2 parents c741d13 + 0b3a9ee commit ae2cab1

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

src/main/java/wtf/g4s8/rio/file/ReadFlow.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,49 @@ public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
101101
wrap.onSubscribe(new ReadSubscription(chan, wrap, this.buffers, queue));
102102
this.exec.submit(
103103
new CloseChanOnExit(
104-
new ReadBusyLoop(queue, wrap, chan),
104+
new ErrorOnException(
105+
new ReadBusyLoop(queue, wrap, chan),
106+
wrap
107+
),
105108
chan
106109
)
107110
);
108111
}
112+
113+
/**
114+
* Handle all exceptions including unchecked and signal error state to
115+
* subscriber.
116+
* @since 0.1
117+
*/
118+
private static final class ErrorOnException implements Runnable {
119+
120+
/**
121+
* Origin runnable.
122+
*/
123+
private final Runnable runnable;
124+
125+
/**
126+
* Subscriber.
127+
*/
128+
private final ReadSubscriberState<?> sub;
129+
130+
/**
131+
* Wrap runnable.
132+
* @param runnable Runnable to wrap
133+
* @param sub Subscriber
134+
*/
135+
ErrorOnException(final Runnable runnable, final ReadSubscriberState<?> sub) {
136+
this.runnable = runnable;
137+
this.sub = sub;
138+
}
139+
140+
@Override
141+
public void run() {
142+
try {
143+
this.runnable.run();
144+
} catch (final Throwable exx) {
145+
this.sub.onError(exx);
146+
}
147+
}
148+
}
109149
}

src/main/java/wtf/g4s8/rio/file/ReadRequest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import com.jcabi.log.Logger;
2828
import java.io.IOException;
29+
import java.nio.Buffer;
2930
import java.nio.ByteBuffer;
3031
import java.nio.channels.FileChannel;
3132

@@ -104,9 +105,22 @@ void process(final FileChannel channel) {
104105
this.sub.onError(iex);
105106
return;
106107
}
107-
buf.flip();
108+
((Buffer) buf).flip();
108109
if (read >= 0) {
109-
this.sub.onNext(buf);
110+
try {
111+
this.sub.onNext(buf);
112+
} catch (final Exception exx) {
113+
try {
114+
channel.close();
115+
} catch (final IOException cex) {
116+
Logger.warn(
117+
this,
118+
"Failed to close channel on next error: %[exception]s", cex
119+
);
120+
}
121+
this.sub.onError(exx);
122+
return;
123+
}
110124
} else {
111125
try {
112126
channel.close();

0 commit comments

Comments
 (0)