diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 8a01f56fb..daabe8a27 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -60,7 +60,6 @@ futures = "0.3" hashbrown = "0.14" itertools = "0.12" -libloading = "0.8.0" log = "0.4" md-5 = { version = "^0.10.0" } object_store = { workspace = true } diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 301bc0507..5306e8b98 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -31,8 +31,6 @@ pub mod error; pub mod event_loop; pub mod execution_plans; pub mod object_store_registry; -/// some plugins -pub mod plugin; pub mod utils; #[macro_use] diff --git a/ballista/core/src/plugin/mod.rs b/ballista/core/src/plugin/mod.rs deleted file mode 100644 index 8d2cd6b9b..000000000 --- a/ballista/core/src/plugin/mod.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::error::Result; -use crate::plugin::udf::UDFPluginManager; -use libloading::Library; -use std::any::Any; -use std::env; -use std::sync::Arc; - -/// plugin manager -pub mod plugin_manager; -/// udf plugin -pub mod udf; - -/// CARGO_PKG_VERSION -pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION"); -/// RUSTC_VERSION -pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION"); - -/// Top plugin trait -pub trait Plugin { - /// Returns the plugin as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; -} - -/// The enum of Plugin -#[repr(C)] -#[derive(PartialEq, std::cmp::Eq, std::hash::Hash, Copy, Clone)] -pub enum PluginEnum { - /// UDF/UDAF plugin - UDF, -} - -impl PluginEnum { - /// new a struct which impl the PluginRegistrar trait - pub fn init_plugin_manager(&self) -> Box { - match self { - PluginEnum::UDF => Box::::default(), - } - } -} - -/// Every plugin need a PluginDeclaration -#[derive(Copy, Clone)] -pub struct PluginDeclaration { - /// Rust doesn’t have a stable ABI, meaning different compiler versions can generate incompatible code. - /// For these reasons, the UDF plug-in must be compiled using the same version of rustc as datafusion. - pub rustc_version: &'static str, - - /// core version of the plugin. The plugin's core_version need same as plugin manager. - pub core_version: &'static str, - - /// One of PluginEnum - pub plugin_type: unsafe extern "C" fn() -> PluginEnum, -} - -/// Plugin Registrar , Every plugin need implement this trait -pub trait PluginRegistrar: Send + Sync + 'static { - /// # Safety - /// load plugin from library - unsafe fn load(&mut self, library: Arc) -> Result<()>; - - /// Returns the plugin as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; -} - -/// Declare a plugin's PluginDeclaration. -/// -/// # Notes -/// -/// This works by automatically generating an `extern "C"` function named `get_plugin_type` with a -/// pre-defined signature and symbol name. And then generating a PluginDeclaration. -/// Therefore you will only be able to declare one plugin per library. -#[macro_export] -macro_rules! declare_plugin { - ($plugin_type:expr) => { - #[no_mangle] - pub extern "C" fn get_plugin_type() -> $crate::plugin::PluginEnum { - $plugin_type - } - - #[no_mangle] - pub static plugin_declaration: $crate::plugin::PluginDeclaration = - $crate::plugin::PluginDeclaration { - rustc_version: $crate::plugin::RUSTC_VERSION, - core_version: $crate::plugin::CORE_VERSION, - plugin_type: get_plugin_type, - }; - }; -} - -/// get the plugin dir -pub fn plugin_dir() -> String { - let current_exe_dir = match env::current_exe() { - Ok(exe_path) => exe_path.display().to_string(), - Err(_e) => "".to_string(), - }; - - // If current_exe_dir contain `deps` the root dir is the parent dir - // eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/deps/plugins_app-067452b3ff2af70e - // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/deps - // else eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/plugins_app - // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/ - if current_exe_dir.contains("/deps/") { - let i = current_exe_dir.find("/deps/").unwrap(); - String::from(¤t_exe_dir.as_str()[..i + 6]) - } else { - let i = current_exe_dir.rfind('/').unwrap(); - String::from(¤t_exe_dir.as_str()[..i]) - } -} diff --git a/ballista/core/src/plugin/plugin_manager.rs b/ballista/core/src/plugin/plugin_manager.rs deleted file mode 100644 index 6c19f0542..000000000 --- a/ballista/core/src/plugin/plugin_manager.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -use crate::error::{BallistaError, Result}; -use libloading::Library; -use log::info; -use std::collections::HashMap; -use std::io; -use std::sync::{Arc, Mutex}; -use walkdir::{DirEntry, WalkDir}; - -use crate::plugin::{ - PluginDeclaration, PluginEnum, PluginRegistrar, CORE_VERSION, RUSTC_VERSION, -}; -use once_cell::sync::OnceCell; - -/// To prevent the library from being loaded multiple times, we use once_cell defines a Arc> -/// Because datafusion is a library, not a service, users may not need to load all plug-ins in the process. -/// So fn global_plugin_manager return Arc>. In this way, users can load the required library through the load method of GlobalPluginManager when needed -static INSTANCE: OnceCell>> = OnceCell::new(); - -/// global_plugin_manager -pub fn global_plugin_manager( - plugin_path: &str, -) -> &'static Arc> { - INSTANCE.get_or_init(move || unsafe { - let mut gpm = GlobalPluginManager::default(); - gpm.load(plugin_path).unwrap(); - Arc::new(Mutex::new(gpm)) - }) -} - -#[derive(Default)] -/// manager all plugin_type's plugin_manager -pub struct GlobalPluginManager { - /// every plugin need a plugin registrar - pub plugin_managers: HashMap>, - - /// loaded plugin files - pub plugin_files: Vec, -} - -impl GlobalPluginManager { - /// # Safety - /// find plugin file from `plugin_path` and load it . - unsafe fn load(&mut self, plugin_path: &str) -> Result<()> { - if "".eq(plugin_path) { - return Ok(()); - } - // find library file from udaf_plugin_path - info!("load plugin from dir:{}", plugin_path); - - let plugin_files = self.get_all_plugin_files(plugin_path)?; - - for plugin_file in plugin_files { - let library = Library::new(plugin_file.path()).map_err(|e| { - BallistaError::IoError(io::Error::new( - io::ErrorKind::Other, - format!("load library error: {e}"), - )) - })?; - - let library = Arc::new(library); - - let dec = library.get::<*mut PluginDeclaration>(b"plugin_declaration\0"); - if dec.is_err() { - info!( - "not found plugin_declaration in the library: {}", - plugin_file.path().to_str().unwrap() - ); - continue; - } - - let dec = dec.unwrap().read(); - - // ersion checks to prevent accidental ABI incompatibilities - if dec.rustc_version != RUSTC_VERSION || dec.core_version != CORE_VERSION { - return Err(BallistaError::IoError(io::Error::new( - io::ErrorKind::Other, - "Version mismatch", - ))); - } - - let plugin_enum = (dec.plugin_type)(); - let curr_plugin_manager = match self.plugin_managers.get_mut(&plugin_enum) { - None => { - let plugin_manager = plugin_enum.init_plugin_manager(); - self.plugin_managers.insert(plugin_enum, plugin_manager); - self.plugin_managers.get_mut(&plugin_enum).unwrap() - } - Some(manager) => manager, - }; - curr_plugin_manager.load(library)?; - self.plugin_files - .push(plugin_file.path().to_str().unwrap().to_string()); - } - - Ok(()) - } - - /// get all plugin file in the dir - fn get_all_plugin_files(&self, plugin_path: &str) -> io::Result> { - let mut plugin_files = Vec::new(); - for entry in WalkDir::new(plugin_path).into_iter().filter_map(|e| { - let item = e.unwrap(); - // every file only load once - if self - .plugin_files - .contains(&item.path().to_str().unwrap().to_string()) - { - return None; - } - - let file_type = item.file_type(); - if !file_type.is_file() { - return None; - } - - if let Some(path) = item.path().extension() { - if let Some(suffix) = path.to_str() { - if suffix == "dylib" || suffix == "so" || suffix == "dll" { - info!( - "load plugin from library file:{}", - item.path().to_str().unwrap() - ); - return Some(item); - } - } - } - - None - }) { - plugin_files.push(entry); - } - Ok(plugin_files) - } -} diff --git a/ballista/core/src/plugin/udf.rs b/ballista/core/src/plugin/udf.rs deleted file mode 100644 index 36ebd5e32..000000000 --- a/ballista/core/src/plugin/udf.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -use crate::error::{BallistaError, Result}; -use crate::plugin::plugin_manager::global_plugin_manager; -use crate::plugin::{Plugin, PluginEnum, PluginRegistrar}; -use datafusion::logical_expr::{AggregateUDF, ScalarUDF}; -use libloading::{Library, Symbol}; -use std::any::Any; -use std::collections::HashMap; -use std::io; -use std::sync::Arc; - -/// UDF plugin trait -pub trait UDFPlugin: Plugin { - /// get a ScalarUDF by name - fn get_scalar_udf_by_name(&self, fun_name: &str) -> Result; - - /// return all udf names in the plugin - fn udf_names(&self) -> Result>; - - /// get a aggregate udf by name - fn get_aggregate_udf_by_name(&self, fun_name: &str) -> Result; - - /// return all udaf names - fn udaf_names(&self) -> Result>; -} - -/// UDFPluginManager -#[derive(Default, Clone)] -pub struct UDFPluginManager { - /// scalar udfs - pub scalar_udfs: HashMap>, - - /// aggregate udfs - pub aggregate_udfs: HashMap>, - - /// All libraries load from the plugin dir. - pub libraries: Vec>, -} - -impl PluginRegistrar for UDFPluginManager { - unsafe fn load(&mut self, library: Arc) -> Result<()> { - type PluginRegister = unsafe fn() -> Box; - let register_fun: Symbol = - library.get(b"registrar_udf_plugin\0").map_err(|e| { - BallistaError::IoError(io::Error::new( - io::ErrorKind::Other, - format!("not found fn registrar_udf_plugin in the library: {e}"), - )) - })?; - - let udf_plugin: Box = register_fun(); - udf_plugin - .udf_names() - .unwrap() - .iter() - .try_for_each(|udf_name| { - if self.scalar_udfs.contains_key(udf_name) { - Err(BallistaError::IoError(io::Error::new( - io::ErrorKind::Other, - format!("udf name: {udf_name} already exists"), - ))) - } else { - let scalar_udf = udf_plugin.get_scalar_udf_by_name(udf_name)?; - self.scalar_udfs - .insert(udf_name.to_string(), Arc::new(scalar_udf)); - Ok(()) - } - })?; - - udf_plugin - .udaf_names() - .unwrap() - .iter() - .try_for_each(|udaf_name| { - if self.aggregate_udfs.contains_key(udaf_name) { - Err(BallistaError::IoError(io::Error::new( - io::ErrorKind::Other, - format!("udaf name: {udaf_name} already exists"), - ))) - } else { - let aggregate_udf = - udf_plugin.get_aggregate_udf_by_name(udaf_name)?; - self.aggregate_udfs - .insert(udaf_name.to_string(), Arc::new(aggregate_udf)); - Ok(()) - } - })?; - self.libraries.push(library); - Ok(()) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -/// Declare a udf plugin registrar callback -/// -/// # Notes -/// -/// This works by automatically generating an `extern "C"` function named `registrar_udf_plugin` with a -/// pre-defined signature and symbol name. -/// Therefore you will only be able to declare one plugin per library. -#[macro_export] -macro_rules! declare_udf_plugin { - ($curr_plugin_type:ty, $constructor:path) => { - #[no_mangle] - pub extern "C" fn registrar_udf_plugin() -> Box { - // make sure the constructor is the correct type. - let constructor: fn() -> $curr_plugin_type = $constructor; - let object = constructor(); - Box::new(object) - } - - $crate::declare_plugin!($crate::plugin::PluginEnum::UDF); - }; -} - -/// get a Option of Immutable UDFPluginManager -pub fn get_udf_plugin_manager(path: &str) -> Option { - let udf_plugin_manager_opt = { - let gpm = global_plugin_manager(path).lock().unwrap(); - let plugin_registrar_opt = gpm.plugin_managers.get(&PluginEnum::UDF); - if let Some(plugin_registrar) = plugin_registrar_opt { - if let Some(udf_plugin_manager) = - plugin_registrar.as_any().downcast_ref::() - { - return Some(udf_plugin_manager.clone()); - } else { - return None; - } - } - None - }; - udf_plugin_manager_opt -}