use std::borrow::{Borrow, Cow};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::Infallible;
use std::ops::{self, Deref};
use std::sync::Arc;
use bonsaidb_core::arc_bytes::serde::CowBytes;
use bonsaidb_core::arc_bytes::ArcBytes;
use bonsaidb_core::connection::{
self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
SerializedQueryKey, Session, Sort, StorageConnection,
};
#[cfg(any(feature = "encryption", feature = "compression"))]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
use bonsaidb_core::limits::{
LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
};
use bonsaidb_core::permissions::bonsai::{
collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
ViewAction,
};
use bonsaidb_core::permissions::Permissions;
use bonsaidb_core::schema::view::map::MappedSerializedValue;
use bonsaidb_core::schema::view::{self};
use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
use bonsaidb_core::transaction::{
self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
Transaction,
};
use itertools::Itertools;
use nebari::io::any::AnyFile;
use nebari::tree::{
AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
Unversioned, Versioned,
};
use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use watchable::Watchable;
use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
use crate::database::keyvalue::BackgroundWorkerProcessTarget;
use crate::error::Error;
use crate::open_trees::OpenTrees;
use crate::storage::StorageLock;
#[cfg(feature = "encryption")]
use crate::storage::TreeVault;
use crate::views::{
mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
ViewEntry,
};
use crate::Storage;
pub mod keyvalue;
pub(crate) mod compat;
pub mod pubsub;
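/// A local BonsaiDB database, stored using Nebari.
///
/// Cloning is cheap: clones share the same underlying state through
/// reference counting.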
#[derive(Debug, Clone)]
pub struct Database {
pub(crate) data: Arc<Data>,
pub(crate) storage: Storage,
}
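/// The shared state behind one or more [`Database`] handles.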
#[derive(Debug)]
pub struct Data {
pub name: Arc<Cow<'static, str>>,
context: Context,
pub(crate) schema: Arc<Schematic>,
}
impl Database {
pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
name: S,
context: Context,
storage: &Storage,
) -> Result<Self, Error> {
let name = name.into();
let schema = Arc::new(DB::schematic()?);
let db = Self {
storage: storage.clone(),
data: Arc::new(Data {
name: Arc::new(name),
context,
schema,
}),
};
if storage.instance.check_view_integrity_on_database_open() {
for view in db.data.schema.views() {
storage.instance.tasks().spawn_integrity_check(view, &db);
}
}
storage
.instance
.tasks()
.spawn_key_value_expiration_loader(&db);
Ok(db)
}
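    /// Returns a copy of this database whose permission checks use
    /// `effective_permissions`, or `None` if the underlying storage cannot
    /// apply the restriction.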
#[must_use]
pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
self.storage
.with_effective_permissions(effective_permissions)
.map(|storage| Self {
storage,
data: self.data.clone(),
})
}
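    /// Opens a local database named "default" using `configuration`, creating
    /// it if it does not already exist.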
pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
let storage = Storage::open(configuration.with_schema::<DB>()?)?;
Ok(storage.create_database::<DB>("default", true)?)
}
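    /// Returns the [`Schematic`] for the schema this database was opened with.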
#[must_use]
pub fn schematic(&self) -> &'_ Schematic {
&self.data.schema
}
pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
&self.data.context.roots
}
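    /// Invokes `callback` for each entry in `view` that matches `key`,
    /// honoring `order` and `limit`, and updating the view before or after
    /// iteration as dictated by `access_policy`.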
fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
&self,
view: &dyn view::Serialized,
key: Option<SerializedQueryKey>,
order: Sort,
limit: Option<u32>,
access_policy: AccessPolicy,
mut callback: F,
) -> Result<(), bonsaidb_core::Error> {
if matches!(access_policy, AccessPolicy::UpdateBefore) {
self.storage
.instance
.tasks()
.update_view_if_needed(view, self, true)?;
} else if let Some(integrity_check) = self
.storage
.instance
.tasks()
.spawn_integrity_check(view, self)
{
integrity_check
.receive()
.map_err(Error::from)?
.map_err(Error::from)?;
}
let view_entries = self
.roots()
.tree(self.collection_tree(
&view.collection(),
view_entries_tree_name(&view.view_name()),
)?)
.map_err(Error::from)?;
{
for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
callback(entry)?;
}
}
if matches!(access_policy, AccessPolicy::UpdateAfter) {
let db = self.clone();
let view_name = view.view_name();
let view = db
.data
.schema
.view_by_name(&view_name)
.expect("query made with view that isn't registered with this database");
db.storage
.instance
.tasks()
.update_view_if_needed(view, &db, false)?;
}
Ok(())
}
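    /// Collects the set of trees that must be opened to apply `transaction`,
    /// configuring per-collection encryption when it is in use.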
fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
let mut open_trees = OpenTrees::default();
for op in &transaction.operations {
if self
.data
.schema
.collection_primary_key_description(&op.collection)
.is_none()
{
return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
}
#[cfg(any(feature = "encryption", feature = "compression"))]
let vault = if let Some(encryption_key) =
self.collection_encryption_key(&op.collection).cloned()
{
#[cfg(feature = "encryption")]
if let Some(mut vault) = self.storage().tree_vault().cloned() {
vault.key = Some(encryption_key);
Some(vault)
} else {
TreeVault::new_if_needed(
Some(encryption_key),
self.storage().vault(),
#[cfg(feature = "compression")]
None,
)
}
#[cfg(not(feature = "encryption"))]
{
drop(encryption_key);
return Err(Error::EncryptionDisabled);
}
} else {
self.storage().tree_vault().cloned()
};
open_trees.open_trees_for_document_change(
&op.collection,
&self.data.schema,
#[cfg(any(feature = "encryption", feature = "compression"))]
vault,
);
}
Ok(open_trees)
}
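    /// Executes every operation in `transaction` inside a single Nebari
    /// transaction, records the changed documents in the transaction log, and
    /// commits the result.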
fn apply_transaction_to_roots(
&self,
transaction: &Transaction,
) -> Result<Vec<OperationResult>, Error> {
let open_trees = self.open_trees_for_transaction(transaction)?;
let mut roots_transaction = self
.data
.context
.roots
.transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
let mut results = Vec::new();
let mut changed_documents = Vec::new();
let mut collection_indexes = HashMap::new();
let mut collections = Vec::new();
for op in &transaction.operations {
let result = self.execute_operation(
op,
&mut roots_transaction,
&open_trees.trees_index_by_name,
)?;
if let Some((collection, id, deleted)) = match &result {
OperationResult::DocumentUpdated { header, collection } => {
Some((collection, header.id.clone(), false))
}
OperationResult::DocumentDeleted { id, collection } => {
Some((collection, id.clone(), true))
}
OperationResult::Success => None,
} {
let collection = match collection_indexes.get(collection) {
Some(index) => *index,
None => {
if let Ok(id) = u16::try_from(collections.len()) {
collection_indexes.insert(collection.clone(), id);
collections.push(collection.clone());
id
} else {
return Err(Error::TransactionTooLarge);
}
}
};
changed_documents.push(ChangedDocument {
collection,
id,
deleted,
});
}
results.push(result);
}
self.invalidate_changed_documents(
&mut roots_transaction,
&open_trees,
&collections,
&changed_documents,
)?;
roots_transaction
.entry_mut()
.set_data(compat::serialize_executed_transaction_changes(
&Changes::Documents(DocumentChanges {
collections,
documents: changed_documents,
}),
)?)?;
roots_transaction.commit()?;
Ok(results)
}
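    /// Records each changed document in the invalidated-documents tree of
    /// every lazy (non-eager) view in the affected collections, so those
    /// views remap them the next time they are updated.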
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn invalidate_changed_documents(
&self,
roots_transaction: &mut ExecutingTransaction<AnyFile>,
open_trees: &OpenTrees,
collections: &[CollectionName],
changed_documents: &[ChangedDocument],
) -> Result<(), Error> {
for (collection, changed_documents) in &changed_documents
.iter()
.group_by(|doc| &collections[usize::from(doc.collection)])
{
let mut views = self
.data
.schema
.views_in_collection(collection)
.filter(|view| !view.update_policy().is_eager())
.peekable();
if views.peek().is_some() {
let changed_documents = changed_documents.collect::<Vec<_>>();
for view in views {
let view_name = view.view_name();
let tree_name = view_invalidated_docs_tree_name(&view_name);
for changed_document in &changed_documents {
let mut invalidated_docs = roots_transaction
.tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
.unwrap();
invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
}
}
}
}
Ok(())
}
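    /// Dispatches a single [`Operation`] to the matching insert, update,
    /// overwrite, delete, or check handler.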
fn execute_operation(
&self,
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
) -> Result<OperationResult, Error> {
match &operation.command {
Command::Insert { id, contents } => {
self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
}
Command::Update { header, contents } => self.execute_update(
operation,
transaction,
tree_index_map,
&header.id,
Some(&header.revision),
contents,
),
Command::Overwrite { id, contents } => {
self.execute_update(operation, transaction, tree_index_map, id, None, contents)
}
Command::Delete { header } => {
self.execute_delete(operation, transaction, tree_index_map, header)
}
Command::Check { id, revision } => Self::execute_check(
operation,
transaction,
tree_index_map,
id.clone(),
*revision,
),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
level = "trace",
skip(self, operation, transaction, tree_index_map, contents),
fields(
database = self.name(),
collection.name = operation.collection.name.as_ref(),
collection.authority = operation.collection.authority.as_ref()
)
)
)]
fn execute_update(
&self,
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
id: &DocumentId,
check_revision: Option<&Revision>,
contents: &[u8],
) -> Result<OperationResult, crate::Error> {
let mut documents = transaction
.tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
.unwrap();
let document_id = ArcBytes::from(id.to_vec());
let mut result = None;
let mut updated = false;
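        // Perform the update through a compare-and-swap so the revision check
        // and the write happen atomically within the documents tree.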
documents.modify(
vec![document_id.clone()],
nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
value: Option<
ArcBytes<'_>,
>| {
if let Some(old) = value {
let doc = match deserialize_document(&old) {
Ok(doc) => doc,
Err(err) => {
result = Some(Err(err));
return nebari::tree::KeyOperation::Skip;
}
};
if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
if let Some(updated_revision) = doc.header.revision.next_revision(contents)
{
let updated_header = Header {
id: id.clone(),
revision: updated_revision,
};
let serialized_doc = match serialize_document(&BorrowedDocument {
header: updated_header.clone(),
contents: CowBytes::from(contents),
}) {
Ok(bytes) => bytes,
Err(err) => {
result = Some(Err(Error::from(err)));
return nebari::tree::KeyOperation::Skip;
}
};
result = Some(Ok(OperationResult::DocumentUpdated {
collection: operation.collection.clone(),
header: updated_header,
}));
updated = true;
return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
}
result = Some(Ok(OperationResult::DocumentUpdated {
collection: operation.collection.clone(),
header: doc.header,
}));
} else {
result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
operation.collection.clone(),
Box::new(doc.header),
))));
}
} else if check_revision.is_none() {
let doc = BorrowedDocument::new(id.clone(), contents);
match serialize_document(&doc).map(|bytes| (doc, bytes)) {
Ok((doc, serialized)) => {
result = Some(Ok(OperationResult::DocumentUpdated {
collection: operation.collection.clone(),
header: doc.header,
}));
updated = true;
return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
}
Err(err) => {
result = Some(Err(Error::from(err)));
}
}
} else {
result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
operation.collection.clone(),
Box::new(id.clone()),
))));
}
nebari::tree::KeyOperation::Skip
})),
)?;
drop(documents);
if updated {
self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
}
result.expect("nebari should invoke the callback even when the key isn't found")
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
level = "trace",
skip(self, operation, transaction, tree_index_map, contents),
fields(
database = self.name(),
collection.name = operation.collection.name.as_ref(),
collection.authority = operation.collection.authority.as_ref()
)
)
)]
fn execute_insert(
&self,
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
id: Option<DocumentId>,
contents: &[u8],
) -> Result<OperationResult, Error> {
let mut documents = transaction
.tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
.unwrap();
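        // Use the caller-provided id, or ask the schema for the next id based
        // on the collection's current highest key.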
let id = if let Some(id) = id {
id
} else if let Some(last_key) = documents.last_key()? {
let id = DocumentId::try_from(last_key.as_slice())?;
self.data
.schema
.next_id_for_collection(&operation.collection, Some(id))?
} else {
self.data
.schema
.next_id_for_collection(&operation.collection, None)?
};
let doc = BorrowedDocument::new(id, contents);
let serialized: Vec<u8> = serialize_document(&doc)?;
let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
if let Some(document) = documents.replace(document_id.clone(), serialized)? {
let doc = deserialize_document(&document)?;
Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
operation.collection.clone(),
Box::new(doc.header),
)))
} else {
drop(documents);
self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
Ok(OperationResult::DocumentUpdated {
collection: operation.collection.clone(),
header: doc.header,
})
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, operation, transaction, tree_index_map),
fields(
database = self.name(),
collection.name = operation.collection.name.as_ref(),
collection.authority = operation.collection.authority.as_ref()
)
))]
fn execute_delete(
&self,
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
header: &Header,
) -> Result<OperationResult, Error> {
let mut documents = transaction
.tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
.unwrap();
if let Some(vec) = documents.remove(header.id.as_ref())? {
drop(documents);
let doc = deserialize_document(&vec)?;
if &doc.header == header {
self.update_eager_views(
&ArcBytes::from(doc.header.id.to_vec()),
operation,
transaction,
tree_index_map,
)?;
Ok(OperationResult::DocumentDeleted {
collection: operation.collection.clone(),
id: header.id.clone(),
})
} else {
Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
operation.collection.clone(),
Box::new(header.clone()),
)))
}
} else {
Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
operation.collection.clone(),
Box::new(header.id.clone()),
)))
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, operation, transaction, tree_index_map),
fields(
database = self.name(),
collection.name = operation.collection.name.as_ref(),
collection.authority = operation.collection.authority.as_ref()
)
))]
fn update_eager_views(
&self,
document_id: &ArcBytes<'static>,
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
) -> Result<(), Error> {
let mut eager_views = self
.data
.schema
.eager_views_in_collection(&operation.collection)
.peekable();
if eager_views.peek().is_some() {
let documents = transaction
.unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
.unwrap();
for view in eager_views {
let name = view.view_name();
let document_map = transaction
.unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
.unwrap();
let view_entries = transaction
.unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
.unwrap();
mapper::DocumentRequest {
database: self,
document_ids: vec![document_id.clone()],
map_request: &mapper::Map {
database: self.data.name.clone(),
collection: operation.collection.clone(),
view_name: name.clone(),
},
document_map,
documents,
view_entries,
view,
}
.map()?;
}
}
Ok(())
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(operation, transaction, tree_index_map),
fields(
collection.name = operation.collection.name.as_ref(),
collection.authority = operation.collection.authority.as_ref(),
),
))]
fn execute_check(
operation: &Operation,
transaction: &mut ExecutingTransaction<AnyFile>,
tree_index_map: &HashMap<String, usize>,
id: DocumentId,
revision: Option<Revision>,
) -> Result<OperationResult, Error> {
let mut documents = transaction
.tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
.unwrap();
if let Some(vec) = documents.get(id.as_ref())? {
drop(documents);
if let Some(revision) = revision {
let doc = deserialize_document(&vec)?;
if doc.header.revision != revision {
return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
operation.collection.clone(),
Box::new(Header { id, revision }),
)));
}
}
Ok(OperationResult::Success)
} else {
Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
operation.collection.clone(),
Box::new(id),
)))
}
}
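    /// Loads the view entries matching `key` (or all entries) in `order`, up
    /// to `limit`, deserializing each stored value into a [`ViewEntry`].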
fn create_view_iterator(
view_entries: &Tree<Unversioned, AnyFile>,
key: Option<SerializedQueryKey>,
order: Sort,
limit: Option<u32>,
) -> Result<Vec<ViewEntry>, Error> {
let mut values = Vec::new();
let forwards = match order {
Sort::Ascending => true,
Sort::Descending => false,
};
let mut values_read = 0;
if let Some(key) = key {
match key {
SerializedQueryKey::Range(range) => {
view_entries.scan::<Infallible, _, _, _, _>(
&range.map_ref(|bytes| &bytes[..]),
forwards,
|_, _, _| ScanEvaluation::ReadData,
|_, _| {
if let Some(limit) = limit {
if values_read >= limit {
return ScanEvaluation::Stop;
}
values_read += 1;
}
ScanEvaluation::ReadData
},
|_key, _index, value| {
values.push(value);
Ok(())
},
)?;
}
SerializedQueryKey::Matches(key) => {
values.extend(view_entries.get(&key)?);
}
SerializedQueryKey::Multiple(mut list) => {
list.sort();
values.extend(
view_entries
.get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
.into_iter()
.map(|(_, value)| value),
);
}
}
} else {
view_entries.scan::<Infallible, _, _, _, _>(
&(..),
forwards,
|_, _, _| ScanEvaluation::ReadData,
|_, _| {
if let Some(limit) = limit {
if values_read >= limit {
return ScanEvaluation::Stop;
}
values_read += 1;
}
ScanEvaluation::ReadData
},
|_, _, value| {
values.push(value);
Ok(())
},
)?;
}
values
.into_iter()
.map(|value| bincode::deserialize(&value).map_err(Error::from))
.collect::<Result<Vec<_>, Error>>()
}
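    /// Returns the encryption key configured for `collection`, falling back
    /// to the storage-wide default key.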
#[cfg(any(feature = "encryption", feature = "compression"))]
pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
self.schematic()
.encryption_key_for_collection(collection)
.or_else(|| self.storage.default_encryption_key())
}
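    /// Builds a [`TreeRoot`] for `name`, attaching the collection's
    /// encryption and compression settings when those features are enabled.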
#[cfg_attr(
not(feature = "encryption"),
allow(
unused_mut,
unused_variables,
clippy::unused_self,
clippy::let_and_return
)
)]
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
&self,
collection: &CollectionName,
name: S,
) -> Result<TreeRoot<R, AnyFile>, Error> {
let mut tree = R::tree(name);
#[cfg(any(feature = "encryption", feature = "compression"))]
match (
self.collection_encryption_key(collection),
self.storage().tree_vault().cloned(),
) {
(Some(override_key), Some(mut vault)) => {
#[cfg(feature = "encryption")]
{
vault.key = Some(override_key.clone());
tree = tree.with_vault(vault);
}
#[cfg(not(feature = "encryption"))]
{
return Err(Error::EncryptionDisabled);
}
}
(None, Some(vault)) => {
tree = tree.with_vault(vault);
}
(key, None) => {
#[cfg(feature = "encryption")]
if let Some(vault) = TreeVault::new_if_needed(
key.cloned(),
self.storage().vault(),
#[cfg(feature = "compression")]
None,
) {
tree = tree.with_vault(vault);
}
#[cfg(not(feature = "encryption"))]
if key.is_some() {
return Err(Error::EncryptionDisabled);
}
}
}
Ok(tree)
}
pub(crate) fn update_key_expiration<'key>(
&self,
tree_key: impl Into<Cow<'key, str>>,
expiration: Option<Timestamp>,
) {
self.data
.context
.update_key_expiration(tree_key, expiration);
}
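    /// Converts this database into a [`crate::AsyncDatabase`] using the
    /// current Tokio runtime. Must be called from within a Tokio runtime.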
#[cfg(feature = "async")]
#[must_use]
pub fn into_async(self) -> crate::AsyncDatabase {
self.into_async_with_runtime(tokio::runtime::Handle::current())
}
#[cfg(feature = "async")]
#[must_use]
pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
crate::AsyncDatabase {
database: self,
runtime: Arc::new(runtime),
}
}
#[cfg(feature = "async")]
#[must_use]
pub fn to_async(&self) -> crate::AsyncDatabase {
self.clone().into_async()
}
#[cfg(feature = "async")]
#[must_use]
pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
self.clone().into_async_with_runtime(runtime)
}
}
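/// The document layout used before [`DocumentId`] replaced `u64` ids, kept so
/// existing data remains readable.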
#[derive(Serialize, Deserialize)]
struct LegacyHeader {
id: u64,
revision: Revision,
}
#[derive(Serialize, Deserialize)]
struct LegacyDocument<'a> {
header: LegacyHeader,
#[serde(borrow)]
contents: &'a [u8],
}
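/// Deserializes a document, first trying the current Pot encoding and then
/// falling back to the legacy bincode layout.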
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
Ok(document) => Ok(document),
Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
Ok(legacy_doc) => Ok(BorrowedDocument {
header: Header {
id: DocumentId::from_u64(legacy_doc.header.id),
revision: legacy_doc.header.revision,
},
contents: CowBytes::from(legacy_doc.contents),
}),
Err(_) => Err(Error::from(err)),
},
}
}
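/// Serializes a document using Pot for storage.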
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
pot::to_vec(document)
.map_err(Error::from)
.map_err(bonsaidb_core::Error::from)
}
impl HasSession for Database {
fn session(&self) -> Option<&Session> {
self.storage.session()
}
}
impl Connection for Database {
type Storage = Storage;
fn storage(&self) -> Self::Storage {
self.storage.clone()
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self),
fields(
database = self.name(),
)
))]
fn list_executed_transactions(
&self,
starting_id: Option<u64>,
result_limit: Option<u32>,
) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
self.check_permission(
database_resource_name(self.name()),
&BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
)?;
let result_limit = usize::try_from(
result_limit
.unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
.min(LIST_TRANSACTIONS_MAX_RESULTS),
)
.unwrap();
if result_limit > 0 {
let range = if let Some(starting_id) = starting_id {
Range::from(starting_id..)
} else {
Range::from(..)
};
let mut entries = Vec::new();
self.roots()
.transactions()
.scan(range, |entry| {
if entry.data().is_some() {
entries.push(entry);
}
entries.len() < result_limit
})
.map_err(Error::from)?;
entries
.into_iter()
.map(|entry| {
if let Some(data) = entry.data() {
let changes = compat::deserialize_executed_transaction_changes(data)?;
Ok(Some(transaction::Executed {
id: entry.id,
changes,
}))
} else {
Ok(None)
}
})
.filter_map(Result::transpose)
.collect::<Result<Vec<_>, Error>>()
.map_err(bonsaidb_core::Error::from)
} else {
Ok(Vec::default())
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self),
fields(
database = self.name(),
)
))]
fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
self.check_permission(
database_resource_name(self.name()),
&BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
)?;
Ok(self.roots().transactions().current_transaction_id())
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self),
fields(
database = self.name(),
)
))]
fn compact(&self) -> Result<(), bonsaidb_core::Error> {
self.check_permission(
database_resource_name(self.name()),
&BonsaiAction::Database(DatabaseAction::Compact),
)?;
self.storage()
.instance
.tasks()
.compact_database(self.clone())?;
Ok(())
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self),
fields(
database = self.name(),
)
))]
fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
self.check_permission(
kv_resource_name(self.name()),
&BonsaiAction::Database(DatabaseAction::Compact),
)?;
self.storage()
.instance
.tasks()
.compact_key_value_store(self.clone())?;
Ok(())
}
}
impl LowLevelConnection for Database {
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, transaction),
fields(
database = self.name(),
)
))]
fn apply_transaction(
&self,
transaction: Transaction,
) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
for op in &transaction.operations {
let (resource, action) = match &op.command {
Command::Insert { .. } => (
collection_resource_name(self.name(), &op.collection),
BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
),
Command::Update { header, .. } => (
document_resource_name(self.name(), &op.collection, &header.id),
BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
),
Command::Overwrite { id, .. } => (
document_resource_name(self.name(), &op.collection, id),
BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
),
Command::Delete { header } => (
document_resource_name(self.name(), &op.collection, &header.id),
BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
),
Command::Check { id, .. } => (
document_resource_name(self.name(), &op.collection, id),
BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
),
};
self.check_permission(resource, &action)?;
}
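        // Bring eager views for every affected collection up to date before
        // applying the transaction, so their maps stay consistent with it.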
let mut eager_view_tasks = Vec::new();
for collection_name in transaction
.operations
.iter()
.map(|op| &op.collection)
.collect::<HashSet<_>>()
{
for view in self.data.schema.eager_views_in_collection(collection_name) {
if let Some(task) = self
.storage
.instance
.tasks()
.spawn_integrity_check(view, self)
{
eager_view_tasks.push(task);
}
}
}
let mut eager_view_mapping_tasks = Vec::new();
for task in eager_view_tasks {
if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
eager_view_mapping_tasks.push(spawned_task);
}
}
for task in eager_view_mapping_tasks {
let mut task = task.lock();
if let Some(task) = task.take() {
task.receive().map_err(Error::from)?.map_err(Error::from)?;
}
}
self.apply_transaction_to_roots(&transaction)
.map_err(bonsaidb_core::Error::from)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn get_from_collection(
&self,
id: DocumentId,
collection: &CollectionName,
) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
self.check_permission(
document_resource_name(self.name(), collection, &id),
&BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
)?;
let tree = self
.data
.context
.roots
.tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
.map_err(Error::from)?;
if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
Ok(Some(deserialize_document(&vec)?.into_owned()))
} else {
Ok(None)
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn list_from_collection(
&self,
ids: Range<DocumentId>,
sort: Sort,
limit: Option<u32>,
collection: &CollectionName,
) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
self.check_permission(
collection_resource_name(self.name(), collection),
&BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
)?;
let tree = self
.data
.context
.roots
.tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
.map_err(Error::from)?;
let mut found_docs = Vec::new();
let mut keys_read = 0;
let ids = DocumentIdRange(ids);
tree.scan(
&ids.borrow_as_bytes(),
match sort {
Sort::Ascending => true,
Sort::Descending => false,
},
|_, _, _| ScanEvaluation::ReadData,
|_, _| {
if let Some(limit) = limit {
if keys_read >= limit {
return ScanEvaluation::Stop;
}
keys_read += 1;
}
ScanEvaluation::ReadData
},
|_, _, doc| {
found_docs.push(
deserialize_document(&doc)
.map(BorrowedDocument::into_owned)
.map_err(AbortError::Other)?,
);
Ok(())
},
)
.map_err(|err| match err {
AbortError::Other(err) => err,
AbortError::Nebari(err) => crate::Error::from(err),
})?;
Ok(found_docs)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn list_headers_from_collection(
&self,
ids: Range<DocumentId>,
sort: Sort,
limit: Option<u32>,
collection: &CollectionName,
) -> Result<Vec<Header>, bonsaidb_core::Error> {
self.check_permission(
collection_resource_name(self.name(), collection),
&BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
)?;
let tree = self
.data
.context
.roots
.tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
.map_err(Error::from)?;
let mut found_headers = Vec::new();
let mut keys_read = 0;
let ids = DocumentIdRange(ids);
tree.scan(
&ids.borrow_as_bytes(),
match sort {
Sort::Ascending => true,
Sort::Descending => false,
},
|_, _, _| ScanEvaluation::ReadData,
|_, _| {
if let Some(limit) = limit {
if keys_read >= limit {
return ScanEvaluation::Stop;
}
keys_read += 1;
}
ScanEvaluation::ReadData
},
|_, _, doc| {
found_headers.push(
deserialize_document(&doc)
.map(|doc| doc.header)
.map_err(AbortError::Other)?,
);
Ok(())
},
)
.map_err(|err| match err {
AbortError::Other(err) => err,
AbortError::Nebari(err) => crate::Error::from(err),
})?;
Ok(found_headers)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn count_from_collection(
&self,
ids: Range<DocumentId>,
collection: &CollectionName,
) -> Result<u64, bonsaidb_core::Error> {
self.check_permission(
collection_resource_name(self.name(), collection),
&BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
)?;
let tree = self
.data
.context
.roots
.tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
.map_err(Error::from)?;
let ids = DocumentIdRange(ids);
let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;
Ok(stats.alive_keys)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn get_multiple_from_collection(
&self,
ids: &[DocumentId],
collection: &CollectionName,
) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
for id in ids {
self.check_permission(
document_resource_name(self.name(), collection, id),
&BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
)?;
}
let mut ids = ids.to_vec();
let collection = collection.clone();
let tree = self
.data
.context
.roots
.tree(
self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
)
.map_err(Error::from)?;
ids.sort();
let keys_and_values = tree
.get_multiple(ids.iter().map(|id| id.as_ref()))
.map_err(Error::from)?;
keys_and_values
.into_iter()
.map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
.collect::<Result<Vec<_>, Error>>()
.map_err(bonsaidb_core::Error::from)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, collection),
fields(
database = self.name(),
collection.name = collection.name.as_ref(),
collection.authority = collection.authority.as_ref(),
)
))]
fn compact_collection_by_name(
&self,
collection: CollectionName,
) -> Result<(), bonsaidb_core::Error> {
self.check_permission(
collection_resource_name(self.name(), &collection),
&BonsaiAction::Database(DatabaseAction::Compact),
)?;
self.storage()
.instance
.tasks()
.compact_collection(self.clone(), collection)?;
Ok(())
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, view),
fields(
database = self.name(),
view.collection.name = view.collection.name.as_ref(),
view.collection.authority = view.collection.authority.as_ref(),
view.name = view.name.as_ref(),
)
))]
fn query_by_name(
&self,
view: &ViewName,
key: Option<SerializedQueryKey>,
order: Sort,
limit: Option<u32>,
access_policy: AccessPolicy,
) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
let view = self.schematic().view_by_name(view)?;
self.check_permission(
view_resource_name(self.name(), &view.view_name()),
&BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
)?;
let mut results = Vec::new();
self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
for mapping in entry.mappings {
results.push(bonsaidb_core::schema::view::map::Serialized {
source: mapping.source,
key: entry.key.clone(),
value: mapping.value,
});
}
Ok(())
})?;
Ok(results)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, view),
fields(
database = self.name(),
view.collection.name = view.collection.name.as_ref(),
view.collection.authority = view.collection.authority.as_ref(),
view.name = view.name.as_ref(),
)
))]
fn query_by_name_with_docs(
&self,
view: &ViewName,
key: Option<SerializedQueryKey>,
order: Sort,
limit: Option<u32>,
access_policy: AccessPolicy,
) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
let results = self.query_by_name(view, key, order, limit, access_policy)?;
        let view = self.schematic().view_by_name(view).unwrap();
        let documents = self
.get_multiple_from_collection(
&results
.iter()
.map(|m| m.source.id.clone())
.collect::<Vec<_>>(),
&view.collection(),
)?
.into_iter()
.map(|doc| (doc.header.id.clone(), doc))
.collect::<BTreeMap<_, _>>();
Ok(
bonsaidb_core::schema::view::map::MappedSerializedDocuments {
mappings: results,
documents,
},
)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, view_name),
fields(
database = self.name(),
view.collection.name = view_name.collection.name.as_ref(),
view.collection.authority = view_name.collection.authority.as_ref(),
view.name = view_name.name.as_ref(),
)
))]
fn reduce_by_name(
&self,
view_name: &ViewName,
key: Option<SerializedQueryKey>,
access_policy: AccessPolicy,
) -> Result<Vec<u8>, bonsaidb_core::Error> {
let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;
let result = if mappings.len() == 1 {
mappings.pop().unwrap().value.into_vec()
} else {
let view = self.data.schema.view_by_name(view_name)?;
view.reduce(
&mappings
.iter()
.map(|map| (map.key.as_ref(), map.value.as_ref()))
.collect::<Vec<_>>(),
true,
)
.map_err(Error::from)?
};
Ok(result)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, view_name),
fields(
database = self.name(),
view.collection.name = view_name.collection.name.as_ref(),
view.collection.authority = view_name.collection.authority.as_ref(),
view.name = view_name.name.as_ref(),
)
))]
fn reduce_grouped_by_name(
&self,
view_name: &ViewName,
key: Option<SerializedQueryKey>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
let view = self.data.schema.view_by_name(view_name)?;
self.check_permission(
view_resource_name(self.name(), &view.view_name()),
&BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
)?;
let mut mappings = Vec::new();
self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
mappings.push(MappedSerializedValue {
key: entry.key,
value: entry.reduced_value,
});
Ok(())
})?;
Ok(mappings)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, view),
fields(
database = self.name(),
view.collection.name = view.collection.name.as_ref(),
view.collection.authority = view.collection.authority.as_ref(),
view.name = view.name.as_ref(),
)
))]
fn delete_docs_by_name(
&self,
view: &ViewName,
key: Option<SerializedQueryKey>,
access_policy: AccessPolicy,
) -> Result<u64, bonsaidb_core::Error> {
let view = self.data.schema.view_by_name(view)?;
let collection = view.collection();
let mut transaction = Transaction::default();
self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
for mapping in entry.mappings {
transaction.push(Operation::delete(collection.clone(), mapping.source));
}
Ok(())
})?;
let results = LowLevelConnection::apply_transaction(self, transaction)?;
Ok(results.len() as u64)
}
}
impl HasSchema for Database {
fn schematic(&self) -> &Schematic {
&self.data.schema
}
}
type ViewIterator<'a> =
Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;
struct ViewEntryCollectionIterator<'a> {
iterator: ViewIterator<'a>,
}
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
type Item = Result<ViewEntry, crate::Error>;
fn next(&mut self) -> Option<Self::Item> {
self.iterator.next().map(|item| {
item.map_err(crate::Error::from)
.and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
})
}
}
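/// Shared runtime state for a database: the Nebari [`Roots`] plus the
/// key-value store's in-memory state, which is persisted by a background
/// worker thread.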
#[derive(Debug, Clone)]
pub(crate) struct Context {
data: Arc<ContextData>,
}
impl Deref for Context {
type Target = ContextData;
fn deref(&self) -> &Self::Target {
&self.data
}
}
#[derive(Debug)]
pub(crate) struct ContextData {
pub(crate) roots: Roots<AnyFile>,
key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}
impl Borrow<Roots<AnyFile>> for Context {
fn borrow(&self) -> &Roots<AnyFile> {
&self.data.roots
}
}
impl Context {
pub(crate) fn new(
roots: Roots<AnyFile>,
key_value_persistence: KeyValuePersistence,
storage_lock: Option<StorageLock>,
) -> Self {
let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
let mut background_worker_target_watcher = background_worker_target.watch();
let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
key_value_persistence,
roots.clone(),
background_worker_target,
)));
let background_worker_state = Arc::downgrade(&key_value_state);
let context = Self {
data: Arc::new(ContextData {
roots,
key_value_state,
}),
};
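        // Spawn the background thread that persists key-value changes and
        // processes key expirations according to `key_value_persistence`.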
std::thread::Builder::new()
.name(String::from("keyvalue-worker"))
.spawn(move || {
keyvalue::background_worker(
&background_worker_state,
&mut background_worker_target_watcher,
storage_lock,
);
})
.unwrap();
context
}
pub(crate) fn perform_kv_operation(
&self,
op: KeyOperation,
) -> Result<Output, bonsaidb_core::Error> {
let mut state = self.data.key_value_state.lock();
state.perform_kv_operation(op, &self.data.key_value_state)
}
pub(crate) fn update_key_expiration<'key>(
&self,
tree_key: impl Into<Cow<'key, str>>,
expiration: Option<Timestamp>,
) {
let mut state = self.data.key_value_state.lock();
state.update_key_expiration(tree_key, expiration);
}
#[cfg(test)]
pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
let state = self.data.key_value_state.lock();
state.persistence_watcher()
}
}
impl Drop for ContextData {
fn drop(&mut self) {
if let Some(shutdown) = {
let mut state = self.key_value_state.lock();
state.shutdown(&self.key_value_state)
} {
let _: Result<_, _> = shutdown.recv();
}
}
}
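/// Returns the name of the Nebari tree that stores documents for `collection`.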
pub fn document_tree_name(collection: &CollectionName) -> String {
format!("collection.{collection:#}")
}
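/// Adapts a [`Range<DocumentId>`] into the borrowed byte range Nebari's scans
/// expect.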
pub struct DocumentIdRange(Range<DocumentId>);
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
BorrowedRange {
start: match &self.0.start {
connection::Bound::Unbounded => ops::Bound::Unbounded,
connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
},
end: match &self.0.end {
connection::Bound::Unbounded => ops::Bound::Unbounded,
connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
},
}
}
}
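/// Database accessors that never block or perform I/O.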
pub trait DatabaseNonBlocking {
#[must_use]
fn name(&self) -> &str;
}
impl DatabaseNonBlocking for Database {
fn name(&self) -> &str {
self.data.name.as_ref()
}
}