1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::borrow::Cow;

use bonsaidb_core::connection::Connection;
use bonsaidb_core::schema::CollectionName;
use nebari::tree::{Root, Unversioned, Versioned};

use crate::database::keyvalue::KEY_TREE;
use crate::database::{document_tree_name, DatabaseNonBlocking};
use crate::tasks::{Job, Keyed, Task};
use crate::views::{
    view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
    view_versions_tree_name,
};
use crate::{Database, Error};

#[derive(Debug)]
pub struct Compactor {
    pub database: Database,
    pub compaction: Compaction,
}

impl Compactor {
    pub fn target(database: Database, target: Target) -> Self {
        Self {
            compaction: Compaction {
                database_name: database.name().to_string(),
                target,
            },
            database,
        }
    }

    pub fn collection(database: Database, collection: CollectionName) -> Self {
        Self::target(database, Target::Collection(collection))
    }

    pub fn database(database: Database) -> Self {
        Self::target(database, Target::Database)
    }

    pub fn keyvalue(database: Database) -> Self {
        Self::target(database, Target::KeyValue)
    }
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Compaction {
    database_name: String,
    target: Target,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum Target {
    VersionedTree(String),
    UnversionedTree(String),
    Collection(CollectionName),
    KeyValue,
    Database,
}

impl Target {
    fn compact(self, database: &Database) -> Result<(), Error> {
        match self {
            Target::UnversionedTree(name) => compact_tree::<Unversioned, _>(database, name),
            Target::VersionedTree(name) => compact_tree::<Versioned, _>(database, name),
            Target::Collection(collection) => {
                let mut trees = Vec::new();
                gather_collection_trees(database, &collection, &mut trees);
                compact_trees(database, trees)
            }
            Target::KeyValue => compact_tree::<Unversioned, _>(database, KEY_TREE),
            Target::Database => {
                let mut trees = Vec::new();
                for collection in database.schematic().collections() {
                    gather_collection_trees(database, collection, &mut trees);
                }
                trees.push(Target::KeyValue);
                compact_trees(database, trees)
            }
        }
    }
}

impl Job for Compactor {
    type Error = Error;
    type Output = ();

    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn execute(&mut self) -> Result<Self::Output, Error> {
        self.compaction.target.clone().compact(&self.database)
    }
}

impl Keyed<Task> for Compactor {
    fn key(&self) -> Task {
        Task::Compaction(self.compaction.clone())
    }
}

fn gather_collection_trees(
    database: &Database,
    collection: &CollectionName,
    trees: &mut Vec<Target>,
) {
    trees.push(Target::VersionedTree(document_tree_name(collection)));
    trees.push(Target::UnversionedTree(view_versions_tree_name(collection)));

    for view in database.data.schema.views_in_collection(collection) {
        let name = view.view_name();
        trees.push(Target::UnversionedTree(view_entries_tree_name(&name)));
        trees.push(Target::UnversionedTree(view_document_map_tree_name(&name)));
        trees.push(Target::UnversionedTree(view_invalidated_docs_tree_name(
            &name,
        )));
    }
}

fn compact_trees(database: &Database, targets: Vec<Target>) -> Result<(), Error> {
    // Enqueue all the jobs
    let handles = targets
        .into_iter()
        .map(|target| {
            database
                .storage()
                .instance
                .tasks()
                .spawn_compact_target(database.clone(), target)
        })
        .collect::<Vec<_>>();
    // Wait for them to finish.
    for handle in handles {
        handle.receive()??;
    }
    Ok(())
}

fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
    database: &Database,
    name: S,
) -> Result<(), Error> {
    let documents = database.roots().tree(R::tree(name))?;
    documents.compact()?;
    Ok(())
}