From eac5feb81e0354c09375abb5a1a48d9635dac292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 26 Nov 2024 17:31:59 +0000 Subject: [PATCH 1/2] refactor: SessionBuilder to return Result<_> --- ballista/core/src/utils.rs | 8 +++++--- ballista/scheduler/src/cluster/memory.rs | 4 ++-- ballista/scheduler/src/scheduler_server/mod.rs | 3 ++- ballista/scheduler/src/standalone.rs | 8 +++++--- ballista/scheduler/src/state/session_manager.rs | 8 ++++---- examples/examples/custom-client.rs | 2 +- examples/src/object_store.rs | 8 ++++---- examples/tests/object_store.rs | 17 +++++++++-------- 8 files changed, 32 insertions(+), 26 deletions(-) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index bf7533259..b9c6e01e3 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -62,12 +62,14 @@ use tonic::codegen::StdError; use tonic::transport::{Channel, Error, Server}; /// Default session builder using the provided configuration -pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionStateBuilder::new() +pub fn default_session_builder( + config: SessionConfig, +) -> datafusion::common::Result { + Ok(SessionStateBuilder::new() .with_default_features() .with_config(config) .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) - .build() + .build()) } pub fn default_config_producer() -> SessionConfig { diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs index 6df044035..c9eac5640 100644 --- a/ballista/scheduler/src/cluster/memory.rs +++ b/ballista/scheduler/src/cluster/memory.rs @@ -408,7 +408,7 @@ impl JobState for InMemoryJobState { &self, config: &SessionConfig, ) -> Result> { - let session = create_datafusion_context(config, self.session_builder.clone()); + let session = create_datafusion_context(config, self.session_builder.clone())?; self.sessions.insert(session.session_id(), session.clone()); Ok(session) @@ -419,7 +419,7 @@ impl JobState for InMemoryJobState { session_id: &str, config: &SessionConfig, ) -> Result> { - let session = create_datafusion_context(config, self.session_builder.clone()); + let session = create_datafusion_context(config, self.session_builder.clone())?; self.sessions .insert(session_id.to_string(), session.clone()); diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index b6eeafda6..653e2d410 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -56,7 +56,8 @@ mod external_scaler; mod grpc; pub(crate) mod query_stage_scheduler; -pub type SessionBuilder = Arc SessionState + Send + Sync>; +pub type SessionBuilder = + Arc datafusion::common::Result + Send + Sync>; #[derive(Clone)] pub struct SchedulerServer { diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs index 9ad887c60..e9c483456 100644 --- a/ballista/scheduler/src/standalone.rs +++ b/ballista/scheduler/src/standalone.rs @@ -57,9 +57,11 @@ pub async fn new_standalone_scheduler_from_state( let session_config = session_state.config().clone(); let session_state = session_state.clone(); let session_builder = Arc::new(move |c: SessionConfig| { - SessionStateBuilder::new_from_existing(session_state.clone()) - .with_config(c) - .build() + Ok( + SessionStateBuilder::new_from_existing(session_state.clone()) + .with_config(c) + .build(), + ) }); let config_producer = Arc::new(move || session_config.clone()); diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index 8a769edbd..598131670 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -67,7 +67,7 @@ impl SessionManager { pub fn create_datafusion_context( session_config: &SessionConfig, session_builder: SessionBuilder, -) -> Arc { +) -> datafusion::common::Result> { let session_state = if session_config.round_robin_repartition() { let session_config = session_config .clone() @@ -75,10 +75,10 @@ pub fn create_datafusion_context( .with_round_robin_repartition(false); log::warn!("session manager will override `datafusion.optimizer.enable_round_robin_repartition` to `false` "); - session_builder(session_config) + session_builder(session_config)? } else { - session_builder(session_config.clone()) + session_builder(session_config.clone())? }; - Arc::new(SessionContext::new_with_state(session_state)) + Ok(Arc::new(SessionContext::new_with_state(session_state))) } diff --git a/examples/examples/custom-client.rs b/examples/examples/custom-client.rs index 3577621e4..9e7ec8595 100644 --- a/examples/examples/custom-client.rs +++ b/examples/examples/custom-client.rs @@ -62,7 +62,7 @@ async fn main() -> Result<()> { // new sessions state with required custom session configuration and runtime environment let state = - custom_session_state_with_s3_support(custom_session_config_with_s3_options()); + custom_session_state_with_s3_support(custom_session_config_with_s3_options())?; let ctx: SessionContext = SessionContext::remote_with_state("df://localhost:50050", state).await?; diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs index 130d47059..3cd22fa6a 100644 --- a/examples/src/object_store.rs +++ b/examples/src/object_store.rs @@ -88,13 +88,13 @@ pub fn custom_runtime_env_with_s3_support( /// and [RuntimeEnv]. pub fn custom_session_state_with_s3_support( session_config: SessionConfig, -) -> SessionState { - let runtime_env = custom_runtime_env_with_s3_support(&session_config).unwrap(); +) -> datafusion::common::Result { + let runtime_env = custom_runtime_env_with_s3_support(&session_config)?; - SessionStateBuilder::new() + Ok(SessionStateBuilder::new() .with_runtime_env(runtime_env) .with_config(session_config) - .build() + .build()) } /// Custom [ObjectStoreRegistry] which will create diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs index cd5c2def0..ca47c5cb9 100644 --- a/examples/tests/object_store.rs +++ b/examples/tests/object_store.rs @@ -298,7 +298,7 @@ mod custom_s3_config { // object store registry. let session_builder = Arc::new(produce_state); - let state = session_builder(config_producer()); + let state = session_builder(config_producer())?; // setting up ballista cluster with new runtime, configuration, and session state producers let (host, port) = crate::common::setup_test_cluster_with_builders( @@ -416,7 +416,7 @@ mod custom_s3_config { // object store registry. let session_builder = Arc::new(produce_state); - let state = session_builder(config_producer()); + let state = session_builder(config_producer())?; // // establishing cluster connection, let ctx: SessionContext = SessionContext::standalone_with_state(state).await?; @@ -480,24 +480,25 @@ mod custom_s3_config { Ok(()) } - fn produce_state(session_config: SessionConfig) -> SessionState { + fn produce_state( + session_config: SessionConfig, + ) -> datafusion::common::Result { let s3options = session_config .options() .extensions .get::() .ok_or(DataFusionError::Configuration( "S3 Options not set".to_string(), - )) - .unwrap(); + ))?; let config = RuntimeConfig::new().with_object_store_registry(Arc::new( CustomObjectStoreRegistry::new(s3options.clone()), )); - let runtime_env = RuntimeEnv::new(config).unwrap(); + let runtime_env = RuntimeEnv::new(config)?; - SessionStateBuilder::new() + Ok(SessionStateBuilder::new() .with_runtime_env(runtime_env.into()) .with_config(session_config) - .build() + .build()) } } From e4dd747c7b76c9f07b1842b8e4f2438fec2095be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 26 Nov 2024 21:29:18 +0000 Subject: [PATCH 2/2] Update ballista/core/src/utils.rs Co-authored-by: Andy Grove --- ballista/core/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index b9c6e01e3..1506c2bb5 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -68,7 +68,7 @@ pub fn default_session_builder( Ok(SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) + .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default())?)) .build()) }