Skip to content

Commit

Permalink
[CELEBORN-1668] Fix NPE when handling closed file writers
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
To fix an NPE when handling the closed file writers.

### Why are the changes needed?
If a file writer stores its shuffle data in memory, the disk file info object will be null, causing NPE.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #2846 from FMX/b1688.

Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
FMX authored and RexXiong committed Oct 24, 2024
1 parent 4b150be commit e1bebb9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public File getFile() {
return new File(filePath);
}

/**
 * Returns the path string held in {@code filePath}.
 *
 * <p>This is the same value that {@code getFile()} wraps in a {@code File}.
 *
 * @return the stored file path
 */
@Override
public String getFilePath() {
return filePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,6 @@ public boolean isStreamsEmpty() {
return streams.isEmpty();
}
}

/**
 * Returns the file path associated with this file info.
 *
 * <p>NOTE(review): callers appear to use this value in log messages; confirm whether an
 * implementation may return an empty string when no path exists.
 *
 * @return the file path as a string
 */
public abstract String getFilePath();
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ public int releaseMemoryBuffers() {
logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
return bufferSize;
}

/**
 * Always returns an empty string.
 *
 * <p>NOTE(review): presumably this is because a memory-backed file info has no on-disk
 * path, so an empty string is returned rather than {@code null} — confirm.
 *
 * @return the empty string
 */
@Override
public String getFilePath() {
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public synchronized void destroy(IOException ioException) {
}
}

protected FileInfo getCurrentFileInfo() {
public FileInfo getCurrentFileInfo() {
if (!isMemoryShuffleFile.get()) {
return diskFileInfo;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
fileWriter.incrementPendingWrites()

if (fileWriter.isClosed) {
val diskFileInfo = fileWriter.getDiskFileInfo
val fileInfo = fileWriter.getCurrentFileInfo
logWarning(
s"[handlePushData] FileWriter is already closed! File path ${diskFileInfo.getFilePath} " +
s"length ${diskFileInfo.getFileLength}")
s"[handlePushData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
s"length ${fileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already closed!"))
fileWriter.decrementPendingWrites()
return
Expand Down Expand Up @@ -546,10 +546,10 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler

val closedFileWriter = fileWriters.find(_.isClosed)
if (closedFileWriter.isDefined) {
val diskFileInfo = closedFileWriter.get.getDiskFileInfo
val fileInfo = closedFileWriter.get.getCurrentFileInfo
logWarning(
s"[handlePushMergedData] FileWriter is already closed! File path ${diskFileInfo.getFilePath} " +
s"length ${diskFileInfo.getFileLength}")
s"[handlePushMergedData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
s"length ${fileInfo.getFileLength}")
callbackWithTimer.onFailure(new CelebornIOException("File already closed!"))
fileWriters.foreach(_.decrementPendingWrites())
return
Expand Down Expand Up @@ -824,10 +824,10 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
fileWriter.incrementPendingWrites()

if (fileWriter.isClosed) {
val diskFileInfo = fileWriter.getDiskFileInfo
val fileInfo = fileWriter.getCurrentFileInfo
logWarning(
s"[handleMapPartitionPushData] FileWriter is already closed! File path ${diskFileInfo.getFilePath} " +
s"length ${diskFileInfo.getFileLength}")
s"[handleMapPartitionPushData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
s"length ${fileInfo.getFileLength}")
callback.onFailure(new CelebornIOException("File already closed!"))
fileWriter.decrementPendingWrites()
return
Expand Down Expand Up @@ -1029,8 +1029,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler

// During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
// This check must come before returning an exception so that the current push request can revive and retry.
val isPartitionSplitEnabled = fileWriter.asInstanceOf[
MapPartitionDataWriter].getDiskFileInfo.isPartitionSplitEnabled
val isPartitionSplitEnabled = fileWriter.getCurrentFileInfo.isPartitionSplitEnabled

if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
Expand Down Expand Up @@ -1248,8 +1247,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|diskFull:$diskFull,
|partitionSplitMinimumSize:$partitionSplitMinimumSize,
|splitThreshold:${fileWriter.getSplitThreshold},
|fileLength:${fileWriter.getDiskFileInfo.getFileLength}
|fileName:${fileWriter.getDiskFileInfo.getFilePath}
|fileLength:${fileWriter.getCurrentFileInfo.getFileLength}
|fileName:${fileWriter.getCurrentFileInfo.getFilePath}
|""".stripMargin)
if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
return true
Expand Down

0 comments on commit e1bebb9

Please sign in to comment.