Commit 49d3a58

andygrove committed Sep 12, 2024
1 parent ea10aad commit 49d3a58
Showing 10 changed files with 33 additions and 16 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -26,11 +26,11 @@
 arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
 arrow-schema = { version = "51.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "37.0.0"
-datafusion-cli = "37.0.0"
-datafusion-proto = "37.0.0"
+datafusion = "38.0.0"
+datafusion-cli = "38.0.0"
+datafusion-proto = "38.0.0"
 object_store = "0.9.0"
-sqlparser = "0.44.0"
+sqlparser = "0.45.0"
 tonic = { version = "0.11" }
 tonic-build = { version = "0.11", default-features = false, features = [
     "transport",
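All four DataFusion-ecosystem pins move together: DataFusion 38 is built against sqlparser 0.45, so leaving `sqlparser = "0.44.0"` in place would pull a second copy of the parser with incompatible AST types into the build. One way to avoid that skew entirely, assuming the `datafusion::sql::sqlparser` re-export is available, is to parse through the re-export rather than pinning sqlparser directly; a minimal sketch, not part of this commit:

```rust
// Hypothetical example: parse SQL through DataFusion's sqlparser re-export,
// so the parser version always matches the one the engine was built with.
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;

fn main() {
    let statements = Parser::parse_sql(&GenericDialect {}, "SELECT 1").unwrap();
    println!("{statements:?}");
}
```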
1 change: 1 addition & 0 deletions ballista/core/proto/ballista.proto
@@ -274,6 +274,7 @@ message OperatorMetric {
     NamedTime time = 8;
     int64 start_timestamp = 9;
     int64 end_timestamp = 10;
+    uint64 spilled_rows = 11;
   }
 }

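The new field takes the next free tag inside the existing `oneof`, which keeps the change wire-compatible in one direction: an older reader handed a message with `spilled_rows` set treats tag 11 as an unknown field and simply sees no metric value.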
3 changes: 1 addition & 2 deletions ballista/core/src/plugin/udf.rs
@@ -17,13 +17,12 @@
 use crate::error::{BallistaError, Result};
 use crate::plugin::plugin_manager::global_plugin_manager;
 use crate::plugin::{Plugin, PluginEnum, PluginRegistrar};
-use datafusion::physical_plan::udaf::AggregateUDF;
-use datafusion::physical_plan::udf::ScalarUDF;
 use libloading::{Library, Symbol};
 use std::any::Any;
 use std::collections::HashMap;
 use std::io;
 use std::sync::Arc;
+use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl, ScalarUDF};

 /// UDF plugin trait
 pub trait UDFPlugin: Plugin {
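DataFusion 38 consolidates the UDF types under `datafusion::logical_expr`; the old `datafusion::physical_plan::udaf` and `::udf` paths are gone, and new-style UDFs are defined by implementing the `*UDFImpl` traits. A minimal scalar UDF against the new paths might look like the following (an illustrative sketch from memory of the DataFusion 38 API, not code from this commit):

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{
    ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};

/// A toy UDF declaring an i64 -> i64 signature.
#[derive(Debug)]
struct AddOne {
    signature: Signature,
}

impl ScalarUDFImpl for AddOne {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "add_one"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        // A real implementation would add one to every value in the batch;
        // passing the input through keeps the sketch short.
        Ok(args[0].clone())
    }
}

fn main() {
    let udf: Arc<ScalarUDF> = Arc::new(ScalarUDF::new_from_impl(AddOne {
        signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
    }));
    println!("registered {}", udf.name());
}
```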
7 changes: 6 additions & 1 deletion ballista/core/src/serde/generated/ballista.rs
@@ -420,7 +420,10 @@ pub struct NamedTime {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetric {
-    #[prost(oneof = "operator_metric::Metric", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10")]
+    #[prost(
+        oneof = "operator_metric::Metric",
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11"
+    )]
     pub metric: ::core::option::Option<operator_metric::Metric>,
 }
 /// Nested message and enum types in `OperatorMetric`.
@@ -448,6 +451,8 @@ pub mod operator_metric {
         StartTimestamp(i64),
         #[prost(int64, tag = "10")]
         EndTimestamp(i64),
+        #[prost(uint64, tag = "11")]
+        SpilledRows(u64),
     }
 }
 /// Used by scheduler
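This file is prost-generated from ballista.proto above: regenerating adds tag 11 to the oneof attribute (reflowed across lines once it passes the formatter's width limit) and a `SpilledRows(u64)` variant. A quick round-trip of the new variant, assuming the generated module is exposed as `ballista_core::serde::protobuf` (module path assumed):

```rust
// Hypothetical round-trip check for the new oneof variant.
use ballista_core::serde::protobuf::{operator_metric, OperatorMetric};
use prost::Message;

fn main() {
    let metric = OperatorMetric {
        metric: Some(operator_metric::Metric::SpilledRows(1234)),
    };
    // Encode and decode; SpilledRows travels as field tag 11 on the wire.
    let bytes = metric.encode_to_vec();
    let decoded = OperatorMetric::decode(bytes.as_slice()).unwrap();
    assert_eq!(metric, decoded);
}
```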
4 changes: 4 additions & 0 deletions ballista/core/src/serde/mod.rs
@@ -142,10 +142,14 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
             PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
                 let input = inputs[0].clone();

+                let default_codec =
+                    datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {};
+
                 let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
                     shuffle_writer.output_partitioning.as_ref(),
                     registry,
                     input.schema().as_ref(),
+                    &default_codec,
                 )?;

                 Ok(Arc::new(ShuffleWriterExec::try_new(
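The new fourth argument reflects a DataFusion 38 API change: `parse_protobuf_hash_partitioning` now takes a `PhysicalExtensionCodec` so it can decode any custom expressions referenced by the partitioning. `DefaultPhysicalExtensionCodec` is the stateless placeholder implementation, presumably sufficient here because shuffle hash partitioning is built from plain DataFusion expressions rather than Ballista extension nodes.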
7 changes: 6 additions & 1 deletion ballista/core/src/serde/scheduler/from_proto.rs
@@ -151,6 +151,11 @@ impl TryInto<MetricValue> for protobuf::OperatorMetric {
                 count.add(value as usize);
                 Ok(MetricValue::SpilledBytes(count))
             }
+            Some(operator_metric::Metric::SpilledRows(value)) => {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::SpilledRows(count))
+            }
             Some(operator_metric::Metric::CurrentMemoryUsage(value)) => {
                 let gauge = Gauge::new();
                 gauge.add(value as usize);
@@ -415,7 +420,7 @@ pub fn get_task_definition_vec<
 fn reset_metrics_for_execution_plan(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
-    plan.transform(&|plan| {
+    plan.transform(&|plan: Arc<dyn ExecutionPlan>| {
         let children = plan.children().clone();
         plan.with_new_children(children).map(Transformed::yes)
     })
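Two unrelated changes share this file. The new match arm maps the wire value back to `MetricValue::SpilledRows`; note that `Count` wraps an atomic, so `add` takes `&self` and the binding needs no `mut`. The `reset_metrics_for_execution_plan` closure gains an explicit `Arc<dyn ExecutionPlan>` parameter type, presumably to satisfy inference after DataFusion 38's `TreeNode` rework. A tiny sketch of the `Count` behavior (illustrative):

```rust
use datafusion::physical_plan::metrics::Count;

fn main() {
    // Count is internally atomic: `add` takes &self, so no `mut` binding.
    let count = Count::new();
    count.add(42);
    assert_eq!(count.value(), 42);
}
```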
3 changes: 3 additions & 0 deletions ballista/core/src/serde/scheduler/to_proto.rs
@@ -135,6 +135,9 @@ impl TryInto<protobuf::OperatorMetric> for &MetricValue {
             MetricValue::SpilledBytes(count) => Ok(protobuf::OperatorMetric {
                 metric: Some(operator_metric::Metric::SpilledBytes(count.value() as u64)),
             }),
+            MetricValue::SpilledRows(count) => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::SpilledRows(count.value() as u64)),
+            }),
             MetricValue::CurrentMemoryUsage(gauge) => Ok(protobuf::OperatorMetric {
                 metric: Some(operator_metric::Metric::CurrentMemoryUsage(
                     gauge.value() as u64
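This is the serialization half of the round trip added in from_proto.rs above: `count.value()` returns a `usize` that is widened to `u64` for the wire, and the deserializer narrows it back with `as usize`, which is lossless on the 64-bit targets Ballista typically runs on.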
4 changes: 1 addition & 3 deletions ballista/executor/src/executor.rs
@@ -28,9 +28,7 @@
 use ballista_core::serde::scheduler::PartitionId;
 use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_expr::WindowUDF;
-use datafusion::physical_plan::udaf::AggregateUDF;
-use datafusion::physical_plan::udf::ScalarUDF;
+use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF};
 use futures::future::AbortHandle;
 use std::collections::HashMap;
 use std::future::Future;
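This mirrors the import change in ballista/core/src/plugin/udf.rs: all of the UDF handles, including `WindowUDF`, now come from `datafusion::logical_expr` in one grouped `use` (see the sketch after that hunk).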
3 changes: 2 additions & 1 deletion ballista/scheduler/src/cluster/mod.rs
@@ -689,7 +689,7 @@
 pub(crate) fn get_scan_files(
     plan: Arc<dyn ExecutionPlan>,
 ) -> std::result::Result<Vec<Vec<Vec<PartitionedFile>>>, DataFusionError> {
     let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
-    plan.apply(&mut |plan| {
+    plan.apply(&mut |plan: &Arc<dyn ExecutionPlan>| {
         let plan_any = plan.as_any();
         let file_groups =
             if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
@@ -1097,6 +1097,7 @@ mod test {
                 partition_values: vec![],
                 range: None,
                 extensions: None,
+                statistics: None,
             }]);
         }
         vec![scan_files]
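Two changes here: the `apply` closure gets an explicit `&Arc<dyn ExecutionPlan>` parameter type, and the test fixture sets the `statistics` field that DataFusion 38 adds to `PartitionedFile`. Exhaustive struct literals must now initialize the field; code built through the constructor is unaffected, as in this sketch (illustrative, hypothetical path, not from this commit):

```rust
use datafusion::datasource::listing::PartitionedFile;

fn main() {
    // The constructor fills the new `statistics` field with None.
    let file = PartitionedFile::new("data/part-0.parquet".to_string(), 1024);
    assert!(file.statistics.is_none());
}
```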
9 changes: 5 additions & 4 deletions ballista/scheduler/src/state/mod.rs
@@ -19,7 +19,7 @@
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
 use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
-use datafusion::physical_plan::ExecutionPlanProperties;
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -365,7 +365,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             debug!("Optimized plan: {}", optimized_plan.display_indent());
         }

-        plan.apply(&mut |plan| {
+        plan.apply(&mut |plan: &LogicalPlan| {
             if let LogicalPlan::TableScan(scan) = plan {
                 let provider = source_as_provider(&scan.source)?;
                 if let Some(table) = provider.as_any().downcast_ref::<ListingTable>() {
@@ -410,9 +410,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
         );

-        let plan = plan.transform_down(&|node| {
+        let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
             if node.output_partitioning().partition_count() == 0 {
-                Ok(Transformed::yes(Arc::new(EmptyExec::new(node.schema()))))
+                let empty: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(node.schema()));
+                Ok(Transformed::yes(empty))
             } else {
                 Ok(Transformed::no(node))
             }
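The two-step binding in the last hunk matters for inference: `Arc::new(EmptyExec::new(...))` has type `Arc<EmptyExec>`, while the `else` branch returns the closure's input as `Arc<dyn ExecutionPlan>`; annotating the binding forces the unsized coercion so both arms agree. A standalone sketch of the pattern with toy types (not DataFusion's):

```rust
use std::sync::Arc;

trait Plan {}
struct EmptyPlan;
impl Plan for EmptyPlan {}

fn rewrite(node: Arc<dyn Plan>, is_empty: bool) -> Arc<dyn Plan> {
    if is_empty {
        // The annotation coerces Arc<EmptyPlan> to Arc<dyn Plan>, matching
        // the type of `node` returned by the other branch.
        let empty: Arc<dyn Plan> = Arc::new(EmptyPlan);
        empty
    } else {
        node
    }
}

fn main() {
    let _ = rewrite(Arc::new(EmptyPlan), true);
}
```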
