1
use std::borrow::Cow;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{CollectionName, ViewName};
5
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6
use nebari::tree::{Root, Unversioned, Versioned};
7

            
8
use crate::{
9
    database::{document_tree_name, keyvalue::KEY_TREE},
10
    tasks::{Job, Keyed, Task},
11
    views::{
12
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
13
        view_versions_tree_name,
14
    },
15
    Database, Error,
16
};
17

            
18
#[derive(Debug)]
19
pub struct Compactor {
20
    pub database: Database,
21
    pub compaction: Compaction,
22
}
23

            
24
impl Compactor {
25
320
    pub fn collection(database: Database, collection: CollectionName) -> Self {
26
320
        Self {
27
320
            compaction: Compaction {
28
320
                database_name: database.name().to_string(),
29
320
                target: Target::Collection(collection),
30
320
            },
31
320
            database,
32
320
        }
33
320
    }
34
80
    pub fn database(database: Database) -> Self {
35
80
        Self {
36
80
            compaction: Compaction {
37
80
                database_name: database.name().to_string(),
38
80
                target: Target::Database,
39
80
            },
40
80
            database,
41
80
        }
42
80
    }
43
160
    pub fn keyvalue(database: Database) -> Self {
44
160
        Self {
45
160
            compaction: Compaction {
46
160
                database_name: database.name().to_string(),
47
160
                target: Target::KeyValue,
48
160
            },
49
160
            database,
50
160
        }
51
160
    }
52
}
53

            
54
1629
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
55
pub struct Compaction {
56
    database_name: String,
57
    target: Target,
58
}
59

            
60
1681
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
61
enum Target {
62
    Collection(CollectionName),
63
    KeyValue,
64
    Database,
65
}
66

            
67
impl Target {
68
560
    async fn compact(self, database: &Database) -> Result<(), Error> {
69
560
        match self {
70
320
            Target::Collection(collection) => {
71
320
                let database = database.clone();
72
1707
                compact_collection(database.clone(), &collection).await
73
            }
74
            Target::KeyValue => {
75
160
                let database = database.clone();
76
160
                tokio::task::spawn_blocking(move || {
77
160
                    compact_tree::<Unversioned, _>(&database, KEY_TREE)
78
160
                })
79
160
                .await?
80
            }
81
            Target::Database => {
82
80
                let mut handles = FuturesUnordered::new();
83
240
                for collection in database.schematic().collections() {
84
240
                    handles.push(
85
240
                        database
86
240
                            .storage()
87
240
                            .tasks()
88
240
                            .compact_collection(database.clone(), collection)
89
240
                            .boxed(),
90
240
                    );
91
240
                }
92
80
                handles.push(
93
80
                    database
94
80
                        .storage()
95
80
                        .tasks()
96
80
                        .compact_key_value_store(database.clone())
97
80
                        .boxed(),
98
80
                );
99
400
                while let Some(result) = handles.next().await {
100
320
                    result?;
101
                }
102
80
                Ok(())
103
            }
104
        }
105
560
    }
106
}
107

            
108
#[async_trait]
109
impl Job for Compactor {
110
    type Output = ();
111

            
112
    type Error = Error;
113

            
114
1680
    #[cfg_attr(feature = "tracing", tracing::instrument)]
115
560
    async fn execute(&mut self) -> Result<Self::Output, Error> {
116
2267
        self.compaction.target.clone().compact(&self.database).await
117
1120
    }
118
}
119

            
120
impl Keyed<Task> for Compactor {
121
560
    fn key(&self) -> Task {
122
560
        Task::Compaction(self.compaction.clone())
123
560
    }
124
}
125
320
async fn compact_collection(database: Database, collection: &CollectionName) -> Result<(), Error> {
126
320
    // Compact the main database file
127
320
    let mut handles = FuturesUnordered::new();
128
320
    let task_db = database.clone();
129
320
    let document_tree_name = document_tree_name(collection);
130
320
    handles.push(tokio::task::spawn_blocking(move || {
131
320
        compact_tree::<Versioned, _>(&task_db, document_tree_name)
132
320
    }));
133

            
134
    // Compact the views
135
320
    if let Some(views) = database.data.schema.views_in_collection(collection) {
136
1280
        for view in views {
137
960
            let task_db = database.clone();
138
960
            let view_name = view.view_name();
139
960
            handles.push(tokio::task::spawn_blocking(move || {
140
960
                compact_view(&task_db, &view_name)
141
960
            }));
142
960
        }
143
    }
144

            
145
320
    let task_db = database.clone();
146
320
    let view_versions_tree_name = view_versions_tree_name(collection);
147
320
    handles.push(tokio::task::spawn_blocking(move || {
148
320
        compact_tree::<Unversioned, _>(&task_db, view_versions_tree_name)
149
320
    }));
150

            
151
1920
    while let Some(result) = handles.next().await {
152
1600
        result??;
153
    }
154

            
155
320
    Ok(())
156
320
}
157

            
158
/// Compacts the three trees associated with the view `name`, in order: the
/// entries tree, the document-map tree, and the invalidated-documents tree.
///
/// # Errors
///
/// Stops at and returns the first compaction error.
fn compact_view(database: &Database, name: &ViewName) -> Result<(), Error> {
    compact_tree::<Unversioned, _>(database, view_entries_tree_name(name))
        .and_then(|()| {
            compact_tree::<Unversioned, _>(database, view_document_map_tree_name(name))
        })
        .and_then(|()| {
            compact_tree::<Unversioned, _>(database, view_invalidated_docs_tree_name(name))
        })
}
165

            
166
3680
fn compact_tree<R: Root, S: Into<Cow<'static, str>>>(
167
3680
    database: &Database,
168
3680
    name: S,
169
3680
) -> Result<(), Error> {
170
3680
    let documents = database.roots().tree(R::tree(name))?;
171
3680
    documents.compact()?;
172
3680
    Ok(())
173
3680
}