From 5436b5317417c72a2d7ac32856a260ef4b8b433b Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 16 Jan 2025 12:56:22 -0800 Subject: [PATCH 1/6] Move mocktaskContext to Spark320 shim Signed-off-by: Niranjan Artal --- .../metrics/source/MockTaskContext.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) rename tests/src/test/{ => spark320}/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala (84%) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala similarity index 84% rename from tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala rename to tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala index b0aec74feb7..6a27c4ebca0 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ b/tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,36 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + +/*** spark-rapids-shim-json-lines +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "341db"} +{"spark": "342"} +{"spark": "343"} +{"spark": "344"} +{"spark": "350"} +{"spark": "350db143"} +{"spark": "351"} +{"spark": "352"} +{"spark": "353"} +{"spark": "354"} +spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.metrics.source import java.util From 1264cae2ddb8f73f13a528dce95d8cff8e5fe1a8 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 16 Jan 2025 13:01:43 -0800 Subject: [PATCH 2/6] Add file in Spark4 shim Signed-off-by: Niranjan Artal --- .../metrics/source/MockTaskContext.scala | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala diff --git a/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala new file mode 100644 index 00000000000..cf31af98c89 --- /dev/null +++ b/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.metrics.source + +import java.io.Closeable +import java.util +import java.util.Properties + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.TaskContext +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.metrics.source.Source +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.scheduler.TaskLocality +import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} + +class MockTaskContext(taskAttemptId: Long, partitionId: Int) extends TaskContext { + + val listeners = new ListBuffer[TaskCompletionListener] + + override def isCompleted(): Boolean = false + + override def isInterrupted(): Boolean = false + + override def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = { + listeners += listener + this + } + + override def addTaskFailureListener(listener: TaskFailureListener): TaskContext = this + + override def stageId(): Int = 1 + + override def stageAttemptNumber(): Int = 1 + + override def partitionId(): Int = partitionId + + override def attemptNumber(): Int = 1 + + override def taskAttemptId(): Long = taskAttemptId + + override def getLocalProperty(key: String): String = null + + override def resources(): Map[String, ResourceInformation] = Map() + + override def resourcesJMap(): util.Map[String, ResourceInformation] = resources().asJava + + override def taskMetrics(): TaskMetrics = new TaskMetrics + + override def getMetricsSources(sourceName: String): Seq[Source] = Seq.empty + + override private[spark] def killTaskIfInterrupted(): Unit = {} + + override def getKillReason() = None + + override def 
taskMemoryManager() = null + + override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {} + + override private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit = {} + + override private[spark] def markInterrupted(reason: String): Unit = {} + + override private[spark] def markTaskFailed(error: Throwable): Unit = {} + + override private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = {} + + override private[spark] def fetchFailed = None + + override private[spark] def getLocalProperties = new Properties() + + override private[spark] def interruptible(): Boolean = false + + override private[spark] def pendingInterrupt( + threadToInterrupt: Option[Thread], reason: String): Unit = {} + + override private[spark] def createResourceUninterruptibly[T <: Closeable]( + resourceBuilder: => T): T = resourceBuilder + + def cpus(): Int = 2 + + def numPartitions(): Int = 2 + + def taskLocality(): TaskLocality.TaskLocality = TaskLocality.ANY + + /** + * This is exposed to invoke the listeners onTaskCompletion + */ + def markTaskComplete(): Unit = { + listeners.foreach(_.onTaskCompletion(this)) + } + + /** + * This method was introduced in Spark-3.5.1. It's not shimmed and added to the common class by + * removing the override keyword. + */ + def isFailed(): Boolean = false +} From 5dbf0c3ae1c26125a58d29e205c381f56694809b Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 16 Jan 2025 17:14:48 -0800 Subject: [PATCH 3/6] Revert "Add file in Spark4 shim" This reverts commit 1264cae2ddb8f73f13a528dce95d8cff8e5fe1a8. 
--- .../metrics/source/MockTaskContext.scala | 118 ------------------ 1 file changed, 118 deletions(-) delete mode 100644 tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala diff --git a/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala deleted file mode 100644 index cf31af98c89..00000000000 --- a/tests/src/test/spark400/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/*** spark-rapids-shim-json-lines -{"spark": "400"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.metrics.source - -import java.io.Closeable -import java.util -import java.util.Properties - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer - -import org.apache.spark.TaskContext -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.metrics.source.Source -import org.apache.spark.resource.ResourceInformation -import org.apache.spark.scheduler.TaskLocality -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} - -class MockTaskContext(taskAttemptId: Long, partitionId: Int) extends TaskContext { - - val listeners = new ListBuffer[TaskCompletionListener] - - override def isCompleted(): Boolean = false - - override def isInterrupted(): Boolean = false - - override def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = { - listeners += listener - this - } - - override def addTaskFailureListener(listener: TaskFailureListener): TaskContext = this - - override def stageId(): Int = 1 - - override def stageAttemptNumber(): Int = 1 - - override def partitionId(): Int = partitionId - - override def attemptNumber(): Int = 1 - - override def taskAttemptId(): Long = taskAttemptId - - override def getLocalProperty(key: String): String = null - - override def resources(): Map[String, ResourceInformation] = Map() - - override def resourcesJMap(): util.Map[String, ResourceInformation] = resources().asJava - - override def taskMetrics(): TaskMetrics = new TaskMetrics - - override def getMetricsSources(sourceName: String): Seq[Source] = Seq.empty - - override private[spark] def killTaskIfInterrupted(): Unit = {} - - override def getKillReason() = None - - override def taskMemoryManager() = null - - override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {} - - 
override private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit = {} - - override private[spark] def markInterrupted(reason: String): Unit = {} - - override private[spark] def markTaskFailed(error: Throwable): Unit = {} - - override private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = {} - - override private[spark] def fetchFailed = None - - override private[spark] def getLocalProperties = new Properties() - - override private[spark] def interruptible(): Boolean = false - - override private[spark] def pendingInterrupt( - threadToInterrupt: Option[Thread], reason: String): Unit = {} - - override private[spark] def createResourceUninterruptibly[T <: Closeable]( - resourceBuilder: => T): T = resourceBuilder - - def cpus(): Int = 2 - - def numPartitions(): Int = 2 - - def taskLocality(): TaskLocality.TaskLocality = TaskLocality.ANY - - /** - * This is exposed to invoke the listeners onTaskCompletion - */ - def markTaskComplete(): Unit = { - listeners.foreach(_.onTaskCompletion(this)) - } - - /** - * This method was introduced in Spark-3.5.1. It's not shimmed and added to the common class by - * removing the override keyword. - */ - def isFailed(): Boolean = false -} From 031ab8445b518e498005c674095085868cd70191 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 16 Jan 2025 17:15:02 -0800 Subject: [PATCH 4/6] Revert "Move mocktaskContext to Spark320 shim" This reverts commit 5436b5317417c72a2d7ac32856a260ef4b8b433b. 
--- .../metrics/source/MockTaskContext.scala | 32 +------------------ 1 file changed, 1 insertion(+), 31 deletions(-) rename tests/src/test/{spark320 => }/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala (84%) diff --git a/tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala similarity index 84% rename from tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala rename to tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala index 6a27c4ebca0..b0aec74feb7 100644 --- a/tests/src/test/spark320/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,36 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -/*** spark-rapids-shim-json-lines -{"spark": "320"} -{"spark": "321"} -{"spark": "321cdh"} -{"spark": "322"} -{"spark": "323"} -{"spark": "324"} -{"spark": "330"} -{"spark": "330cdh"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332cdh"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.metrics.source import java.util From 3609de8cf5793f0f8097628fea4346f3a42078d7 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 16 Jan 2025 17:42:40 -0800 Subject: [PATCH 5/6] addressed review comments Signed-off-by: Niranjan Artal --- docs/dev/shims.md | 10 ++++++++++ .../rapids/metrics/source/MockTaskContext.scala | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/docs/dev/shims.md b/docs/dev/shims.md index 24252df607e..07f253c03e8 100644 --- a/docs/dev/shims.md +++ b/docs/dev/shims.md @@ -40,6 +40,16 @@ for conflicting Shim implementations. ### Compile-time issues +#### Methods added in new versions + +If the new methods are a superset of functionality from previous versions and can be implemented +with default behavior, they can be added directly to the unshimmed class. For methods introduced +in newer versions that do not exist in older versions, removing the override keyword ensures that +these methods are treated as new additions rather than overrides. This allows the same class to work +across different Spark versions. + +#### Different parent class signatures + Upstream base classes we derive from might be incompatible in the sense that one version requires us to implement/override the method `M` whereas the other prohibits it by marking the base implementation `final`, E.g. 
`org.apache.spark.sql.catalyst.trees.TreeNode` changes diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala index b0aec74feb7..6f535b04357 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package org.apache.spark.sql.rapids.metrics.source +import java.io.Closeable import java.util import java.util.Properties @@ -102,4 +103,16 @@ class MockTaskContext(taskAttemptId: Long, partitionId: Int) extends TaskContext * removing the override keyword. */ def isFailed(): Boolean = false + + /** + * These methods were introduced in Spark-4. They are not shimmed and are added to the common class + * by removing the override keyword. + */ + + private[spark] def interruptible(): Boolean = false + + private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit = {} + + private[spark] def createResourceUninterruptibly[T <: Closeable]( + resourceBuilder: => T): T = resourceBuilder } From 0bf63942107f09392c9c791a2fccae8f51007a3d Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:22:50 -0800 Subject: [PATCH 6/6] Update docs/dev/shims.md Co-authored-by: Gera Shegalov --- docs/dev/shims.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dev/shims.md b/docs/dev/shims.md index 07f253c03e8..ae7838f902d 100644 --- a/docs/dev/shims.md +++ b/docs/dev/shims.md @@ -42,7 +42,7 @@
#### Methods added in new versions -If the new methods are a superset of functionality from previous versions and can be implemented +If the base class or trait in the new version just adds new methods on top of previous versions, and those methods can be implemented with default behavior, they can be added directly to the unshimmed class. For methods introduced in newer versions that do not exist in older versions, removing the override keyword ensures that these methods are treated as new additions rather than overrides. This allows the same class to work