Skip to content

Commit

Permalink
Fixed FetchHandlerSuiteJ
Browse files Browse the repository at this point in the history
  • Loading branch information
otterc committed Nov 16, 2023
1 parent e7fc56b commit 6534819
Showing 1 changed file with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Random;
import java.util.UUID;

import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -45,11 +47,13 @@
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.client.RpcResponseCallback;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportResponseHandler;
import org.apache.celeborn.common.network.protocol.ChunkFetchSuccess;
import org.apache.celeborn.common.network.protocol.Message;
import org.apache.celeborn.common.network.protocol.OpenStream;
import org.apache.celeborn.common.network.protocol.RpcFailure;
import org.apache.celeborn.common.network.protocol.RpcRequest;
import org.apache.celeborn.common.network.protocol.RpcResponse;
import org.apache.celeborn.common.network.protocol.StreamHandle;
Expand Down Expand Up @@ -289,7 +293,9 @@ private void legacyOpenStreamAndCheck(
ByteBuffer openStreamByteBuffer =
new OpenStream(shuffleKey, fileName, startIndex, endIndex).toByteBuffer();
fetchHandler.receive(
client, new RpcRequest(dummyRequestId, new NioManagedBuffer(openStreamByteBuffer)));
client,
new RpcRequest(dummyRequestId, new NioManagedBuffer(openStreamByteBuffer)),
createRpcResponseCallback(channel));
RpcResponse result = channel.readOutbound();
StreamHandle streamHandler = (StreamHandle) Message.decode(result.body().nioByteBuffer());
if (endIndex == Integer.MAX_VALUE) {
Expand Down Expand Up @@ -318,7 +324,9 @@ private PbStreamHandler openStreamAndCheck(
.toByteArray())
.toByteBuffer();
fetchHandler.receive(
client, new RpcRequest(dummyRequestId, new NioManagedBuffer(openStreamByteBuffer)));
client,
new RpcRequest(dummyRequestId, new NioManagedBuffer(openStreamByteBuffer)),
createRpcResponseCallback(channel));
RpcResponse result = channel.readOutbound();
PbStreamHandler streamHandler =
TransportMessage.fromByteBuffer(result.body().nioByteBuffer()).getParsedPayload();
Expand Down Expand Up @@ -352,7 +360,8 @@ private void fetchChunkAndCheck(
.setLen(Integer.MAX_VALUE))
.build()
.toByteArray())
.toByteBuffer())));
.toByteBuffer())),
createRpcResponseCallback(channel));
ChunkFetchSuccess chunkFetchSuccess = channel.readOutbound();
chunkFetchSuccess.body().retain();
// chunk size 8m
Expand All @@ -372,7 +381,8 @@ private void bufferStreamEnd(TransportClient client, FetchHandler fetchHandler,
.toByteArray());
fetchHandler.receive(
client,
new RpcRequest(dummyRequestId, new NioManagedBuffer(bufferStreamEnd.toByteBuffer())));
new RpcRequest(dummyRequestId, new NioManagedBuffer(bufferStreamEnd.toByteBuffer())),
createRpcResponseCallback(client.getChannel()));
}

private void checkOriginFileBeDeleted(FileInfo fileInfo) {
Expand Down Expand Up @@ -403,4 +413,18 @@ private ArrayList<Integer> generateMapIds(int batchCountPerMap) {
Collections.shuffle(ids);
return ids;
}

private RpcResponseCallback createRpcResponseCallback(Channel channel) {
return new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
channel.writeAndFlush(new RpcResponse(dummyRequestId, new NioManagedBuffer(response)));
}

@Override
public void onFailure(Throwable e) {
channel.writeAndFlush(new RpcFailure(dummyRequestId, Throwables.getStackTraceAsString(e)));
}
};
}
}

0 comments on commit 6534819

Please sign in to comment.