use std::borrow::Cow;
use std::collections::hash_map::RandomState;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;
use bonsaidb_core::arc_bytes::serde::Bytes;
use bonsaidb_core::arc_bytes::{ArcBytes, OwnedBytes};
use bonsaidb_core::connection::Connection;
use bonsaidb_core::schema::view::{self, map, Serialized, ViewUpdatePolicy};
use bonsaidb_core::schema::{CollectionName, ViewName};
use easy_parallel::Parallel;
use nebari::io::any::AnyFile;
use nebari::tree::{AnyTreeRoot, CompareSwap, KeyOperation, Operation, Unversioned, Versioned};
use nebari::{LockedTransactionTree, Tree, UnlockedTransactionTree};
use crate::database::{deserialize_document, document_tree_name, Database};
use crate::tasks::{Job, Keyed, Task};
use crate::views::{
view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
EntryMapping, ViewEntry,
};
use crate::Error;
/// Job that brings one view up to date by mapping the documents listed in the
/// view's invalidated-documents tree.
#[derive(Debug)]
pub struct Mapper {
/// Database handle used to open the collection's trees and to report the
/// view as updated once mapping has finished.
pub database: Database,
/// Identifies the collection and view this job maps.
pub map: Map,
}
/// Identifies a single view-mapping request. A clone of this value is wrapped
/// in `Task::ViewMap` to form the key of a [`Mapper`] job.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct Map {
/// Passed as the first argument to `mark_view_updated`.
/// NOTE(review): presumably the database's name -- confirm against the task
/// manager.
pub database: Arc<Cow<'static, str>>,
/// Collection whose document and view trees are opened by the job.
pub collection: CollectionName,
/// View whose entries, document map and invalidated-documents trees are
/// updated.
pub view_name: ViewName,
}
impl Job for Mapper {
    type Error = Error;
    type Output = u64;

    /// Maps every invalidated document of the view and records the view as
    /// updated.
    ///
    /// Returns the id of the last transaction that existed when the job
    /// started; the same id is handed to `mark_view_updated`.
    ///
    /// # Errors
    ///
    /// Returns any error raised while opening the trees, reading the last
    /// transaction id, or mapping the documents. The view is not marked as
    /// updated in that case.
    ///
    /// # Panics
    ///
    /// Panics if the database reports no transaction id at all.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Error> {
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.map.collection,
                    document_tree_name(&self.map.collection),
                )?)?;
        let view_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_entries_tree_name(&self.map.view_name),
                )?)?;
        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_document_map_tree_name(&self.map.view_name),
                )?)?;
        let invalidated_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_invalidated_docs_tree_name(&self.map.view_name),
                )?)?;

        // Read before mapping starts, so the reported id never exceeds what
        // the mapping pass below has actually seen.
        let transaction_id = self
            .database
            .last_transaction_id()?
            .expect("no way to have documents without a transaction");

        // `map_view` only needs shared borrows, so the database handle and the
        // request are lent directly instead of being cloned first.
        map_view(
            &invalidated_entries,
            &document_map,
            &documents,
            &view_entries,
            &self.database,
            &self.map,
        )?;

        self.database.storage.instance.tasks().mark_view_updated(
            self.map.database.clone(),
            self.map.collection.clone(),
            self.map.view_name.clone(),
            transaction_id,
        );

        Ok(transaction_id)
    }
}
fn map_view(
invalidated_entries: &Tree<Unversioned, AnyFile>,
document_map: &Tree<Unversioned, AnyFile>,
documents: &Tree<Versioned, AnyFile>,
view_entries: &Tree<Unversioned, AnyFile>,
database: &Database,
map_request: &Map,
) -> Result<(), Error> {
const CHUNK_SIZE: usize = 100_000;
let mut invalidated_ids = invalidated_entries
.get_range(&(..))?
.into_iter()
.map(|(key, _)| key)
.collect::<Vec<_>>();
while !invalidated_ids.is_empty() {
let transaction = database
.roots()
.transaction::<_, dyn AnyTreeRoot<AnyFile>>(&[
Box::new(invalidated_entries.clone()) as Box<dyn AnyTreeRoot<AnyFile>>,
Box::new(document_map.clone()),
Box::new(documents.clone()),
Box::new(view_entries.clone()),
])?;
{
let view = database
.data
.schema
.view_by_name(&map_request.view_name)
.unwrap();
let document_ids = invalidated_ids
.drain(invalidated_ids.len().saturating_sub(CHUNK_SIZE)..)
.collect::<Vec<_>>();
let document_map = transaction.unlocked_tree(1).unwrap();
let documents = transaction.unlocked_tree(2).unwrap();
let view_entries = transaction.unlocked_tree(3).unwrap();
DocumentRequest {
document_ids: document_ids.clone(),
map_request,
database,
document_map,
documents,
view_entries,
view,
}
.map()?;
let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
invalidated_entries.modify(document_ids, nebari::tree::Operation::Remove)?;
}
transaction.commit()?;
}
Ok(())
}
/// Everything needed to map one set of documents inside an open transaction.
pub struct DocumentRequest<'a> {
/// Ids of the documents to map. Ids with no stored document are still
/// processed and produce no mappings.
/// NOTE(review): `generate_batches` matches ids against a sorted result list
/// while walking each chunk in reverse, which appears to assume the ids are in
/// ascending order -- confirm for callers other than `map_view`.
pub document_ids: Vec<ArcBytes<'static>>,
/// Identifies the view being mapped; used when reporting a unique-key
/// violation.
pub map_request: &'a Map,
/// Database handle; `map` reads the storage's parallelization setting from it.
pub database: &'a Database,
/// Tree that stores, per document id, the serialized set of keys the document
/// produced.
pub document_map: &'a UnlockedTransactionTree<AnyFile>,
/// Tree the documents are loaded from.
pub documents: &'a UnlockedTransactionTree<AnyFile>,
/// Tree that stores one serialized `ViewEntry` per view key.
pub view_entries: &'a UnlockedTransactionTree<AnyFile>,
/// The view whose `map`, `reduce`, `version` and `update_policy` are invoked.
pub view: &'a dyn Serialized,
}
/// A document id plus the stored document bytes, or `None` when the documents
/// tree returned nothing for that id.
type DocumentIdPayload = (ArcBytes<'static>, Option<ArcBytes<'static>>);
/// The ids of one batch plus the channel on which that batch's
/// [`DocumentIdPayload`]s arrive.
type BatchPayload = (Vec<ArcBytes<'static>>, flume::Receiver<DocumentIdPayload>);
impl<'a> DocumentRequest<'a> {
fn generate_batches(
batch_sender: flume::Sender<BatchPayload>,
document_ids: &[ArcBytes<'static>],
documents: &UnlockedTransactionTree<AnyFile>,
) -> Result<(), Error> {
let mut documents = documents.lock::<Versioned>();
for chunk in document_ids.chunks(1024) {
let (document_id_sender, document_id_receiver) = flume::bounded(chunk.len());
batch_sender
.send((chunk.to_vec(), document_id_receiver))
.unwrap();
let mut documents = documents.get_multiple(chunk.iter().map(ArcBytes::as_slice))?;
documents.sort_by(|a, b| a.0.cmp(&b.0));
for document_id in chunk.iter().rev() {
let document = documents
.last()
.map_or(false, |(key, _)| (key == document_id))
.then(|| documents.pop().unwrap().1);
document_id_sender
.send((document_id.clone(), document))
.unwrap();
}
drop(document_id_sender);
}
drop(batch_sender);
Ok(())
}
fn map_batches(
batch_receiver: &flume::Receiver<BatchPayload>,
mapped_sender: flume::Sender<Batch>,
view: &dyn Serialized,
parallelization: usize,
) -> Result<(), Error> {
while let Ok((document_ids, document_id_receiver)) = batch_receiver.recv() {
let mut batch = Batch {
document_ids,
..Batch::default()
};
for result in Parallel::new()
.each(1..=parallelization, |_| -> Result<_, Error> {
let mut results = Vec::new();
while let Ok((document_id, document)) = document_id_receiver.recv() {
let map_result = if let Some(document) = document {
let document = deserialize_document(&document)?;
view.map(&document).map_err(bonsaidb_core::Error::from)?
} else {
Vec::new()
};
let keys: HashSet<OwnedBytes> = map_result
.iter()
.map(|map| OwnedBytes::from(map.key.as_slice()))
.collect();
let new_keys = ArcBytes::from(bincode::serialize(&keys)?);
results.push((document_id, new_keys, keys, map_result));
}
Ok(results)
})
.run()
{
for (document_id, new_keys, keys, map_result) in result? {
for key in &keys {
batch.all_keys.insert(key.0.clone());
}
batch.document_maps.insert(document_id.clone(), new_keys);
batch.document_keys.insert(document_id.clone(), keys);
for mapping in map_result {
let key_mappings = batch
.new_mappings
.entry(ArcBytes::from(mapping.key.to_vec()))
.or_insert_with(Vec::default);
key_mappings.push(mapping);
}
}
}
mapped_sender.send(batch).unwrap();
}
drop(mapped_sender);
Ok(())
}
fn update_document_map(
document_ids: Vec<ArcBytes<'static>>,
document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
document_maps: &BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
mut document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
all_keys: &mut BTreeSet<ArcBytes<'static>>,
) -> Result<BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>, Error> {
let mut maps_to_clear = Vec::new();
document_map.modify(
document_ids,
nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |key, value| {
if let Some(existing_map) = value {
maps_to_clear.push((key.to_owned(), existing_map));
}
let new_map = document_maps.get(key).unwrap();
KeyOperation::Set(new_map.clone())
})),
)?;
let mut view_entries_to_clean = BTreeMap::new();
for (document_id, existing_map) in maps_to_clear {
let existing_keys = bincode::deserialize::<HashSet<OwnedBytes>>(&existing_map)?;
let new_keys = document_keys.remove(&document_id).unwrap();
for key in existing_keys.difference(&new_keys) {
all_keys.insert(key.clone().0);
let key_documents = view_entries_to_clean
.entry(key.clone().0)
.or_insert_with(HashSet::<_, RandomState>::default);
key_documents.insert(document_id.clone());
}
}
Ok(view_entries_to_clean)
}
fn update_view_entries(
view: &dyn Serialized,
map_request: &Map,
view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
all_keys: BTreeSet<ArcBytes<'static>>,
view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
) -> Result<(), Error> {
let mut updater = ViewEntryUpdater {
view,
map_request,
view_entries_to_clean,
new_mappings,
result: Ok(()),
has_reduce: true,
};
view_entries
.modify(
all_keys.into_iter().collect(),
Operation::CompareSwap(CompareSwap::new(&mut |key, view_entries| {
updater.compare_swap_view_entry(key, view_entries)
})),
)
.map_err(Error::from)
.and(updater.result)
}
fn save_mappings(
mapped_receiver: &flume::Receiver<Batch>,
view: &dyn Serialized,
map_request: &Map,
document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
) -> Result<(), Error> {
while let Ok(Batch {
document_ids,
document_maps,
document_keys,
new_mappings,
mut all_keys,
}) = mapped_receiver.recv()
{
let view_entries_to_clean = Self::update_document_map(
document_ids,
document_map,
&document_maps,
document_keys,
&mut all_keys,
)?;
Self::update_view_entries(
view,
map_request,
view_entries,
all_keys,
view_entries_to_clean,
new_mappings,
)?;
}
Ok(())
}
pub fn map(&mut self) -> Result<(), Error> {
let (batch_sender, batch_receiver) = flume::bounded(1);
let (mapped_sender, mapped_receiver) = flume::bounded(1);
for result in Parallel::new()
.add(|| Self::generate_batches(batch_sender, &self.document_ids, self.documents))
.add(|| {
Self::map_batches(
&batch_receiver,
mapped_sender,
self.view,
self.database.storage().parallelization(),
)
})
.add(|| {
let mut document_map = self.document_map.lock();
let mut view_entries = self.view_entries.lock();
Self::save_mappings(
&mapped_receiver,
self.view,
self.map_request,
&mut document_map,
&mut view_entries,
)
})
.run()
{
result?;
}
Ok(())
}
}
/// The mapped results for one batch of documents, handed from the mapping
/// stage to the save stage.
#[derive(Default)]
struct Batch {
/// Ids of every document in the batch.
document_ids: Vec<ArcBytes<'static>>,
/// Document id -> bincode-serialized set of keys the document produced.
document_maps: BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
/// Document id -> set of keys the document produced.
document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
/// View key -> mappings emitted for that key by documents in the batch.
new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
/// Every view key whose entry must be visited; the save stage adds keys that
/// documents stopped producing.
all_keys: BTreeSet<ArcBytes<'static>>,
}
impl Keyed<Task> for Mapper {
    /// Returns the task key for this job: the mapping request wrapped in
    /// `Task::ViewMap`.
    fn key(&self) -> Task {
        let request = self.map.clone();
        Task::ViewMap(request)
    }
}
/// State shared across the per-key callbacks of one view-entries update.
struct ViewEntryUpdater<'a> {
/// The view being updated; supplies `version`, `reduce` and `update_policy`.
view: &'a dyn Serialized,
/// Identifies the view; used when reporting a unique-key violation.
map_request: &'a Map,
/// View key -> ids of documents whose mapping must be removed from that key's
/// entry. Entries are taken out as their key is visited.
view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
/// View key -> mappings to add or update. Entries are taken out as their key
/// is visited.
new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
/// Set to `Err` by the callback when it has to skip a key; the callback
/// itself cannot return an error.
result: Result<(), Error>,
/// Starts as `true`; cleared once the view reports `ReduceUnimplemented`, so
/// reduce is not attempted again.
has_reduce: bool,
}
impl<'a> ViewEntryUpdater<'a> {
fn compare_swap_view_entry(
&mut self,
key: &ArcBytes<'_>,
view_entries: Option<ArcBytes<'static>>,
) -> KeyOperation<ArcBytes<'static>> {
let mut view_entry = view_entries
.and_then(|view_entries| bincode::deserialize::<ViewEntry>(&view_entries).ok())
.unwrap_or_else(|| ViewEntry {
key: Bytes::from(key.to_vec()),
view_version: self.view.version(),
mappings: vec![],
reduced_value: Bytes::default(),
});
let key = key.to_owned();
if let Some(document_ids) = self.view_entries_to_clean.remove(&key) {
view_entry
.mappings
.retain(|m| !document_ids.contains(m.source.id.as_ref()));
if view_entry.mappings.is_empty() && !self.new_mappings.contains_key(&key[..]) {
return KeyOperation::Remove;
} else if self.has_reduce {
let mappings = view_entry
.mappings
.iter()
.map(|m| (&key[..], m.value.as_slice()))
.collect::<Vec<_>>();
match self.view.reduce(&mappings, false) {
Ok(reduced) => {
view_entry.reduced_value = Bytes::from(reduced);
}
Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
self.has_reduce = false;
}
Err(other) => {
self.result = Err(Error::from(other));
return KeyOperation::Skip;
}
}
}
}
if let Some(new_mappings) = self.new_mappings.remove(&key[..]) {
for map::Serialized { source, value, .. } in new_mappings {
if self.view.update_policy() == ViewUpdatePolicy::Unique
&& !view_entry.mappings.is_empty()
&& view_entry.mappings[0].source.id != source.id
{
self.result = Err(Error::Core(bonsaidb_core::Error::UniqueKeyViolation {
view: self.map_request.view_name.clone(),
conflicting_document: Box::new(source),
existing_document: Box::new(view_entry.mappings[0].source.clone()),
}));
return KeyOperation::Skip;
}
let entry_mapping = EntryMapping { source, value };
let mut found = false;
for mapping in &mut view_entry.mappings {
if mapping.source.id == entry_mapping.source.id {
found = true;
mapping.source.revision = entry_mapping.source.revision;
mapping.value = entry_mapping.value.clone();
break;
}
}
if !found {
view_entry.mappings.push(entry_mapping);
}
}
if self.has_reduce {
let mappings = view_entry
.mappings
.iter()
.map(|m| (key.as_slice(), m.value.as_slice()))
.collect::<Vec<_>>();
match self.view.reduce(&mappings, false) {
Ok(reduced) => {
view_entry.reduced_value = Bytes::from(reduced);
}
Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
self.has_reduce = false;
}
Err(other) => {
self.result = Err(Error::from(other));
return KeyOperation::Skip;
}
}
}
}
let value = bincode::serialize(&view_entry).unwrap();
KeyOperation::Set(ArcBytes::from(value))
}
}