Skip to content

Commit

Permalink
[CELEBORN-1801][FOLLOWUP] Extract RemoteShuffleEnvironment, NettyShuf…
Browse files Browse the repository at this point in the history
…fleEnvironmentWrapper, SimpleResultPartitionAdapter to flink common module

### What changes were proposed in this pull request?

Extract `RemoteShuffleEnvironment`, `NettyShuffleEnvironmentWrapper`, `SimpleResultPartitionAdapter` to flink common module. Meanwhile, `RemoteShuffleInputGate` and `RemoteShuffleResultPartition` are abstracted in flink common module.

### Why are the changes needed?

After removing out-of-dated flink 1.14 and 1.15 in #3029, `RemoteShuffleEnvironment`, `NettyShuffleEnvironmentWrapper`, `SimpleResultPartitionAdapter` could be extracted to flink common module. Meanwhile, `RemoteShuffleInputGate` and `RemoteShuffleResultPartition` could also be abstracted in flink common module.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3041 from SteNicholas/CELEBORN-1801.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Weijie Guo <[email protected]>
  • Loading branch information
SteNicholas authored and reswqa committed Dec 31, 2024
1 parent 4ec0228 commit d6496ae
Show file tree
Hide file tree
Showing 43 changed files with 642 additions and 2,895 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.celeborn.common.CelebornConf;

/** An abstract {@link IndexedInputGate} which ingest data from remote shuffle workers. */
public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate {

public final RemoteShuffleInputGateDelegation inputGateDelegation;

public AbstractRemoteShuffleInputGate(
CelebornConf celebornConf,
String taskName,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
BufferDecompressor bufferDecompressor,
int numConcurrentReading) {
inputGateDelegation =
new RemoteShuffleInputGateDelegation(
celebornConf,
taskName,
gateIndex,
gateDescriptor,
bufferPoolFactory,
bufferDecompressor,
numConcurrentReading,
availabilityHelper,
gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
}

/** Setup gate and build network connections. */
@Override
public void setup() throws IOException {
inputGateDelegation.setup();
}

/** Index of the gate of the corresponding computing task. */
@Override
public int getGateIndex() {
return inputGateDelegation.getGateIndex();
}

/** Get number of input channels. A channel is a data flow from one shuffle worker. */
@Override
public int getNumberOfInputChannels() {
return inputGateDelegation.getBufferReaders().size();
}

/** Whether reading is finished -- all channels are finished and cached buffers are drained. */
@Override
public boolean isFinished() {
return inputGateDelegation.isFinished();
}

@Override
public Optional<BufferOrEvent> getNext() {
throw new UnsupportedOperationException("Not implemented (DataSet API is not supported).");
}

/** Poll a received {@link BufferOrEvent}. */
@Override
public Optional<BufferOrEvent> pollNext() throws IOException {
return inputGateDelegation.pollNext();
}

/** Close all reading channels inside this {@link AbstractRemoteShuffleInputGate}. */
@Override
public void close() throws Exception {
inputGateDelegation.close();
}

/** Get {@link InputChannelInfo}s of this {@link AbstractRemoteShuffleInputGate}. */
@Override
public List<InputChannelInfo> getChannelInfos() {
return inputGateDelegation.getChannelsInfo();
}

@Override
public void requestPartitions() {
// do-nothing
}

@Override
public void checkpointStarted(CheckpointBarrier barrier) {
// do-nothing.
}

@Override
public void checkpointStopped(long cancelledCheckpointId) {
// do-nothing.
}

@Override
public void triggerDebloating() {
// do-nothing.
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
return Collections.emptyList();
}

@Override
public EndOfDataStatus hasReceivedEndOfData() {
if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
return EndOfDataStatus.NOT_END_OF_DATA;
} else {
// Keep compatibility with streaming mode.
return EndOfDataStatus.DRAINED;
}
}

@Override
public void finishReadRecoveredState() {
// do-nothing.
}

@Override
public abstract InputChannel getChannel(int channelIndex);

@Override
public void sendTaskEvent(TaskEvent event) {
throw new FlinkRuntimeException("Method should not be called.");
}

@Override
public void resumeConsumption(InputChannelInfo channelInfo) {
throw new FlinkRuntimeException("Method should not be called.");
}

@Override
public void acknowledgeAllRecordsProcessed(InputChannelInfo inputChannelInfo) {}

@Override
public CompletableFuture<Void> getStateConsumedFuture() {
return CompletableFuture.completedFuture(null);
}

@Override
public String toString() {
return String.format(
"ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
inputGateDelegation.getTaskName(),
inputGateDelegation.getGateIndex(),
inputGateDelegation.getGateDescriptor().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.utils.Utils;

/** Factory class to create RemoteShuffleInputGate. */
/** Factory class to create {@link AbstractRemoteShuffleInputGate}. */
public abstract class AbstractRemoteShuffleInputGateFactory {

public static final int MIN_BUFFERS_PER_GATE = 16;
Expand Down
Loading

0 comments on commit d6496ae

Please sign in to comment.