1
use std::borrow::Cow;
2

            
3
use bonsaidb_core::{connection::Connection, schema::CollectionName};
4
use nebari::tree::{Root, Unversioned, Versioned};
5

            
6
use crate::{
7
    database::{document_tree_name, keyvalue::KEY_TREE, DatabaseNonBlocking},
8
    tasks::{Job, Keyed, Task},
9
    views::{
10
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
11
        view_versions_tree_name,
12
    },
13
    Database, Error,
14
};
15

            
16
#[derive(Debug)]
17
pub struct Compactor {
18
    pub database: Database,
19
    pub compaction: Compaction,
20
}
21

            
22
impl Compactor {
23
5952
    pub fn target(database: Database, target: Target) -> Self {
24
5952
        Self {
25
5952
            compaction: Compaction {
26
5952
                database_name: database.name().to_string(),
27
5952
                target,
28
5952
            },
29
5952
            database,
30
5952
        }
31
5952
    }
32

            
33
124
    pub fn collection(database: Database, collection: CollectionName) -> Self {
34
124
        Self::target(database, Target::Collection(collection))
35
124
    }
36

            
37
124
    pub fn database(database: Database) -> Self {
38
124
        Self::target(database, Target::Database)
39
124
    }
40

            
41
124
    pub fn keyvalue(database: Database) -> Self {
42
124
        Self::target(database, Target::KeyValue)
43
124
    }
44
}
45

            
46
20812
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
47
pub struct Compaction {
48
    database_name: String,
49
    target: Target,
50
}
51

            
52
20812
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
53
pub enum Target {
54
    VersionedTree(String),
55
    UnversionedTree(String),
56
    Collection(CollectionName),
57
    KeyValue,
58
    Database,
59
}
60

            
61
impl Target {
62
5952
    fn compact(self, database: &Database) -> Result<(), Error> {
63
5952
        match self {
64
4960
            Target::UnversionedTree(name) => compact_tree::<Unversioned, _>(database, name),
65
496
            Target::VersionedTree(name) => compact_tree::<Versioned, _>(database, name),
66
124
            Target::Collection(collection) => {
67
124
                let mut trees = Vec::new();
68
124
                gather_collection_trees(database, &collection, &mut trees);
69
124
                compact_trees(database, trees)
70
            }
71
248
            Target::KeyValue => compact_tree::<Unversioned, _>(database, KEY_TREE),
72
            Target::Database => {
73
124
                let mut trees = Vec::new();
74
372
                for collection in database.schematic().collections() {
75
372
                    gather_collection_trees(database, &collection, &mut trees);
76
372
                }
77
124
                trees.push(Target::KeyValue);
78
124
                compact_trees(database, trees)
79
            }
80
        }
81
5952
    }
82
}
83

            
84
impl Job for Compactor {
85
    type Output = ();
86

            
87
    type Error = Error;
88

            
89
11904
    #[cfg_attr(feature = "tracing", tracing::instrument)]
90
5952
    fn execute(&mut self) -> Result<Self::Output, Error> {
91
        self.compaction.target.clone().compact(&self.database)
92
    }
93
}
94

            
95
impl Keyed<Task> for Compactor {
96
5952
    fn key(&self) -> Task {
97
5952
        Task::Compaction(self.compaction.clone())
98
5952
    }
99
}
100

            
101
496
fn gather_collection_trees(
102
496
    database: &Database,
103
496
    collection: &CollectionName,
104
496
    trees: &mut Vec<Target>,
105
496
) {
106
496
    trees.push(Target::VersionedTree(document_tree_name(collection)));
107
496
    trees.push(Target::UnversionedTree(view_versions_tree_name(collection)));
108

            
109
496
    if let Some(views) = database.data.schema.views_in_collection(collection) {
110
1984
        for view in views {
111
1488
            let name = view.view_name();
112
1488
            trees.push(Target::UnversionedTree(view_entries_tree_name(&name)));
113
1488
            trees.push(Target::UnversionedTree(view_document_map_tree_name(&name)));
114
1488
            trees.push(Target::UnversionedTree(view_invalidated_docs_tree_name(
115
1488
                &name,
116
1488
            )));
117
1488
        }
118
    }
119
496
}
120

            
121
248
fn compact_trees(database: &Database, targets: Vec<Target>) -> Result<(), Error> {
122
248
    // Enqueue all the jobs
123
248
    let handles = targets
124
248
        .into_iter()
125
5580
        .map(|target| {
126
5580
            database
127
5580
                .storage()
128
5580
                .instance
129
5580
                .tasks()
130
5580
                .spawn_compact_target(database.clone(), target)
131
5580
        })
132
248
        .collect::<Vec<_>>();
133
    // Wait for them to finish.
134
5828
    for handle in handles {
135
5580
        handle.receive()??;
136
    }
137
248
    Ok(())
138
248
}
139

            
140
5704
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
141
5704
    database: &Database,
142
5704
    name: S,
143
5704
) -> Result<(), Error> {
144
5704
    let documents = database.roots().tree(R::tree(name))?;
145
5704
    documents.compact()?;
146
5704
    Ok(())
147
5704
}