1
use std::borrow::Cow;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{CollectionName, ViewName};
5
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6
use nebari::tree::{Root, Unversioned, Versioned};
7

            
8
use crate::{
9
    database::{document_tree_name, keyvalue::KEY_TREE},
10
    jobs::{Job, Keyed},
11
    tasks::Task,
12
    views::{
13
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
14
        view_omitted_docs_tree_name, view_versions_tree_name,
15
    },
16
    Database, Error,
17
};
18

            
19
#[derive(Debug)]
20
pub struct Compactor {
21
    pub database: Database,
22
    pub compaction: Compaction,
23
}
24

            
25
impl Compactor {
26
268
    pub fn collection(database: Database, collection: CollectionName) -> Self {
27
268
        Self {
28
268
            compaction: Compaction {
29
268
                database_name: database.name().to_string(),
30
268
                target: Target::Collection(collection),
31
268
            },
32
268
            database,
33
268
        }
34
268
    }
35
67
    pub fn database(database: Database) -> Self {
36
67
        Self {
37
67
            compaction: Compaction {
38
67
                database_name: database.name().to_string(),
39
67
                target: Target::Database,
40
67
            },
41
67
            database,
42
67
        }
43
67
    }
44
134
    pub fn keyvalue(database: Database) -> Self {
45
134
        Self {
46
134
            compaction: Compaction {
47
134
                database_name: database.name().to_string(),
48
134
                target: Target::KeyValue,
49
134
            },
50
134
            database,
51
134
        }
52
134
    }
53
}
54

            
55
1363
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
56
pub struct Compaction {
57
    database_name: String,
58
    target: Target,
59
}
60

            
61
1407
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
62
enum Target {
63
    Collection(CollectionName),
64
    KeyValue,
65
    Database,
66
}
67

            
68
impl Target {
69
469
    async fn compact(self, database: &Database) -> Result<(), Error> {
70
469
        match self {
71
268
            Target::Collection(collection) => {
72
268
                let database = database.clone();
73
268
                tokio::task::spawn_blocking(move || compact_collection(&database, &collection))
74
246
                    .await?
75
            }
76
            Target::KeyValue => {
77
134
                let database = database.clone();
78
134
                tokio::task::spawn_blocking(move || {
79
134
                    compact_tree::<Unversioned, _>(&database, KEY_TREE)
80
134
                })
81
134
                .await?
82
            }
83
            Target::Database => {
84
67
                let mut handles = FuturesUnordered::new();
85
201
                for collection in database.schematic().collections() {
86
201
                    handles.push(
87
201
                        database
88
201
                            .storage()
89
201
                            .tasks()
90
201
                            .compact_collection(database.clone(), collection)
91
201
                            .boxed(),
92
201
                    );
93
201
                }
94
67
                handles.push(
95
67
                    database
96
67
                        .storage()
97
67
                        .tasks()
98
67
                        .compact_key_value_store(database.clone())
99
67
                        .boxed(),
100
67
                );
101
335
                while let Some(result) = handles.next().await {
102
268
                    result?;
103
                }
104
67
                Ok(())
105
            }
106
        }
107
469
    }
108
}
109

            
110
#[async_trait]
111
impl Job for Compactor {
112
    type Output = ();
113

            
114
    type Error = Error;
115

            
116
1407
    #[cfg_attr(feature = "tracing", tracing::instrument)]
117
469
    async fn execute(&mut self) -> Result<Self::Output, Error> {
118
715
        self.compaction.target.clone().compact(&self.database).await
119
938
    }
120
}
121

            
122
impl Keyed<Task> for Compactor {
123
469
    fn key(&self) -> Task {
124
469
        Task::Compaction(self.compaction.clone())
125
469
    }
126
}
127
fn compact_collection(database: &Database, collection: &CollectionName) -> Result<(), Error> {
128
    // Compact the main database file
129
268
    compact_tree::<Versioned, _>(database, document_tree_name(collection))?;
130

            
131
    // Compact the views
132
268
    if let Some(views) = database.data.schema.views_in_collection(collection) {
133
1072
        for view in views {
134
804
            compact_view(database, &view.view_name())?;
135
        }
136
    }
137
268
    compact_tree::<Unversioned, _>(database, view_versions_tree_name(collection))?;
138
268
    Ok(())
139
268
}
140

            
141
fn compact_view(database: &Database, name: &ViewName) -> Result<(), Error> {
142
804
    compact_tree::<Unversioned, _>(database, view_entries_tree_name(name))?;
143
804
    compact_tree::<Unversioned, _>(database, view_document_map_tree_name(name))?;
144
804
    compact_tree::<Unversioned, _>(database, view_invalidated_docs_tree_name(name))?;
145
804
    compact_tree::<Unversioned, _>(database, view_omitted_docs_tree_name(name))?;
146

            
147
804
    Ok(())
148
804
}
149

            
150
3886
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
151
3886
    database: &Database,
152
3886
    name: S,
153
3886
) -> Result<(), Error> {
154
3886
    let documents = database.roots().tree(R::tree(name))?;
155
3886
    documents.compact()?;
156
3886
    Ok(())
157
3886
}