use std::borrow::Cow;
use std::collections::HashSet;
use std::convert::Infallible;
use std::hash::Hash;
use std::sync::Arc;

use bonsaidb_core::document::DocumentId;
use bonsaidb_core::schema::{CollectionName, ViewName};
use nebari::io::any::AnyFile;
use nebari::tree::{Operation, ScanEvaluation, Unversioned, Versioned};
use nebari::{ArcBytes, Roots, Tree};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};

use super::mapper::{Map, Mapper};
use super::{view_invalidated_docs_tree_name, view_versions_tree_name};
use crate::database::{document_tree_name, Database};
use crate::tasks::handle::Handle;
use crate::tasks::{Job, Keyed, Task};
use crate::views::{view_document_map_tree_name, view_entries_tree_name};
use crate::Error;

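/// A job that checks whether a view's persisted data was built by the current
/// version of the view, queuing a full rebuild when it was not.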
#[derive(Debug)]
pub struct IntegrityScanner {
    pub database: Database,
    pub scan: IntegrityScan,
}

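/// The key identifying an integrity scan of a single view; used via the
/// `Keyed` implementation below to deduplicate queued scans.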
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct IntegrityScan {
    pub view_version: u64,
    pub database: Arc<Cow<'static, str>>,
    pub collection: CollectionName,
    pub view_name: ViewName,
}

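/// The handle of the `Mapper` job enqueued to rebuild a view, when a rebuild
/// was necessary.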
pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<Handle<u64, Error>>>>>;

impl Job for IntegrityScanner {
    type Error = Error;
    type Output = OptionalViewMapHandle;

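    // Executing the scan compares the view's stored `ViewVersion` with the
    // current one. When they match, there is nothing to do. Otherwise the
    // view's trees are deleted, every document id is marked invalidated, and
    // a `Mapper` job is enqueued to rebuild the view.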
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Self::Error> {
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.scan.collection,
                    document_tree_name(&self.scan.collection),
                )?)?;

        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_versions_tree_name(&self.scan.collection),
        )?;
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;

        let view_name = self.scan.view_name.clone();
        let view_version = self.scan.view_version;
        let roots = self.database.roots().clone();
        let version = view_versions
            .get(view_name.to_string().as_bytes())?
            .and_then(|version| ViewVersion::from_bytes(&version).ok())
            .unwrap_or_default();

        // Remove any old files that are no longer used.
        version.cleanup(&roots, &view_name)?;

        let task = if version.is_current(view_version) {
            None
        } else {
            // The view isn't the current version; queue up all documents.
            let missing_entries = tree_keys::<Versioned>(&documents)?;
            // When a version is updated, we can make no guarantees about
            // existing keys. The best we can do is delete the existing files so
            // that the view starts fresh.
            roots.delete_tree(view_invalidated_docs_tree_name(&self.scan.view_name))?;
            roots.delete_tree(view_entries_tree_name(&self.scan.view_name))?;
            roots.delete_tree(view_document_map_tree_name(&self.scan.view_name))?;
            // Add all missing entries to the invalidated list. The view
            // mapping job will update them on the next pass.
            let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
                &self.scan.collection,
                view_invalidated_docs_tree_name(&self.scan.view_name),
            )?;

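            // Atomically record the new version and the invalidated ids. The
            // tree indices below follow the order the trees were passed to
            // `roots.transaction`: 0 is the invalidated-entries tree, 1 is
            // the view-versions tree.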
            let transaction = roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
            {
                let mut view_versions = transaction.tree::<Unversioned>(1).unwrap();
                view_versions.set(
                    view_name.to_string().as_bytes().to_vec(),
                    ViewVersion::current_for(view_version).to_vec()?,
                )?;
                let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
                let mut missing_entries = missing_entries
                    .into_iter()
                    .map(|id| ArcBytes::from(id.to_vec()))
                    .collect::<Vec<_>>();
                missing_entries.sort();
                invalidated_entries.modify(missing_entries, Operation::Set(ArcBytes::default()))?;
            }
            transaction.commit()?;

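            // Return a handle to the mapping job. `lookup_or_enqueue` looks
            // up an existing queued job with the same key before enqueuing a
            // new one.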
            Some(Arc::new(Mutex::new(Some(
                self.database
                    .storage
                    .instance
                    .tasks()
                    .jobs
                    .lookup_or_enqueue(Mapper {
                        database: self.database.clone(),
                        map: Map {
                            database: self.database.data.name.clone(),
                            collection: self.scan.collection.clone(),
                            view_name: self.scan.view_name.clone(),
                        },
                    }),
            ))))
        };

        self.database
            .storage
            .instance
            .tasks()
            .mark_integrity_check_complete(
                self.database.data.name.clone(),
                self.scan.collection.clone(),
                self.scan.view_name.clone(),
            );

        Ok(task)
    }
}

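/// The version stamp stored for each view: `internal_version` tracks this
/// crate's on-disk format for view data, while `schema_version` tracks the
/// version declared by the view itself. Stored view data is only trusted when
/// both match the current values.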
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ViewVersion {
    internal_version: u8,
    schema_version: u64,
}

impl ViewVersion {
    const CURRENT_VERSION: u8 = 3;

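    /// Deserializes a stored version stamp. Stamps written before
    /// `internal_version` existed were stored as a raw big-endian `u64`
    /// schema version; those decode with `internal_version` 0. A minimal
    /// sketch of decoding a legacy value (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let legacy = 7_u64.to_be_bytes();
    /// let version = ViewVersion::from_bytes(&legacy)?;
    /// // A legacy stamp is never current: its internal version is 0, not 3.
    /// assert!(!version.is_current(7));
    /// ```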
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
        match pot::from_slice(bytes) {
            Ok(version) => Ok(version),
            Err(err) if matches!(err, pot::Error::NotAPot) && bytes.len() == 8 => {
                let mut be_bytes = [0_u8; 8];
                be_bytes.copy_from_slice(bytes);
                let schema_version = u64::from_be_bytes(be_bytes);
                Ok(Self {
                    internal_version: 0,
                    schema_version,
                })
            }
            Err(err) => Err(crate::Error::from(err)),
        }
    }

    pub fn to_vec(&self) -> Result<Vec<u8>, crate::Error> {
        pot::to_vec(self).map_err(crate::Error::from)
    }

    pub fn current_for(schema_version: u64) -> Self {
        Self {
            internal_version: Self::CURRENT_VERSION,
            schema_version,
        }
    }

    pub fn is_current(&self, schema_version: u64) -> bool {
        self.internal_version == Self::CURRENT_VERSION && self.schema_version == schema_version
    }

    pub fn cleanup(&self, roots: &Roots<AnyFile>, view: &ViewName) -> Result<(), crate::Error> {
        if self.internal_version < 2 {
            // The omitted entries tree was removed in version 2.
            roots.delete_tree(format!("view.{view:#}.omitted"))?;
        }
        Ok(())
    }
}

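/// Collects every key in `tree` as a [`DocumentId`], scanning keys only and
/// never reading values.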
fn tree_keys<R: nebari::tree::Root>(
    tree: &Tree<R, AnyFile>,
) -> Result<HashSet<DocumentId>, crate::Error> {
    let mut ids = Vec::new();
    tree.scan::<Infallible, _, _, _, _>(
        &(..),
        true,
        // Descend into every interior node.
        |_, _, _| ScanEvaluation::ReadData,
        // Record each key and skip reading its value.
        |key, _| {
            ids.push(key.clone());
            ScanEvaluation::Skip
        },
        // Every key is skipped, so the value callback is never invoked.
        |_, _, _| unreachable!(),
    )?;

    Ok(ids
        .into_iter()
        .map(|key| DocumentId::try_from(key.as_slice()))
        .collect::<Result<HashSet<_>, bonsaidb_core::Error>>()?)
}

impl Keyed<Task> for IntegrityScanner {
    fn key(&self) -> Task {
        Task::IntegrityScan(self.scan.clone())
    }
}

// The reason we use jobs like this is to make sure we can tweak how much is
// happening at any given time.
//
// On the Server level, we'll need to cooperate with all the databases in a
// shared pool of workers, so we need to come up with a design that lets the
// view updaters work within this limitation.
//
// Integrity scan is simple: have a shared structure on Database that keeps
// track of all integrity scan results. It can check for an existing value and
// return, or make you wait until the job is finished. For views, I suppose
// the best that can be done is a similar approach, but the indexer's output
// is the last transaction id it synced. When a request comes in, a check can
// be done for outdated documents; if any exist, the client can get the
// current transaction id and ask the ViewScanning service manager to wait
// until that txid has been scanned.
//
// The view can then scan and return the results it finds with confidence it
// was updated to that point in time. If new requests come in while the
// current batch is being caught up to,