1
use std::borrow::Cow;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{CollectionName, ViewName};
5
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6
use nebari::tree::{Root, Unversioned, Versioned};
7

            
8
use crate::{
9
    database::{document_tree_name, keyvalue::KEY_TREE},
10
    jobs::{Job, Keyed},
11
    tasks::Task,
12
    views::{
13
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
14
        view_omitted_docs_tree_name, view_versions_tree_name,
15
    },
16
    Database, Error,
17
};
18

            
19
#[derive(Debug)]
20
pub struct Compactor {
21
    pub database: Database,
22
    pub compaction: Compaction,
23
}
24

            
25
impl Compactor {
26
308
    pub fn collection(database: Database, collection: CollectionName) -> Self {
27
308
        Self {
28
308
            compaction: Compaction {
29
308
                database_name: database.name().to_string(),
30
308
                target: Target::Collection(collection),
31
308
            },
32
308
            database,
33
308
        }
34
308
    }
35
77
    pub fn database(database: Database) -> Self {
36
77
        Self {
37
77
            compaction: Compaction {
38
77
                database_name: database.name().to_string(),
39
77
                target: Target::Database,
40
77
            },
41
77
            database,
42
77
        }
43
77
    }
44
154
    pub fn keyvalue(database: Database) -> Self {
45
154
        Self {
46
154
            compaction: Compaction {
47
154
                database_name: database.name().to_string(),
48
154
                target: Target::KeyValue,
49
154
            },
50
154
            database,
51
154
        }
52
154
    }
53
}
54

            
55
1543
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
56
pub struct Compaction {
57
    database_name: String,
58
    target: Target,
59
}
60

            
61
1618
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
62
enum Target {
63
    Collection(CollectionName),
64
    KeyValue,
65
    Database,
66
}
67

            
68
impl Target {
69
539
    async fn compact(self, database: &Database) -> Result<(), Error> {
70
539
        match self {
71
308
            Target::Collection(collection) => {
72
308
                let database = database.clone();
73
308
                tokio::task::spawn_blocking(move || compact_collection(&database, &collection))
74
308
                    .await?
75
            }
76
            Target::KeyValue => {
77
154
                let database = database.clone();
78
154
                tokio::task::spawn_blocking(move || {
79
154
                    compact_tree::<Unversioned, _>(&database, KEY_TREE)
80
154
                })
81
154
                .await?
82
            }
83
            Target::Database => {
84
77
                let mut handles = FuturesUnordered::new();
85
231
                for collection in database.schematic().collections() {
86
231
                    handles.push(
87
231
                        database
88
231
                            .storage()
89
231
                            .tasks()
90
231
                            .compact_collection(database.clone(), collection)
91
231
                            .boxed(),
92
231
                    );
93
231
                }
94
77
                handles.push(
95
77
                    database
96
77
                        .storage()
97
77
                        .tasks()
98
77
                        .compact_key_value_store(database.clone())
99
77
                        .boxed(),
100
77
                );
101
385
                while let Some(result) = handles.next().await {
102
308
                    result?;
103
                }
104
77
                Ok(())
105
            }
106
        }
107
539
    }
108
}
109

            
110
#[async_trait]
111
impl Job for Compactor {
112
    type Output = ();
113

            
114
    type Error = Error;
115

            
116
1617
    #[cfg_attr(feature = "tracing", tracing::instrument)]
117
539
    async fn execute(&mut self) -> Result<Self::Output, Error> {
118
847
        self.compaction.target.clone().compact(&self.database).await
119
1078
    }
120
}
121

            
122
impl Keyed<Task> for Compactor {
123
539
    fn key(&self) -> Task {
124
539
        Task::Compaction(self.compaction.clone())
125
539
    }
126
}
127
fn compact_collection(database: &Database, collection: &CollectionName) -> Result<(), Error> {
128
    // Compact the main database file
129
308
    compact_tree::<Versioned, _>(database, document_tree_name(collection))?;
130

            
131
    // Compact the views
132
308
    if let Some(views) = database.data.schema.views_in_collection(collection) {
133
1232
        for view in views {
134
924
            compact_view(database, &view.view_name())?;
135
        }
136
    }
137
308
    compact_tree::<Unversioned, _>(database, view_versions_tree_name(collection))?;
138
308
    Ok(())
139
308
}
140

            
141
/// Compacts the four trees backing the view `name`: entries, document map,
/// invalidated documents, and omitted documents, in that order.
///
/// Stops at, and returns, the first error encountered.
fn compact_view(database: &Database, name: &ViewName) -> Result<(), Error> {
    let entries = view_entries_tree_name(name);
    compact_tree::<Unversioned, _>(database, entries)?;

    let document_map = view_document_map_tree_name(name);
    compact_tree::<Unversioned, _>(database, document_map)?;

    let invalidated = view_invalidated_docs_tree_name(name);
    compact_tree::<Unversioned, _>(database, invalidated)?;

    let omitted = view_omitted_docs_tree_name(name);
    compact_tree::<Unversioned, _>(database, omitted)
}
149

            
150
4466
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
151
4466
    database: &Database,
152
4466
    name: S,
153
4466
) -> Result<(), Error> {
154
4466
    let documents = database.roots().tree(R::tree(name))?;
155
4466
    documents.compact()?;
156
4466
    Ok(())
157
4466
}