1
use std::borrow::Cow;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{CollectionName, ViewName};
5
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6
use nebari::tree::{Root, Unversioned, Versioned};
7

            
8
use crate::{
9
    database::{document_tree_name, keyvalue::KEY_TREE},
10
    jobs::{Job, Keyed},
11
    tasks::Task,
12
    views::{
13
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
14
        view_omitted_docs_tree_name, view_versions_tree_name,
15
    },
16
    Database, Error,
17
};
18

            
19
#[derive(Debug)]
20
pub struct Compactor {
21
    pub database: Database,
22
    pub compaction: Compaction,
23
}
24

            
25
impl Compactor {
26
296
    pub fn collection(database: Database, collection: CollectionName) -> Self {
27
296
        Self {
28
296
            compaction: Compaction {
29
296
                database_name: database.name().to_string(),
30
296
                target: Target::Collection(collection),
31
296
            },
32
296
            database,
33
296
        }
34
296
    }
35
74
    pub fn database(database: Database) -> Self {
36
74
        Self {
37
74
            compaction: Compaction {
38
74
                database_name: database.name().to_string(),
39
74
                target: Target::Database,
40
74
            },
41
74
            database,
42
74
        }
43
74
    }
44
148
    pub fn keyvalue(database: Database) -> Self {
45
148
        Self {
46
148
            compaction: Compaction {
47
148
                database_name: database.name().to_string(),
48
148
                target: Target::KeyValue,
49
148
            },
50
148
            database,
51
148
        }
52
148
    }
53
}
54

            
55
1506
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
56
pub struct Compaction {
57
    database_name: String,
58
    target: Target,
59
}
60

            
61
1554
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
62
enum Target {
63
    Collection(CollectionName),
64
    KeyValue,
65
    Database,
66
}
67

            
68
impl Target {
69
518
    async fn compact(self, database: &Database) -> Result<(), Error> {
70
518
        match self {
71
296
            Target::Collection(collection) => {
72
296
                let database = database.clone();
73
296
                tokio::task::spawn_blocking(move || compact_collection(&database, &collection))
74
296
                    .await?
75
            }
76
            Target::KeyValue => {
77
148
                let database = database.clone();
78
148
                tokio::task::spawn_blocking(move || {
79
148
                    compact_tree::<Unversioned, _>(&database, KEY_TREE)
80
148
                })
81
124
                .await?
82
            }
83
            Target::Database => {
84
74
                let mut handles = FuturesUnordered::new();
85
222
                for collection in database.schematic().collections() {
86
222
                    handles.push(
87
222
                        database
88
222
                            .storage()
89
222
                            .tasks()
90
222
                            .compact_collection(database.clone(), collection)
91
222
                            .boxed(),
92
222
                    );
93
222
                }
94
74
                handles.push(
95
74
                    database
96
74
                        .storage()
97
74
                        .tasks()
98
74
                        .compact_key_value_store(database.clone())
99
74
                        .boxed(),
100
74
                );
101
370
                while let Some(result) = handles.next().await {
102
296
                    result?;
103
                }
104
74
                Ok(())
105
            }
106
        }
107
518
    }
108
}
109

            
110
#[async_trait]
111
impl Job for Compactor {
112
    type Output = ();
113

            
114
    type Error = Error;
115

            
116
1554
    #[cfg_attr(feature = "tracing", tracing::instrument)]
117
518
    async fn execute(&mut self) -> Result<Self::Output, Error> {
118
766
        self.compaction.target.clone().compact(&self.database).await
119
1036
    }
120
}
121

            
122
impl Keyed<Task> for Compactor {
123
518
    fn key(&self) -> Task {
124
518
        Task::Compaction(self.compaction.clone())
125
518
    }
126
}
127
fn compact_collection(database: &Database, collection: &CollectionName) -> Result<(), Error> {
128
    // Compact the main database file
129
296
    compact_tree::<Versioned, _>(database, document_tree_name(collection))?;
130

            
131
    // Compact the views
132
296
    if let Some(views) = database.data.schema.views_in_collection(collection) {
133
1184
        for view in views {
134
888
            compact_view(database, &view.view_name())?;
135
        }
136
    }
137
296
    compact_tree::<Unversioned, _>(database, view_versions_tree_name(collection))?;
138
296
    Ok(())
139
296
}
140

            
141
fn compact_view(database: &Database, name: &ViewName) -> Result<(), Error> {
142
888
    compact_tree::<Unversioned, _>(database, view_entries_tree_name(name))?;
143
888
    compact_tree::<Unversioned, _>(database, view_document_map_tree_name(name))?;
144
888
    compact_tree::<Unversioned, _>(database, view_invalidated_docs_tree_name(name))?;
145
888
    compact_tree::<Unversioned, _>(database, view_omitted_docs_tree_name(name))?;
146

            
147
888
    Ok(())
148
888
}
149

            
150
4292
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
151
4292
    database: &Database,
152
4292
    name: S,
153
4292
) -> Result<(), Error> {
154
4292
    let documents = database.roots().tree(R::tree(name))?;
155
4292
    documents.compact()?;
156
4292
    Ok(())
157
4292
}