Skip to content

Commit

Permalink
Fix PushDataHandler reference count error
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 11, 2024
1 parent 3f4efdb commit 657e15a
Showing 1 changed file with 68 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,28 +262,28 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
return
}
val writePromise = Promise[Unit]()
// for primary, send data to replica
if (doReplicate) {
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
// pushData.body().release()
fileWriter.decrementPendingWrites()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
return
}

pushData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
return
}

// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
Expand Down Expand Up @@ -323,24 +323,28 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}

override def onFailure(e: Throwable): Unit = {
logError(s"PushData replication failed for partitionLocation: $location", e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else if (ExceptionUtils.connectFail(e.getMessage)) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
} else {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_NON_CRITICAL_CAUSE_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA))
Try(Await.result(writePromise.future, Duration.Inf)) match {
case _ =>
logError(s"PushData replication failed for partitionLocation: $location", e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else if (ExceptionUtils.connectFail(e.getMessage)) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
} else {
workerSource.incCounter(
WorkerSource.REPLICATE_DATA_FAIL_NON_CRITICAL_CAUSE_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA))
}
}
}
}
Expand All @@ -360,7 +364,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
logError(
s"PushData replication failed during connecting peer for partitionLocation: $location",
e)
callbackWithTimer.onFailure(
wrappedCallback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}
}
Expand Down Expand Up @@ -546,9 +550,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
val writePromise = Promise[Unit]()
// for primary, send data to replica
if (doReplicate) {
pushMergedData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val location = partitionIdToLocations.head._2
val peer = location.getPeer
val peerWorker = new WorkerInfo(
Expand All @@ -558,15 +559,17 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
fileWriters.foreach(_.decrementPendingWrites())
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
return
}

pushMergedData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
Expand Down Expand Up @@ -600,24 +603,28 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}

override def onFailure(e: Throwable): Unit = {
logError(s"PushMergedData replicate failed for partitionLocation: $location", e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else if (ExceptionUtils.connectFail(e.getMessage)) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
} else {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_NON_CRITICAL_CAUSE_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA))
Try(Await.result(writePromise.future, Duration.Inf)) match {
case _ =>
logError(s"PushMergedData replicate failed for partitionLocation: $location", e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else if (ExceptionUtils.connectFail(e.getMessage)) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
} else {
workerSource.incCounter(
WorkerSource.REPLICATE_DATA_FAIL_NON_CRITICAL_CAUSE_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA))
}
}
}
}
Expand All @@ -642,7 +649,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
logError(
s"PushMergedData replication failed during connecting peer for partitionLocation: $location",
e)
callbackWithTimer.onFailure(
wrappedCallback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}
}
Expand Down

0 comments on commit 657e15a

Please sign in to comment.