1
use std::borrow::Cow;
2
use std::collections::HashSet;
3
use std::convert::Infallible;
4
use std::hash::Hash;
5
use std::sync::Arc;
6

            
7
use bonsaidb_core::document::DocumentId;
8
use bonsaidb_core::schema::{CollectionName, ViewName};
9
use nebari::io::any::AnyFile;
10
use nebari::tree::{Operation, ScanEvaluation, Unversioned, Versioned};
11
use nebari::{ArcBytes, Roots, Tree};
12
use parking_lot::Mutex;
13
use serde::{Deserialize, Serialize};
14

            
15
use super::mapper::{Map, Mapper};
16
use super::{view_invalidated_docs_tree_name, view_versions_tree_name};
17
use crate::database::{document_tree_name, Database};
18
use crate::tasks::handle::Handle;
19
use crate::tasks::{Job, Keyed, Task};
20
use crate::views::{view_document_map_tree_name, view_entries_tree_name};
21
use crate::Error;
22

            
23
/// A background job that verifies a single view's persisted data was built by
/// the current view version, queueing a full remap when it was not.
#[derive(Debug)]
pub struct IntegrityScanner {
    /// The database whose view storage is being checked.
    pub database: Database,
    /// Identifies the collection/view (and expected version) to scan.
    pub scan: IntegrityScan,
}
28

            
29
46326
/// The parameters of one integrity scan. Also serves as the job key (`Hash` +
/// `Eq`) so duplicate scans of the same view are deduplicated by the task
/// system.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct IntegrityScan {
    /// The schema-declared version of the view being checked.
    pub view_version: u64,
    /// The name of the database containing the view.
    pub database: Arc<Cow<'static, str>>,
    /// The collection the view indexes.
    pub collection: CollectionName,
    /// The fully qualified name of the view.
    pub view_name: ViewName,
}
36

            
37
/// Handle to the view-mapping job an integrity scan may spawn. `None` means
/// the view was already current; otherwise the inner `Handle` can be awaited
/// for the remap to finish (the `Mutex<Option<..>>` lets one waiter take it).
pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<Handle<u64, Error>>>>>;
38

            
39
impl Job for IntegrityScanner {
    type Error = Error;
    type Output = OptionalViewMapHandle;

    /// Compares the stored [`ViewVersion`] for this view against the expected
    /// version. If they differ, wipes the view's derived trees, marks every
    /// document in the collection invalidated, and enqueues a [`Mapper`] job
    /// to rebuild the view; returns a handle to that job. If the view is
    /// current, returns `None`. In both cases the integrity check is recorded
    /// as complete with the task system.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Self::Error> {
        // The collection's document tree: source of all document ids that
        // would need re-mapping.
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.scan.collection,
                    document_tree_name(&self.scan.collection),
                )?)?;

        // Tree storing the serialized ViewVersion for each view in this
        // collection, keyed by view name.
        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_versions_tree_name(&self.scan.collection),
        )?;
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;

        let view_name = self.scan.view_name.clone();
        let view_version = self.scan.view_version;
        let roots = self.database.roots().clone();
        // A missing or unreadable stored version falls back to
        // ViewVersion::default() (internal_version 0), which is never
        // considered current and thus forces a rebuild.
        let version = view_versions
            .get(view_name.to_string().as_bytes())?
            .and_then(|version| ViewVersion::from_bytes(&version).ok())
            .unwrap_or_default();

        // Remove any old files that are no longer used.
        version.cleanup(&roots, &view_name)?;

        let task = if version.is_current(view_version) {
            None
        } else {
            // The view isn't the current version, queue up all documents.
            let missing_entries = tree_keys::<Versioned>(&documents)?;
            // When a version is updated, we can make no guarantees about
            // existing keys. The best we can do is delete the existing files so
            // that the view starts fresh.
            roots.delete_tree(view_invalidated_docs_tree_name(&self.scan.view_name))?;
            roots.delete_tree(view_entries_tree_name(&self.scan.view_name))?;
            roots.delete_tree(view_document_map_tree_name(&self.scan.view_name))?;
            // Add all missing entries to the invalidated list. The view
            // mapping job will update them on the next pass.
            let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
                &self.scan.collection,
                view_invalidated_docs_tree_name(&self.scan.view_name),
            )?;

            // NOTE: the tree indices used below (0 and 1) are positional and
            // must match the order of this slice.
            let transaction = roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
            {
                // Index 1 == view_versions_tree: record the new version so a
                // subsequent scan sees the view as current.
                let mut view_versions = transaction.tree::<Unversioned>(1).unwrap();
                view_versions.set(
                    view_name.to_string().as_bytes().to_vec(),
                    ViewVersion::current_for(view_version).to_vec()?,
                )?;
                // Index 0 == invalidated_entries_tree: insert every document
                // id with an empty value, flagging it for re-mapping.
                let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
                let mut missing_entries = missing_entries
                    .into_iter()
                    .map(|id| ArcBytes::from(id.to_vec()))
                    .collect::<Vec<_>>();
                // `modify` requires its key list to be sorted.
                missing_entries.sort();
                invalidated_entries.modify(missing_entries, Operation::Set(ArcBytes::default()))?;
            }
            transaction.commit()?;

            // Enqueue (or reuse an already-queued) mapping job for this view
            // and hand its handle back to the caller.
            Some(Arc::new(Mutex::new(Some(
                self.database
                    .storage
                    .instance
                    .tasks()
                    .jobs
                    .lookup_or_enqueue(Mapper {
                        database: self.database.clone(),
                        map: Map {
                            database: self.database.data.name.clone(),
                            collection: self.scan.collection.clone(),
                            view_name: self.scan.view_name.clone(),
                        },
                    }),
            ))))
        };

        // Record completion regardless of whether a remap was needed so the
        // task system won't re-run this scan.
        self.database
            .storage
            .instance
            .tasks()
            .mark_integrity_check_complete(
                self.database.data.name.clone(),
                self.scan.collection.clone(),
                self.scan.view_name.clone(),
            );

        Ok(task)
    }
}
136

            
137
18167
/// Persisted record of which view-code version last built a view's stored
/// data. Serialized with `pot` into the view-versions tree.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ViewVersion {
    // BonsaiDb's internal view storage format version; compared against
    // `Self::CURRENT_VERSION`. Default of 0 always reads as out of date.
    internal_version: u8,
    // The view's schema-declared version number.
    schema_version: u64,
}
142

            
143
impl ViewVersion {
144
    const CURRENT_VERSION: u8 = 3;
145

            
146
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
147
947
        match pot::from_slice(bytes) {
148
940
            Ok(version) => Ok(version),
149
7
            Err(err) if matches!(err, pot::Error::NotAPot) && bytes.len() == 8 => {
150
                let mut be_bytes = [0_u8; 8];
151
                be_bytes.copy_from_slice(bytes);
152
                let schema_version = u64::from_be_bytes(be_bytes);
153
                Ok(Self {
154
                    internal_version: 0,
155
                    schema_version,
156
                })
157
            }
158
7
            Err(err) => Err(crate::Error::from(err)),
159
        }
160
947
    }
161

            
162
18167
    pub fn to_vec(&self) -> Result<Vec<u8>, crate::Error> {
163
18167
        pot::to_vec(self).map_err(crate::Error::from)
164
18167
    }
165

            
166
18167
    pub fn current_for(schema_version: u64) -> Self {
167
18167
        Self {
168
18167
            internal_version: Self::CURRENT_VERSION,
169
18167
            schema_version,
170
18167
        }
171
18167
    }
172

            
173
19099
    pub fn is_current(&self, schema_version: u64) -> bool {
174
19099
        self.internal_version == Self::CURRENT_VERSION && self.schema_version == schema_version
175
19099
    }
176

            
177
19099
    pub fn cleanup(&self, roots: &Roots<AnyFile>, view: &ViewName) -> Result<(), crate::Error> {
178
19099
        if self.internal_version < 2 {
179
            // omitted entries was removed
180
18166
            roots.delete_tree(format!("view.{view:#}.omitted"))?;
181
933
        }
182
19099
        Ok(())
183
19099
    }
184
}
185

            
186
18167
fn tree_keys<R: nebari::tree::Root>(
187
18167
    tree: &Tree<R, AnyFile>,
188
18167
) -> Result<HashSet<DocumentId>, crate::Error> {
189
18167
    let mut ids = Vec::new();
190
18167
    tree.scan::<Infallible, _, _, _, _>(
191
18167
        &(..),
192
18167
        true,
193
19129
        |_, _, _| ScanEvaluation::ReadData,
194
63048
        |key, _| {
195
62891
            ids.push(key.clone());
196
62891
            ScanEvaluation::Skip
197
63048
        },
198
18167
        |_, _, _| unreachable!(),
199
18167
    )?;
200

            
201
18167
    Ok(ids
202
18167
        .into_iter()
203
63048
        .map(|key| DocumentId::try_from(key.as_slice()))
204
18167
        .collect::<Result<HashSet<_>, bonsaidb_core::Error>>()?)
205
18167
}
206

            
207
impl Keyed<Task> for IntegrityScanner {
    /// Returns the task key used by the job system to deduplicate integrity
    /// scans for the same view.
    fn key(&self) -> Task {
        Task::IntegrityScan(self.scan.clone())
    }
}
212

            
213
// The reason we use jobs like this is to make sure we can tweak how much is
// happening at any given time.
//
// On the Server level, we'll need to cooperate with all the databases in a
// shared pool of workers. So, we need to come up with a design for the view
// updaters to work within this limitation.
//
// Integrity scan is simple: have a shared structure on Database that keeps track
// of all integrity scan results. It can check for an existing value and return,
// or make you wait until the job is finished. For views, I suppose the best
// that can be done is a similar approach, but the indexer's output is the last
// transaction id it synced. When a request comes in, a check can be done for
// any outdated docs; if there are some, the client can get the current transaction id
// and ask the ViewScanning service manager to wait until that txid is scanned.
//
// The view can then scan and return the results it finds with confidence it was updated to that time.
// If new requests come in while the current batch is being caught up to,