Skip to content

Commit

Permalink
Fixed ChunkFetchIntegrationSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
otterc committed Nov 16, 2023
1 parent 6534819 commit c36eba8
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import org.junit.AfterClass;
Expand All @@ -43,6 +44,7 @@
import org.apache.celeborn.common.network.client.RpcResponseCallback;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.ChunkFetchFailure;
import org.apache.celeborn.common.network.protocol.ChunkFetchSuccess;
import org.apache.celeborn.common.network.protocol.RequestMessage;
import org.apache.celeborn.common.network.protocol.StreamChunkSlice;
Expand Down Expand Up @@ -118,10 +120,17 @@ public void receive(TransportClient client, RequestMessage msg) {
}
StreamChunkSlice slice =
StreamChunkSlice.fromProto(chunkFetchRequest.getStreamChunkSlice());
ManagedBuffer buf =
chunkStreamManager.getChunk(
slice.streamId, slice.chunkIndex, slice.offset, slice.len);
client.getChannel().writeAndFlush(new ChunkFetchSuccess(slice, buf));
ManagedBuffer buf = null;
try {
buf =
chunkStreamManager.getChunk(
slice.streamId, slice.chunkIndex, slice.offset, slice.len);
client.getChannel().writeAndFlush(new ChunkFetchSuccess(slice, buf));
} catch (Exception e) {
client
.getChannel()
.writeAndFlush(new ChunkFetchFailure(slice, Throwables.getStackTraceAsString(e)));
}
}

@Override
Expand Down Expand Up @@ -189,7 +198,7 @@ public void onFailure(int chunkIndex, Throwable e) {
for (int chunkIndex : chunkIndices) {
client.fetchChunk(STREAM_ID, chunkIndex, 10000, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
if (!sem.tryAcquire(chunkIndices.size(), 60, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
}
client.close();
Expand Down

0 comments on commit c36eba8

Please sign in to comment.