1
use std::borrow::Cow;
2

            
3
use bonsaidb_core::connection::Connection;
4
use bonsaidb_core::schema::CollectionName;
5
use nebari::tree::{Root, Unversioned, Versioned};
6

            
7
use crate::database::keyvalue::KEY_TREE;
8
use crate::database::{document_tree_name, DatabaseNonBlocking};
9
use crate::tasks::{Job, Keyed, Task};
10
use crate::views::{
11
    view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
12
    view_versions_tree_name,
13
};
14
use crate::{Database, Error};
15

            
16
#[derive(Debug)]
17
pub struct Compactor {
18
    pub database: Database,
19
    pub compaction: Compaction,
20
}
21

            
22
impl Compactor {
23
8880
    pub fn target(database: Database, target: Target) -> Self {
24
8880
        Self {
25
8880
            compaction: Compaction {
26
8880
                database_name: database.name().to_string(),
27
8880
                target,
28
8880
            },
29
8880
            database,
30
8880
        }
31
8880
    }
32

            
33
148
    pub fn collection(database: Database, collection: CollectionName) -> Self {
34
148
        Self::target(database, Target::Collection(collection))
35
148
    }
36

            
37
148
    pub fn database(database: Database) -> Self {
38
148
        Self::target(database, Target::Database)
39
148
    }
40

            
41
148
    pub fn keyvalue(database: Database) -> Self {
42
148
        Self::target(database, Target::KeyValue)
43
148
    }
44
}
45

            
46
30076
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
47
pub struct Compaction {
48
    database_name: String,
49
    target: Target,
50
}
51

            
52
30076
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
53
pub enum Target {
54
    VersionedTree(String),
55
    UnversionedTree(String),
56
    Collection(CollectionName),
57
    KeyValue,
58
    Database,
59
}
60

            
61
impl Target {
62
8880
    fn compact(self, database: &Database) -> Result<(), Error> {
63
8880
        match self {
64
7696
            Target::UnversionedTree(name) => compact_tree::<Unversioned, _>(database, name),
65
592
            Target::VersionedTree(name) => compact_tree::<Versioned, _>(database, name),
66
148
            Target::Collection(collection) => {
67
148
                let mut trees = Vec::new();
68
148
                gather_collection_trees(database, &collection, &mut trees);
69
148
                compact_trees(database, trees)
70
            }
71
296
            Target::KeyValue => compact_tree::<Unversioned, _>(database, KEY_TREE),
72
            Target::Database => {
73
148
                let mut trees = Vec::new();
74
444
                for collection in database.schematic().collections() {
75
444
                    gather_collection_trees(database, collection, &mut trees);
76
444
                }
77
148
                trees.push(Target::KeyValue);
78
148
                compact_trees(database, trees)
79
            }
80
        }
81
8880
    }
82
}
83

            
84
impl Job for Compactor {
85
    type Error = Error;
86
    type Output = ();
87

            
88
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
89
8880
    fn execute(&mut self) -> Result<Self::Output, Error> {
90
8880
        self.compaction.target.clone().compact(&self.database)
91
8880
    }
92
}
93

            
94
impl Keyed<Task> for Compactor {
95
8880
    fn key(&self) -> Task {
96
8880
        Task::Compaction(self.compaction.clone())
97
8880
    }
98
}
99

            
100
592
fn gather_collection_trees(
101
592
    database: &Database,
102
592
    collection: &CollectionName,
103
592
    trees: &mut Vec<Target>,
104
592
) {
105
592
    trees.push(Target::VersionedTree(document_tree_name(collection)));
106
592
    trees.push(Target::UnversionedTree(view_versions_tree_name(collection)));
107

            
108
2368
    for view in database.data.schema.views_in_collection(collection) {
109
2368
        let name = view.view_name();
110
2368
        trees.push(Target::UnversionedTree(view_entries_tree_name(&name)));
111
2368
        trees.push(Target::UnversionedTree(view_document_map_tree_name(&name)));
112
2368
        trees.push(Target::UnversionedTree(view_invalidated_docs_tree_name(
113
2368
            &name,
114
2368
        )));
115
2368
    }
116
592
}
117

            
118
296
fn compact_trees(database: &Database, targets: Vec<Target>) -> Result<(), Error> {
119
296
    // Enqueue all the jobs
120
296
    let handles = targets
121
296
        .into_iter()
122
8436
        .map(|target| {
123
8436
            database
124
8436
                .storage()
125
8436
                .instance
126
8436
                .tasks()
127
8436
                .spawn_compact_target(database.clone(), target)
128
8436
        })
129
296
        .collect::<Vec<_>>();
130
    // Wait for them to finish.
131
8732
    for handle in handles {
132
8436
        handle.receive()??;
133
    }
134
296
    Ok(())
135
296
}
136

            
137
8584
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
138
8584
    database: &Database,
139
8584
    name: S,
140
8584
) -> Result<(), Error> {
141
8584
    let documents = database.roots().tree(R::tree(name))?;
142
8584
    documents.compact()?;
143
8584
    Ok(())
144
8584
}