Skip to content

Commit

Permalink
Fix the prepare_for_closing system
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Dec 1, 2024
1 parent 36a785b commit de53bec
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 19 deletions.
342 changes: 331 additions & 11 deletions heed/src/envs/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ pub struct Env {
}

impl Env {
pub(crate) fn new(env_ptr: NonNull<MDB_env>, path: PathBuf) -> Env {
Env {
inner: Arc::new(EnvInner {
env_ptr,
path,
signal_event: Arc::new(SignalEvent::manual(false)),
}),
}
pub(crate) fn new(
env_ptr: NonNull<MDB_env>,
path: PathBuf,
signal_event: Arc<SignalEvent>,
) -> Env {
Env { inner: Arc::new(EnvInner { env_ptr, path, signal_event }) }
}

pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
Expand Down Expand Up @@ -492,7 +490,6 @@ impl fmt::Debug for Env {
}
}

#[derive(Clone)]
pub(crate) struct EnvInner {
env_ptr: NonNull<MDB_env>,
signal_event: Arc<SignalEvent>,
Expand All @@ -501,9 +498,332 @@ pub(crate) struct EnvInner {

impl Drop for EnvInner {
fn drop(&mut self) {
unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) };
let mut lock = OPENED_ENV.write().unwrap();
let removed = lock.remove(&self.path);
debug_assert!(removed);
debug_assert!(removed.is_some());
unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) };
self.signal_event.signal();
}
}

#[cfg(test)]
mod tests {
use std::io::ErrorKind;
use std::time::Duration;
use std::{fs, thread};

use crate::types::*;
use crate::{env_closing_event, EnvOpenOptions, Error};

#[test]
fn close_env() {
let dir = tempfile::tempdir().unwrap();
let env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.max_dbs(30)
.open(dir.path())
.unwrap()
};

// Force a thread to keep the env for 1 second.
let env_cloned = env.clone();
thread::spawn(move || {
let _env = env_cloned;
thread::sleep(Duration::from_secs(1));
});

let mut wtxn = env.write_txn().unwrap();
let db = env.create_database::<Str, Str>(&mut wtxn, None).unwrap();
wtxn.commit().unwrap();

// Create an ordered list of keys...
let mut wtxn = env.write_txn().unwrap();
db.put(&mut wtxn, "hello", "hello").unwrap();
db.put(&mut wtxn, "world", "world").unwrap();

let mut iter = db.iter(&wtxn).unwrap();
assert_eq!(iter.next().transpose().unwrap(), Some(("hello", "hello")));
assert_eq!(iter.next().transpose().unwrap(), Some(("world", "world")));
assert_eq!(iter.next().transpose().unwrap(), None);
drop(iter);

wtxn.commit().unwrap();

let signal_event = env.prepare_for_closing();

eprintln!("waiting for the env to be closed");
signal_event.wait();
eprintln!("env closed successfully");

// Make sure we don't have a reference to the env
assert!(env_closing_event(dir.path()).is_none());
}

#[test]
fn reopen_env_with_different_options_is_err() {
let dir = tempfile::tempdir().unwrap();
let _env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.open(dir.path())
.unwrap()
};

let result = unsafe {
EnvOpenOptions::new()
.map_size(12 * 1024 * 1024) // 12MB
.open(dir.path())
};

assert!(matches!(result, Err(Error::EnvAlreadyOpened)));
}

#[test]
fn open_env_with_named_path() {
let dir = tempfile::tempdir().unwrap();
fs::create_dir_all(dir.path().join("babar.mdb")).unwrap();
let _env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.open(dir.path().join("babar.mdb"))
.unwrap()
};

let error = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.open(dir.path().join("babar.mdb"))
.unwrap_err()
};

assert!(matches!(error, Error::EnvAlreadyOpened));
}

#[test]
#[cfg(not(windows))]
fn open_database_with_writemap_flag() {
let dir = tempfile::tempdir().unwrap();
let mut envbuilder = EnvOpenOptions::new();
envbuilder.map_size(10 * 1024 * 1024); // 10MB
envbuilder.max_dbs(10);
unsafe { envbuilder.flags(crate::EnvFlags::WRITE_MAP) };
let env = unsafe { envbuilder.open(dir.path()).unwrap() };

let mut wtxn = env.write_txn().unwrap();
let _db = env.create_database::<Str, Str>(&mut wtxn, Some("my-super-db")).unwrap();
wtxn.commit().unwrap();
}

#[test]
fn open_database_with_nosubdir() {
let dir = tempfile::tempdir().unwrap();
let mut envbuilder = EnvOpenOptions::new();
unsafe { envbuilder.flags(crate::EnvFlags::NO_SUB_DIR) };
let _env = unsafe { envbuilder.open(dir.path().join("data.mdb")).unwrap() };
}

#[test]
fn create_database_without_commit() {
let dir = tempfile::tempdir().unwrap();
let env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.max_dbs(10)
.open(dir.path())
.unwrap()
};

let mut wtxn = env.write_txn().unwrap();
let _db = env.create_database::<Str, Str>(&mut wtxn, Some("my-super-db")).unwrap();
wtxn.abort();

let rtxn = env.read_txn().unwrap();
let option = env.open_database::<Str, Str>(&rtxn, Some("my-super-db")).unwrap();
assert!(option.is_none());
}

#[test]
fn open_already_existing_database() {
let dir = tempfile::tempdir().unwrap();
let env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.max_dbs(10)
.open(dir.path())
.unwrap()
};

// we first create a database
let mut wtxn = env.write_txn().unwrap();
let _db = env.create_database::<Str, Str>(&mut wtxn, Some("my-super-db")).unwrap();
wtxn.commit().unwrap();

// Close the environement and reopen it, databases must not be loaded in memory.
env.prepare_for_closing().wait();
let env = unsafe {
EnvOpenOptions::new()
.map_size(10 * 1024 * 1024) // 10MB
.max_dbs(10)
.open(dir.path())
.unwrap()
};

let rtxn = env.read_txn().unwrap();
let option = env.open_database::<Str, Str>(&rtxn, Some("my-super-db")).unwrap();
assert!(option.is_some());
}

#[test]
fn resize_database() {
let dir = tempfile::tempdir().unwrap();
let page_size = page_size::get();
let env = unsafe {
EnvOpenOptions::new().map_size(9 * page_size).max_dbs(1).open(dir.path()).unwrap()
};

let mut wtxn = env.write_txn().unwrap();
let db = env.create_database::<Str, Str>(&mut wtxn, Some("my-super-db")).unwrap();
wtxn.commit().unwrap();

let mut wtxn = env.write_txn().unwrap();
for i in 0..64 {
db.put(&mut wtxn, &i.to_string(), "world").unwrap();
}
wtxn.commit().unwrap();

let mut wtxn = env.write_txn().unwrap();
for i in 64..128 {
db.put(&mut wtxn, &i.to_string(), "world").unwrap();
}
wtxn.commit().expect_err("cannot commit a transaction that would reach the map size limit");

unsafe {
env.resize(10 * page_size).unwrap();
}
let mut wtxn = env.write_txn().unwrap();
for i in 64..128 {
db.put(&mut wtxn, &i.to_string(), "world").unwrap();
}
wtxn.commit().expect("transaction should commit after resizing the map size");

assert_eq!(10 * page_size, env.info().map_size);
}

/// Non-regression test for
/// <https://github.com/meilisearch/heed/issues/183>
///
/// We should be able to open database Read-Only Env with
/// no prior Read-Write Env opening. And query data.
#[test]
fn open_read_only_without_no_env_opened_before() {
let expected_data0 = "Data Expected db0";
let dir = tempfile::tempdir().unwrap();

{
// We really need this env to be dropped before the read-only access.
let env = unsafe {
EnvOpenOptions::new()
.map_size(16 * 1024 * 1024 * 1024) // 10MB
.max_dbs(32)
.open(dir.path())
.unwrap()
};
let mut wtxn = env.write_txn().unwrap();
let database0 = env.create_database::<Str, Str>(&mut wtxn, Some("shared0")).unwrap();

wtxn.commit().unwrap();
let mut wtxn = env.write_txn().unwrap();
database0.put(&mut wtxn, "shared0", expected_data0).unwrap();
wtxn.commit().unwrap();
// We also really need that no other env reside in memory in other thread doing tests.
env.prepare_for_closing().wait();
}

{
// Open now we do a read-only opening
let env = unsafe {
EnvOpenOptions::new()
.map_size(16 * 1024 * 1024 * 1024) // 10MB
.max_dbs(32)
.open(dir.path())
.unwrap()
};
let database0 = {
let rtxn = env.read_txn().unwrap();
let database0 =
env.open_database::<Str, Str>(&rtxn, Some("shared0")).unwrap().unwrap();
// This commit is mandatory if not committed you might get
// Io(Os { code: 22, kind: InvalidInput, message: "Invalid argument" })
rtxn.commit().unwrap();
database0
};

{
// If we didn't committed the opening it might fail with EINVAL.
let rtxn = env.read_txn().unwrap();
let value = database0.get(&rtxn, "shared0").unwrap().unwrap();
assert_eq!(value, expected_data0);
}

env.prepare_for_closing().wait();
}

// To avoid reintroducing the bug let's try to open again but without the commit
{
// Open now we do a read-only opening
let env = unsafe {
EnvOpenOptions::new()
.map_size(16 * 1024 * 1024 * 1024) // 10MB
.max_dbs(32)
.open(dir.path())
.unwrap()
};
let database0 = {
let rtxn = env.read_txn().unwrap();
let database0 =
env.open_database::<Str, Str>(&rtxn, Some("shared0")).unwrap().unwrap();
// No commit it's important, dropping explicitly
drop(rtxn);
database0
};

{
// We didn't committed the opening we will get EINVAL.
let rtxn = env.read_txn().unwrap();
// The dbg!() is intentional in case of a change in rust-std or in lmdb related
// to the windows error.
let err = dbg!(database0.get(&rtxn, "shared0"));

// The error kind is still ErrorKind Uncategorized on windows.
// Behind it's a ERROR_BAD_COMMAND code 22 like EINVAL.
if cfg!(windows) {
assert!(err.is_err());
} else {
assert!(
matches!(err, Err(Error::Io(ref e)) if e.kind() == ErrorKind::InvalidInput)
);
}
}

env.prepare_for_closing().wait();
}
}

#[test]
fn max_key_size() {
let dir = tempfile::tempdir().unwrap();
let env = unsafe { EnvOpenOptions::new().open(dir.path().join(dir.path())).unwrap() };
let maxkeysize = env.max_key_size();

eprintln!("maxkeysize: {}", maxkeysize);

if cfg!(feature = "longer-keys") {
// Should be larger than the default of 511
assert!(maxkeysize > 511);
} else {
// Should be the default of 511
assert_eq!(maxkeysize, 511);
}
}
}
11 changes: 7 additions & 4 deletions heed/src/envs/env_open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use std::io::ErrorKind::NotFound;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{io, ptr};

#[cfg(master3)]
use aead::{generic_array::typenum::Unsigned, AeadCore, AeadMutInPlace, Key, KeyInit};
use synchronoise::SignalEvent;

#[cfg(master3)]
use super::encrypted_env::{encrypt_func_wrapper, EncryptedEnv};
Expand Down Expand Up @@ -297,7 +299,7 @@ impl EnvOpenOptions {
Ok(path) => path,
};

if lock.contains(&path) {
if lock.contains_key(&path) {
Err(Error::EnvAlreadyOpened)
} else {
let path_str = CString::new(path.as_os_str().as_bytes()).unwrap();
Expand Down Expand Up @@ -350,9 +352,10 @@ impl EnvOpenOptions {
match mdb_result(result) {
Ok(()) => {
let env_ptr = NonNull::new(env).unwrap();
let inserted = lock.insert(path.clone());
debug_assert!(inserted);
Ok(Env::new(env_ptr, path))
let signal_event = Arc::new(SignalEvent::manual(false));
let inserted = lock.insert(path.clone(), signal_event.clone());
debug_assert!(inserted.is_none());
Ok(Env::new(env_ptr, path, signal_event))
}
Err(e) => {
ffi::mdb_env_close(env);
Expand Down
Loading

0 comments on commit de53bec

Please sign in to comment.