Skip to content

Commit

Permalink
Add option to disable measuring buffer copy
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Jan 22, 2025
1 parent f9613ab commit 6bdaa67
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
* @note The RAPIDS shuffle does not use this code.
*/
class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
useKudo: Boolean)
useKudo: Boolean, kudoMeasureBufferCopy: Boolean)
extends Serializer with Serializable {

private lazy val kudo = {
Expand All @@ -143,7 +143,7 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr

override def newInstance(): SerializerInstance = {
if (useKudo) {
new KudoSerializerInstance(metrics, dataTypes, kudo)
new KudoSerializerInstance(metrics, dataTypes, kudo, kudoMeasureBufferCopy)
} else {
new GpuColumnarBatchSerializerInstance(metrics)
}
Expand Down Expand Up @@ -348,7 +348,8 @@ object SerializedTableColumn {
private class KudoSerializerInstance(
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType],
val kudo: Option[KudoSerializer]
val kudo: Option[KudoSerializer],
val measureBufferCopyTime: Boolean,
) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
Expand Down Expand Up @@ -399,8 +400,10 @@ private class KudoSerializerInstance(

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
if (measureBufferCopyTime) {
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
}
}
} else {
withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ =>
Expand Down
12 changes: 12 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED =
conf("spark.rapids.shuffle.kudo.serializer.measure.buffer.copy.enabled")
.doc("Enable or disable measuring buffer copy time when using Kudo serializer for the shuffle.")
.internal()
.startupOnly()
.booleanConf
.createWithDefault(false)


// USER FACING DEBUG CONFIGS

val SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY =
Expand Down Expand Up @@ -3112,6 +3121,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val shuffleKudoSerializerEnabled: Boolean = get(SHUFFLE_KUDO_SERIALIZER_ENABLED)

lazy val shuffleKudoMeasureBufferCopyEnabled: Boolean =
get(SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED)

def isUCXShuffleManagerMode: Boolean =
RapidsShuffleManagerMode
.withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ abstract class GpuShuffleExchangeExecBase(
import GpuMetric._

private lazy val useKudo = RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf)
private lazy val kudoMeasureBufferCopy = RapidsConf
.SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED
.get(child.conf)

private lazy val useGPUShuffle = {
gpuOutputPartitioning match {
Expand Down Expand Up @@ -223,7 +226,7 @@ abstract class GpuShuffleExchangeExecBase(
// This value must be lazy because the child's output may not have been resolved
// yet in all cases.
private lazy val serializer: Serializer = new GpuColumnarBatchSerializer(
allMetrics, sparkTypes, useKudo)
allMetrics, sparkTypes, useKudo, kudoMeasureBufferCopy)

@transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar()

Expand Down

0 comments on commit 6bdaa67

Please sign in to comment.