1
use std::borrow::Cow;
2

            
3
use bonsaidb_core::connection::Connection;
4
use bonsaidb_core::schema::CollectionName;
5
use nebari::tree::{Root, Unversioned, Versioned};
6

            
7
use crate::database::keyvalue::KEY_TREE;
8
use crate::database::{document_tree_name, DatabaseNonBlocking};
9
use crate::tasks::{Job, Keyed, Task};
10
use crate::views::{
11
    view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
12
    view_versions_tree_name,
13
};
14
use crate::{Database, Error};
15

            
16
#[derive(Debug)]
17
pub struct Compactor {
18
    pub database: Database,
19
    pub compaction: Compaction,
20
}
21

            
22
impl Compactor {
23
9120
    pub fn target(database: Database, target: Target) -> Self {
24
9120
        Self {
25
9120
            compaction: Compaction {
26
9120
                database_name: database.name().to_string(),
27
9120
                target,
28
9120
            },
29
9120
            database,
30
9120
        }
31
9120
    }
32

            
33
152
    pub fn collection(database: Database, collection: CollectionName) -> Self {
34
152
        Self::target(database, Target::Collection(collection))
35
152
    }
36

            
37
152
    pub fn database(database: Database) -> Self {
38
152
        Self::target(database, Target::Database)
39
152
    }
40

            
41
152
    pub fn keyvalue(database: Database) -> Self {
42
152
        Self::target(database, Target::KeyValue)
43
152
    }
44
}
45

            
46
30960
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
47
pub struct Compaction {
48
    database_name: String,
49
    target: Target,
50
}
51

            
52
30960
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
53
pub enum Target {
54
    VersionedTree(String),
55
    UnversionedTree(String),
56
    Collection(CollectionName),
57
    KeyValue,
58
    Database,
59
}
60

            
61
impl Target {
62
9120
    fn compact(self, database: &Database) -> Result<(), Error> {
63
9120
        match self {
64
7904
            Target::UnversionedTree(name) => compact_tree::<Unversioned, _>(database, name),
65
608
            Target::VersionedTree(name) => compact_tree::<Versioned, _>(database, name),
66
152
            Target::Collection(collection) => {
67
152
                let mut trees = Vec::new();
68
152
                gather_collection_trees(database, &collection, &mut trees);
69
152
                compact_trees(database, trees)
70
            }
71
304
            Target::KeyValue => compact_tree::<Unversioned, _>(database, KEY_TREE),
72
            Target::Database => {
73
152
                let mut trees = Vec::new();
74
456
                for collection in database.schematic().collections() {
75
456
                    gather_collection_trees(database, collection, &mut trees);
76
456
                }
77
152
                trees.push(Target::KeyValue);
78
152
                compact_trees(database, trees)
79
            }
80
        }
81
9120
    }
82
}
83

            
84
impl Job for Compactor {
85
    type Error = Error;
86
    type Output = ();
87

            
88
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
89
9120
    fn execute(&mut self) -> Result<Self::Output, Error> {
90
9120
        self.compaction.target.clone().compact(&self.database)
91
9120
    }
92
}
93

            
94
impl Keyed<Task> for Compactor {
95
9120
    fn key(&self) -> Task {
96
9120
        Task::Compaction(self.compaction.clone())
97
9120
    }
98
}
99

            
100
608
fn gather_collection_trees(
101
608
    database: &Database,
102
608
    collection: &CollectionName,
103
608
    trees: &mut Vec<Target>,
104
608
) {
105
608
    trees.push(Target::VersionedTree(document_tree_name(collection)));
106
608
    trees.push(Target::UnversionedTree(view_versions_tree_name(collection)));
107

            
108
2432
    for view in database.data.schema.views_in_collection(collection) {
109
2432
        let name = view.view_name();
110
2432
        trees.push(Target::UnversionedTree(view_entries_tree_name(&name)));
111
2432
        trees.push(Target::UnversionedTree(view_document_map_tree_name(&name)));
112
2432
        trees.push(Target::UnversionedTree(view_invalidated_docs_tree_name(
113
2432
            &name,
114
2432
        )));
115
2432
    }
116
608
}
117

            
118
304
fn compact_trees(database: &Database, targets: Vec<Target>) -> Result<(), Error> {
119
304
    // Enqueue all the jobs
120
304
    let handles = targets
121
304
        .into_iter()
122
8664
        .map(|target| {
123
8664
            database
124
8664
                .storage()
125
8664
                .instance
126
8664
                .tasks()
127
8664
                .spawn_compact_target(database.clone(), target)
128
8664
        })
129
304
        .collect::<Vec<_>>();
130
    // Wait for them to finish.
131
8968
    for handle in handles {
132
8664
        handle.receive()??;
133
    }
134
304
    Ok(())
135
304
}
136

            
137
8816
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
138
8816
    database: &Database,
139
8816
    name: S,
140
8816
) -> Result<(), Error> {
141
8816
    let documents = database.roots().tree(R::tree(name))?;
142
8816
    documents.compact()?;
143
8816
    Ok(())
144
8816
}