Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for kudo write metrics #11784

Open
wants to merge 4 commits into
base: branch-25.02
Choose a base branch
from

Conversation

liurenjie1024
Copy link
Collaborator

@liurenjie1024 liurenjie1024 commented Nov 27, 2024

This pr address two things:

  1. Refactor metrics in gpu exchange to use constants rather string literals.
  2. Add support for kudo write metrics. This part relys on Add write metrics for kudo. spark-rapids-jni#2630 to be merged.

Signed-off-by: liurenjie1024 <[email protected]>
@liurenjie1024 liurenjie1024 changed the base branch from branch-24.12 to branch-25.02 November 27, 2024 06:11
val METRIC_DATA_READ_SIZE = "dataReadSize"
val METRIC_DESC_DATA_READ_SIZE = "data read size"
val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime"
val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rs. serialization time"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not new with this PR, but "rs." is not a very helpful description and is more cryptic than the metric name itself.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -315,8 +356,11 @@ object GpuShuffleExchangeExecBase {
rdd
}
val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
val partitonTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val partitonTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -37,6 +38,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuShuffleDependency
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{createAdditionalExchangeMetris, METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{createAdditionalExchangeMetris, METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rs. serialization copy buffer time"

def createAdditionalExchangeMetris(gpu: GpuExec): Map[String, GpuMetric] = Map(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def createAdditionalExchangeMetris(gpu: GpuExec): Map[String, GpuMetric] = Map(
def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -679,7 +675,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
}
val res = currentIter.next()
val fetchTime = System.nanoTime() - fetchTimeStart
deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the new deserialization time metric does the same thing the old metric did. The new metric is timing how long it takes the serializer to produce a new batch, but that's measuring I/O time as well as deserialization time once the I/O is completed. The original metric appears to go out of its way to remove the time spent on I/O to get a more accurate deserialization metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case is different when using rss such as celeborn, e.g. it's pure serialization time which simply write to in memory buffer. I will restore the change here and add a new metric for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants