Skip to content

Commit

Permalink
[CELEBORN-1490][CIP-6] Add Flink Hybrid Shuffle IT test cases
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Add Flink Hybrid Shuffle IT test cases
2. Fix bug in open stream.

### Why are the changes needed?

Test coverage for celeborn + hybrid shuffle

### Does this PR introduce _any_ user-facing change?
No

Closes #2859 from reswqa/10-itcase-10month.

Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
reswqa authored and SteNicholas committed Nov 1, 2024
1 parent e2f640c commit c12e888
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void open(int initialCredit) {
client.readBufferedPartition(
shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd, false);
bufferStream.open(
RemoteBufferStreamReader.this::requestBuffer, initialCredit, messageConsumer);
RemoteBufferStreamReader.this::requestBuffer, initialCredit, messageConsumer, false);
} catch (Exception e) {
logger.warn("Failed to open stream and report to flink framework. ", e);
messageConsumer.accept(new TransportableError(0L, e));
Expand Down Expand Up @@ -158,6 +158,6 @@ public void dataReceived(ReadData readData) {
public void onStreamEnd(BufferStreamEnd streamEnd) {
long streamId = streamEnd.getStreamId();
logger.debug("Buffer stream reader get stream end for {}", streamId);
bufferStream.moveToNextPartitionIfPossible(streamId);
bufferStream.moveToNextPartitionIfPossible(streamId, null, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class CelebornBufferStream {
private PartitionLocation[] locations;
private int subIndexStart;
private int subIndexEnd;
private long pushDataTimeoutMs;
private TransportClient client;
private AtomicInteger currentLocationIndex = new AtomicInteger(0);
private long streamId = 0;
Expand All @@ -72,23 +73,26 @@ public CelebornBufferStream(
String shuffleKey,
PartitionLocation[] locations,
int subIndexStart,
int subIndexEnd) {
int subIndexEnd,
long pushDataTimeoutMs) {
this.mapShuffleClient = mapShuffleClient;
this.clientFactory = dataClientFactory;
this.shuffleKey = shuffleKey;
this.locations = locations;
this.subIndexStart = subIndexStart;
this.subIndexEnd = subIndexEnd;
this.pushDataTimeoutMs = pushDataTimeoutMs;
}

public void open(
Supplier<ByteBuf> bufferSupplier,
int initialCredit,
Consumer<RequestMessage> messageConsumer) {
Consumer<RequestMessage> messageConsumer,
boolean sync) {
this.bufferSupplier = bufferSupplier;
this.initialCredit = initialCredit;
this.messageConsumer = messageConsumer;
moveToNextPartitionIfPossible(0);
moveToNextPartitionIfPossible(0, null, sync);
}

public void addCredit(PbReadAddCredit pbReadAddCredit) {
Expand Down Expand Up @@ -156,12 +160,19 @@ public static CelebornBufferStream create(
String shuffleKey,
PartitionLocation[] locations,
int subIndexStart,
int subIndexEnd) {
int subIndexEnd,
long pushDataTimeoutMs) {
if (locations == null || locations.length == 0) {
return empty();
} else {
return new CelebornBufferStream(
client, dataClientFactory, shuffleKey, locations, subIndexStart, subIndexEnd);
client,
dataClientFactory,
shuffleKey,
locations,
subIndexStart,
subIndexEnd,
pushDataTimeoutMs);
}
}

Expand Down Expand Up @@ -198,12 +209,10 @@ public void close() {
}
}

public void moveToNextPartitionIfPossible(long endedStreamId) {
moveToNextPartitionIfPossible(endedStreamId, null);
}

public void moveToNextPartitionIfPossible(
long endedStreamId, @Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer) {
long endedStreamId,
@Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer,
boolean sync) {
logger.debug(
"MoveToNextPartitionIfPossible in this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}",
this,
Expand All @@ -218,7 +227,7 @@ public void moveToNextPartitionIfPossible(

if (currentLocationIndex.get() < locations.length) {
try {
openStreamInternal(requiredSegmentIdConsumer);
openStreamInternal(requiredSegmentIdConsumer, sync);
logger.debug(
"MoveToNextPartitionIfPossible after openStream this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}",
this,
Expand All @@ -237,7 +246,8 @@ public void moveToNextPartitionIfPossible(
* Open the stream. Note that if the openReaderFuture is not null, requiredSegmentIdConsumer will
* be invoked for every subPartition once the stream has been opened successfully.
*/
private void openStreamInternal(@Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer)
private void openStreamInternal(
@Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer, boolean sync)
throws IOException, InterruptedException {
this.client =
clientFactory.createClientWithRetry(
Expand All @@ -255,8 +265,7 @@ private void openStreamInternal(@Nullable BiConsumer<Long, Integer> requiredSegm
.setInitialCredit(initialCredit)
.build()
.toByteArray());
client.sendRpc(
openStream.toByteBuffer(),
RpcResponseCallback rpcResponseCallback =
new RpcResponseCallback() {

@Override
Expand Down Expand Up @@ -313,7 +322,13 @@ public void onFailure(Throwable e) {
NettyUtils.getRemoteAddress(client.getChannel()));
messageConsumer.accept(new TransportableError(streamId, e));
}
});
};

if (sync) {
client.sendRpcSync(openStream.toByteBuffer(), rpcResponseCallback, pushDataTimeoutMs);
} else {
client.sendRpc(openStream.toByteBuffer(), rpcResponseCallback);
}
}

public TransportClient getClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public CelebornBufferStream readBufferedPartition(
shuffleKey,
partitionLocations,
subPartitionIndexStart,
subPartitionIndexEnd);
subPartitionIndexEnd,
conf.pushDataTimeoutMs());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ public void setup(TieredStorageMemoryManager memoryManager) {
}
}

public void open(int initialCredit) {
public void open(int initialCredit, boolean sync) {
try {
bufferStream =
client.readBufferedPartition(
shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd, true);
bufferStream.open(this::requestBuffer, initialCredit, messageConsumer);
this.isOpened = bufferStream.isOpened();
bufferStream.open(this::requestBuffer, initialCredit, messageConsumer, sync);
} catch (Exception e) {
messageConsumer.accept(new TransportableError(0L, e));
LOG.error("Failed to open reader", e);
Expand Down Expand Up @@ -178,6 +177,10 @@ public boolean isOpened() {
return isOpened;
}

/**
 * Overwrites the opened flag that {@link #isOpened()} reports.
 *
 * @param opened the new value of the flag
 */
public void setOpened(boolean opened) {
  this.isOpened = opened;
}

/** Returns the current value of the {@code closed} field (package-private accessor). */
boolean isClosed() {
return closed;
}
Expand Down Expand Up @@ -306,7 +309,7 @@ public void onStreamEnd(BufferStreamEnd streamEnd) {
if (!closed && !CelebornBufferStream.isEmptyStream(bufferStream)) {
// TODO: Update the partition locations here once reading and writing shuffle data
// simultaneously is supported
bufferStream.moveToNextPartitionIfPossible(streamId, this::sendRequireSegmentId);
bufferStream.moveToNextPartitionIfPossible(streamId, this::sendRequireSegmentId, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,16 @@ private boolean openReader(
private boolean openReader(CelebornChannelBufferReader bufferReader) {
if (!bufferReader.isOpened()) {
try {
bufferReader.open(0);
bufferReader.open(0, true);
} catch (Exception e) {
// may throw PartitionUnRetryAbleException
recycleAllResources();
ExceptionUtils.rethrow(e);
}
}

return bufferReader.isOpened();
bufferReader.setOpened(true);
return true;
}

private void initBufferReaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,48 @@ public void onFailure(Throwable e) {
}
}

/**
 * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to a
 * specified timeout for a response. The callback will be invoked with the server's response or
 * upon any failure.
 *
 * <p>The response buffer handed to {@code callback.onSuccess} is a private copy of the received
 * bytes. If {@code callback.onSuccess} throws, the error is logged and forwarded to {@code
 * callback.onFailure}.
 *
 * <p>If no response or failure arrives within {@code timeoutMs}, an {@link IOException} is thrown
 * and the callback is not invoked by this method. The request itself is not cancelled here, so
 * the callback may still be invoked later.
 *
 * @param message the request payload
 * @param callback receives the copied response, or the failure
 * @param timeoutMs maximum time to wait for the callback to complete, in milliseconds
 * @throws IOException if the wait times out or the waiting thread is interrupted; in the latter
 *     case the thread's interrupt status is restored before throwing
 */
public void sendRpcSync(ByteBuffer message, RpcResponseCallback callback, long timeoutMs)
    throws IOException {
  final SettableFuture<Void> result = SettableFuture.create();

  sendRpc(
      message,
      new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          try {
            ByteBuffer copy = ByteBuffer.allocate(response.remaining());
            copy.put(response);
            // flip "copy" to make it readable
            copy.flip();
            callback.onSuccess(copy);
          } catch (Throwable t) {
            logger.warn("Error in responding RPC callback", t);
            callback.onFailure(t);
          } finally {
            // Always release the waiting caller, even if the delegated callback itself throws.
            result.set(null);
          }
        }

        @Override
        public void onFailure(Throwable e) {
          try {
            callback.onFailure(e);
          } finally {
            // Always release the waiting caller, even if the delegated callback itself throws.
            result.set(null);
          }
        }
      });

  try {
    result.get(timeoutMs, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack before converting the exception.
    Thread.currentThread().interrupt();
    throw new IOException("Exception in sendRpcSync to: " + this.getSocketAddress(), e);
  } catch (Exception e) {
    throw new IOException("Exception in sendRpcSync to: " + this.getSocketAddress(), e);
  }
}

/**
* Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the
* message, and no delivery guarantees are made.
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@
</systemProperties>
<environmentVariables>
<CELEBORN_LOCAL_HOSTNAME>localhost</CELEBORN_LOCAL_HOSTNAME>
<FLINK_VERSION>${flink.version}</FLINK_VERSION>
</environmentVariables>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
Expand Down Expand Up @@ -927,6 +928,7 @@
</systemProperties>
<environmentVariables>
<CELEBORN_LOCAL_HOSTNAME>localhost</CELEBORN_LOCAL_HOSTNAME>
<FLINK_VERSION>${flink.version}</FLINK_VERSION>
</environmentVariables>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.tests.flink;

import org.apache.flink.annotation.Public;

/** All supported flink versions. */
@Public
public enum FlinkVersion {
  // NOTE: keep the constants in ascending version order; isNewerOrEqualVersionThan compares
  // versions by their declaration order.
  v1_14("1.14"),
  v1_15("1.15"),
  v1_16("1.16"),
  v1_17("1.17"),
  v1_18("1.18"),
  v1_19("1.19"),
  v1_20("1.20");

  /** The "major.minor" text form of this version, e.g. {@code "1.17"}. */
  private final String versionStr;

  FlinkVersion(String versionStr) {
    this.versionStr = versionStr;
  }

  /**
   * Resolves the constant whose version string equals {@code versionStr}.
   *
   * <p>The lookup walks {@link #values()}, so a newly added constant is recognized without any
   * further change to this method.
   *
   * @param versionStr the version text to resolve, e.g. {@code "1.18"}
   * @return the matching constant
   * @throws IllegalArgumentException if {@code versionStr} is {@code null} or matches no constant
   */
  public static FlinkVersion fromVersionStr(String versionStr) {
    for (FlinkVersion candidate : values()) {
      // Calling equals on the non-null constant side makes a null argument fall through to the
      // descriptive IllegalArgumentException below instead of a bare NullPointerException.
      if (candidate.versionStr.equals(versionStr)) {
        return candidate;
      }
    }
    throw new IllegalArgumentException("Unsupported flink version: " + versionStr);
  }

  /** Returns the version string this constant was declared with. */
  @Override
  public String toString() {
    return versionStr;
  }

  /**
   * Tells whether this version is the same as, or declared after, {@code otherVersion}.
   *
   * @param otherVersion the version to compare against
   * @return {@code true} if this constant is {@code otherVersion} or is declared later than it
   */
  public boolean isNewerOrEqualVersionThan(FlinkVersion otherVersion) {
    return compareTo(otherVersion) >= 0;
  }
}
Loading

0 comments on commit c12e888

Please sign in to comment.