Skip to content

Commit

Permalink
chore: spawn process
Browse files Browse the repository at this point in the history
  • Loading branch information
sangjanai committed Jan 10, 2025
1 parent 4df0704 commit acc5675
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 156 deletions.
1 change: 1 addition & 0 deletions engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ add_executable(${TARGET_NAME} main.cc
${CMAKE_CURRENT_SOURCE_DIR}/extensions/python-engine/python_engine.cc

${CMAKE_CURRENT_SOURCE_DIR}/utils/dylib_path_manager.cc
${CMAKE_CURRENT_SOURCE_DIR}/utils/process/utils.cc

${CMAKE_CURRENT_SOURCE_DIR}/extensions/remote-engine/remote_engine.cc

Expand Down
1 change: 1 addition & 0 deletions engine/cli/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ add_executable(${TARGET_NAME} main.cc
${CMAKE_CURRENT_SOURCE_DIR}/../utils/file_manager_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../utils/curl_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../utils/system_info_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../utils/process/utils.cc
)

target_link_libraries(${TARGET_NAME} PRIVATE CLI11::CLI11)
Expand Down
30 changes: 16 additions & 14 deletions engine/cli/commands/server_start_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "services/engine_service.h"
#include "utils/cortex_utils.h"
#include "utils/file_manager_utils.h"
#include "utils/process/utils.h"

#if defined(_WIN32) || defined(_WIN64)
#include "utils/widechar_conv.h"
Expand Down Expand Up @@ -103,25 +104,26 @@ bool ServerStartCmd::Exec(const std::string& host, int port,
}

#else
// Unix-like system-specific code to fork a child process
pid_t pid = fork();
std::vector<std::string> commands;
// Some engines require their library search path to be registered before the process is created
auto download_srv = std::make_shared<DownloadService>();
auto dylib_path_mng = std::make_shared<cortex::DylibPathManager>();
auto db_srv = std::make_shared<DatabaseService>();
EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath();

std::string p = cortex_utils::GetCurrentPath() + "/" + exe;
commands.push_back(p);
commands.push_back("--config_file_path");
commands.push_back(get_config_file_path());
commands.push_back("--data_folder_path");
commands.push_back(get_data_folder_path());
commands.push_back("--loglevel");
commands.push_back(log_level_);
auto pid = cortex::process::SpawnProcess(commands);
if (pid < 0) {
// Fork failed
std::cerr << "Could not start server: " << std::endl;
return false;
} else if (pid == 0) {
// Some engines require their library search path to be registered before the process is created
auto download_srv = std::make_shared<DownloadService>();
auto dylib_path_mng = std::make_shared<cortex::DylibPathManager>();
auto db_srv = std::make_shared<DatabaseService>();
EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath();

std::string p = cortex_utils::GetCurrentPath() + "/" + exe;
execl(p.c_str(), exe.c_str(), "--start-server", "--config_file_path",
get_config_file_path().c_str(), "--data_folder_path",
get_data_folder_path().c_str(), "--loglevel", log_level_.c_str(),
(char*)0);
} else {
// Parent process
if (!TryConnectToServer(host, port)) {
Expand Down
107 changes: 4 additions & 103 deletions engine/extensions/python-engine/python_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <iostream>
#include <sstream>
#include <string>

namespace python_engine {
namespace {
constexpr const int k200OK = 200;
Expand Down Expand Up @@ -61,33 +62,6 @@ static size_t WriteCallback(char* ptr, size_t size, size_t nmemb,
return size * nmemb;
}

/// Builds a single Windows command-line string from an argument vector.
///
/// Every argument is followed by one space (including the last one, which
/// keeps the output for plain arguments identical to the previous
/// implementation). Arguments that are empty or contain whitespace or a
/// double quote are wrapped in double quotes and escaped following the
/// Microsoft C runtime argument-parsing rules:
///   - an embedded `"` becomes `\"`;
///   - backslashes immediately preceding a `"` are doubled;
///   - backslashes at the end of a quoted argument are doubled so that the
///     closing quote is not escaped.
///
/// @param args Arguments in order; by convention the first is the executable.
/// @return The joined command line; empty when `args` is empty.
std::string ConstructWindowsCommandLine(const std::vector<std::string>& args) {
  std::string cmd_line;
  for (const auto& arg : args) {
    const bool needs_quotes =
        arg.empty() || arg.find_first_of(" \t\n\v\"") != std::string::npos;
    if (!needs_quotes) {
      cmd_line += arg;
      cmd_line += ' ';
      continue;
    }

    cmd_line += '"';
    // Backslashes are only special when followed by a quote, so hold them
    // back until the next character is known.
    std::string::size_type pending_backslashes = 0;
    for (const char c : arg) {
      if (c == '\\') {
        ++pending_backslashes;
        continue;
      }
      if (c == '"') {
        // Double the held backslashes and add one more to escape the quote.
        cmd_line.append(pending_backslashes * 2 + 1, '\\');
      } else {
        // Backslashes not followed by a quote are literal.
        cmd_line.append(pending_backslashes, '\\');
      }
      pending_backslashes = 0;
      cmd_line += c;
    }
    // Trailing backslashes would otherwise escape the closing quote.
    cmd_line.append(pending_backslashes * 2, '\\');
    cmd_line += "\" ";
  }
  return cmd_line;
}

/// Produces a null-terminated, argv-style pointer array for `args`.
///
/// The returned pointers alias the character storage of the strings in
/// `args`; nothing is copied, so `args` must stay alive and unmodified for
/// as long as the result is used.
///
/// @param args Arguments to expose as C strings.
/// @return One pointer per argument, in order, followed by a `nullptr`.
std::vector<char*> ConvertToArgv(const std::vector<std::string>& args) {
  std::vector<char*> pointers;
  pointers.reserve(args.size() + 1);
  for (std::vector<std::string>::size_type i = 0; i < args.size(); ++i) {
    // The argv convention uses non-const char*, hence the cast.
    char* raw = const_cast<char*>(args[i].c_str());
    pointers.push_back(raw);
  }
  pointers.push_back(nullptr);
  return pointers;
}

} // namespace

PythonEngine::PythonEngine() : q_(4 /*n_parallel*/, "python_engine") {}
Expand All @@ -106,80 +80,6 @@ config::PythonModelConfig* PythonEngine::GetModelConfig(
return nullptr;
}

// TODO(sang) move to utils to re-use
/// Starts `command` as a new process and records its id under `model`.
///
/// On Windows the command vector is joined with ConstructWindowsCommandLine
/// and launched through CreateProcessA; on macOS/Linux it is launched through
/// posix_spawn with `command[0]` as the executable path. On success the
/// process id is stored in `process_map_[model]`.
///
/// @param model   Key under which the process id is stored.
/// @param command Executable followed by its arguments; must not be empty.
/// @return The id of the new process, or -1 if `command` is empty or the
///         process could not be created (the failure is logged).
pid_t PythonEngine::SpawnProcess(const std::string& model,
                                 const std::vector<std::string>& command) {
  try {
    // command[0] is dereferenced below; reject an empty command up front.
    if (command.empty()) {
      throw std::runtime_error("Empty command");
    }
#if defined(_WIN32)
    // Windows process creation
    STARTUPINFOA si = {0};
    PROCESS_INFORMATION pi = {0};
    si.cb = sizeof(si);

    // Construct command line
    std::string cmd_line = ConstructWindowsCommandLine(command);

    // lpCommandLine is a non-const LPSTR, so hand CreateProcessA a writable,
    // NUL-terminated copy. The buffer is sized to the command itself rather
    // than a fixed 4096-byte array, which could not hold longer commands.
    std::vector<char> command_buffer(cmd_line.begin(), cmd_line.end());
    command_buffer.push_back('\0');

    if (!CreateProcessA(NULL,                   // lpApplicationName
                        command_buffer.data(),  // lpCommandLine
                        NULL,                   // lpProcessAttributes
                        NULL,                   // lpThreadAttributes
                        FALSE,                  // bInheritHandles
                        0,                      // dwCreationFlags
                        NULL,                   // lpEnvironment
                        NULL,                   // lpCurrentDirectory
                        &si,                    // lpStartupInfo
                        &pi                     // lpProcessInformation
                        )) {
      throw std::runtime_error("Failed to create process on Windows");
    }

    // Store the process ID
    pid_t pid = pi.dwProcessId;
    process_map_[model] = pid;

    // Close handles to avoid resource leaks
    CloseHandle(pi.hProcess);
    CloseHandle(pi.hThread);

    return pid;

#elif defined(__APPLE__) || defined(__linux__)
    // POSIX process creation
    pid_t pid;

    // Convert command vector to char*[]; the pointers alias `command`, which
    // outlives the posix_spawn call.
    auto argv = ConvertToArgv(command);

    // NOTE(review): envp is passed as NULL on the assumption that the child
    // inherits the parent's environment. POSIX does not promise that for a
    // null envp; confirm on the target platforms or pass `environ`.
    auto spawn_result = posix_spawn(&pid,                // pid output
                                    command[0].c_str(),  // executable path
                                    NULL,                // file actions
                                    NULL,                // spawn attributes
                                    argv.data(),         // argument vector
                                    NULL                 // environment
    );

    if (spawn_result != 0) {
      throw std::runtime_error("Failed to spawn process");
    }

    // Store the process ID
    process_map_[model] = pid;
    return pid;

#else
#error Unsupported platform
#endif
  } catch (const std::exception& e) {
    LOG_ERROR << "Process spawning error: " << e.what();
    return -1;
  }
}

bool PythonEngine::TerminateModelProcess(const std::string& model) {
auto it = process_map_.find(model);
if (it == process_map_.end()) {
Expand Down Expand Up @@ -405,7 +305,8 @@ void PythonEngine::LoadModel(

// Add the parsed arguments to the command
command.insert(command.end(), args.begin(), args.end());
pid = SpawnProcess(model, command);
pid = cortex::process::SpawnProcess(command);
process_map_[model] = pid;
if (pid == -1) {
std::unique_lock lock(models_mutex_);
if (models_.find(model) != models_.end()) {
Expand Down Expand Up @@ -467,7 +368,7 @@ void PythonEngine::UnloadModel(

auto model = (*json_body)["model"].asString();

{
{
if (TerminateModelProcess(model)) {
std::unique_lock lock(models_mutex_);
models_.erase(model);
Expand Down
16 changes: 2 additions & 14 deletions engine/extensions/python-engine/python_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,8 @@
#include "utils/file_manager_utils.h"

#include "utils/curl_utils.h"
#if defined(_WIN32)
#include <process.h>
#include <windows.h>
using pid_t = DWORD;
#elif defined(__APPLE__) || defined(__linux__)
#include <signal.h>
#include <spawn.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif
#include "utils/process/utils.h"

// Helper for CURL response
namespace python_engine {
struct StreamContext {
Expand All @@ -52,7 +43,6 @@ class PythonEngine : public EngineI {
std::unordered_map<std::string, pid_t> process_map_;
trantor::ConcurrentTaskQueue q_;


// Helper functions
CurlResponse MakePostRequest(const std::string& model,
const std::string& path,
Expand All @@ -67,8 +57,6 @@ class PythonEngine : public EngineI {
const std::function<void(Json::Value&&, Json::Value&&)>& callback);

// Process manager functions
pid_t SpawnProcess(const std::string& model,
const std::vector<std::string>& command);
bool TerminateModelProcess(const std::string& model);

// Internal model management
Expand Down
48 changes: 23 additions & 25 deletions engine/services/hardware_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
#endif
#include "cli/commands/cortex_upd_cmd.h"
#include "database/hardware.h"
#include "services/engine_service.h"
#include "utils/cortex_utils.h"
#include "utils/dylib_path_manager.h"
#include "utils/process/utils.h"

namespace {
bool TryConnectToServer(const std::string& host, int port) {
Expand Down Expand Up @@ -149,41 +152,36 @@ bool HardwareService::Restart(const std::string& host, int port) {
}

#else
// Unix-like system-specific code to fork a child process
pid_t pid = fork();

std::vector<std::string> commands;
// Some engines require their library search path to be registered before the process is created
auto download_srv = std::make_shared<DownloadService>();
auto dylib_path_mng = std::make_shared<cortex::DylibPathManager>();
auto db_srv = std::make_shared<DatabaseService>();
EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath();
std::string p = cortex_utils::GetCurrentPath() + "/" + exe;
commands.push_back(p);
commands.push_back("--ignore_cout");
commands.push_back("--config_file_path");
commands.push_back(get_config_file_path());
commands.push_back("--data_folder_path");
commands.push_back(get_data_folder_path());
commands.push_back("--loglevel");
commands.push_back(luh::LogLevelStr(luh::global_log_level));
auto pid = cortex::process::SpawnProcess(commands);
if (pid < 0) {
// Fork failed
std::cerr << "Could not start server: " << std::endl;
return false;
} else if (pid == 0) {
// No need to configure LD_LIBRARY_PATH for macOS
#if !defined(__APPLE__) || !defined(__MACH__)
const char* name = "LD_LIBRARY_PATH";
auto data = getenv(name);
std::string v;
if (auto g = getenv(name); g) {
v += g;
}
CTL_INF("LD_LIBRARY_PATH: " << v);
auto llamacpp_path = file_manager_utils::GetCudaToolkitPath(kLlamaRepo);
auto trt_path = file_manager_utils::GetCudaToolkitPath(kTrtLlmRepo);

auto new_v = trt_path.string() + ":" + llamacpp_path.string() + ":" + v;
setenv(name, new_v.c_str(), true);
CTL_INF("LD_LIBRARY_PATH: " << getenv(name));
#endif
std::string p = cortex_utils::GetCurrentPath() + "/" + exe;
execl(p.c_str(), exe.c_str(), "--ignore_cout", "--config_file_path",
get_config_file_path().c_str(), "--data_folder_path",
get_data_folder_path().c_str(), "--loglevel",
luh::LogLevelStr(luh::global_log_level).c_str(), (char*)0);
} else {
// Parent process
if (!TryConnectToServer(host, port)) {
return false;
}
std::cout << "Server started" << std::endl;
std::cout << "API Documentation available at: http://" << host << ":"
<< port << std::endl;
}

#endif
return true;
}
Expand Down
Loading

0 comments on commit acc5675

Please sign in to comment.