1
use std::{borrow::Cow, collections::HashSet, convert::Infallible, hash::Hash, sync::Arc};
2

            
3
use bonsaidb_core::{
4
    document::DocumentId,
5
    schema::{CollectionName, ViewName},
6
};
7
use nebari::{
8
    io::any::AnyFile,
9
    tree::{Operation, ScanEvaluation, Unversioned, Versioned},
10
    ArcBytes, Roots, Tree,
11
};
12
use parking_lot::Mutex;
13
use serde::{Deserialize, Serialize};
14

            
15
use super::{
16
    mapper::{Map, Mapper},
17
    view_invalidated_docs_tree_name, view_versions_tree_name,
18
};
19
use crate::{
20
    database::{document_tree_name, Database},
21
    tasks::{handle::Handle, Job, Keyed, Task},
22
    Error,
23
};
24

            
25
/// Background job that verifies a view's persisted data matches the current
/// view version, enqueueing a remap of the whole collection when it does not.
#[derive(Debug)]
pub struct IntegrityScanner {
    /// The database whose view is being checked.
    pub database: Database,
    /// The parameters identifying the view to check.
    pub scan: IntegrityScan,
}
30

            
31
24879
/// The identifying parameters of an integrity scan. Hash/Eq derives allow this
/// to serve as the deduplication key for queued scan tasks (see the
/// `Keyed<Task>` impl in this file).
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct IntegrityScan {
    /// The schema-declared version of the view being scanned.
    pub view_version: u64,
    /// The name of the database containing the view.
    pub database: Arc<Cow<'static, str>>,
    /// The collection whose documents the view maps.
    pub collection: CollectionName,
    /// The fully-qualified name of the view.
    pub view_name: ViewName,
}
38

            
39
/// Handle to the view-mapping job spawned when a scan finds the view out of
/// date; `None` when the view was already current and no remap was needed.
pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<Handle<u64, Error>>>>>;
40

            
41
impl Job for IntegrityScanner {
    type Output = OptionalViewMapHandle;
    type Error = Error;

    /// Compares the view's stored [`ViewVersion`] against the schema-declared
    /// version. When they differ, marks every document in the collection as
    /// invalidated and enqueues a [`Mapper`] job to rebuild the view,
    /// returning a handle to that job. Always records the integrity check as
    /// complete before returning.
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Self::Error> {
        // The versioned tree holding the collection's documents.
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.scan.collection,
                    document_tree_name(&self.scan.collection),
                )?)?;

        // Tree mapping view names to their serialized `ViewVersion` stamps.
        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_versions_tree_name(&self.scan.collection),
        )?;
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;

        // Tree whose keys are document ids awaiting (re)mapping for this view.
        let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_invalidated_docs_tree_name(&self.scan.view_name),
        )?;

        let view_name = self.scan.view_name.clone();
        let view_version = self.scan.view_version;
        let roots = self.database.roots().clone();
        // A missing or unparseable stored stamp defaults to version 0, which
        // is never current, forcing a full remap below.
        let version = view_versions
            .get(view_name.to_string().as_bytes())?
            .and_then(|version| ViewVersion::from_bytes(&version).ok())
            .unwrap_or_default();

        // Remove any old files that are no longer used.
        version.cleanup(&roots, &view_name)?;

        let task = if version.is_current(view_version) {
            None
        } else {
            // The view isn't the current version, queue up all documents.
            let missing_entries = tree_keys::<Versioned>(&documents)?;
            // Add all missing entries to the invalidated list. The view
            // mapping job will update them on the next pass.
            let transaction = roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
            {
                // NOTE: tree indices mirror the slice order passed to
                // `transaction` above: 0 = invalidated entries, 1 = versions.
                let mut view_versions = transaction.tree::<Unversioned>(1).unwrap();
                view_versions.set(
                    view_name.to_string().as_bytes().to_vec(),
                    ViewVersion::current_for(view_version).to_vec()?,
                )?;
                let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
                let mut missing_entries = missing_entries
                    .into_iter()
                    .map(|id| ArcBytes::from(id.to_vec()))
                    .collect::<Vec<_>>();
                // `modify` operates on a batch of keys; sorting first keeps
                // the operation a single ordered pass over the tree.
                missing_entries.sort();
                invalidated_entries.modify(missing_entries, Operation::Set(ArcBytes::default()))?;
            }
            // Both the version stamp and the invalidation list become visible
            // atomically, before the mapper is enqueued.
            transaction.commit()?;

            Some(Arc::new(Mutex::new(Some(
                self.database
                    .storage
                    .instance
                    .tasks()
                    .jobs
                    .lookup_or_enqueue(Mapper {
                        database: self.database.clone(),
                        map: Map {
                            database: self.database.data.name.clone(),
                            collection: self.scan.collection.clone(),
                            view_name: self.scan.view_name.clone(),
                        },
                    }),
            ))))
        };

        // Record completion whether or not a remap was queued, so callers
        // waiting on the integrity check can proceed.
        self.database
            .storage
            .instance
            .tasks()
            .mark_integrity_check_complete(
                self.database.data.name.clone(),
                self.scan.collection.clone(),
                self.scan.view_name.clone(),
            );

        Ok(task)
    }
}
132

            
133
10102
/// The version stamp persisted for each view, pairing this crate's internal
/// storage-format version with the schema-declared view version.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ViewVersion {
    // Version of the on-disk representation used by this crate; bumped when
    // the internal layout of view storage changes (see `cleanup`).
    internal_version: u8,
    // The view's version as declared by the schema.
    schema_version: u64,
}
138

            
139
impl ViewVersion {
140
    const CURRENT_VERSION: u8 = 3;
141
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
142
592
        match pot::from_slice(bytes) {
143
585
            Ok(version) => Ok(version),
144
7
            Err(err) if matches!(err, pot::Error::NotAPot) && bytes.len() == 8 => {
145
                let mut be_bytes = [0_u8; 8];
146
                be_bytes.copy_from_slice(bytes);
147
                let schema_version = u64::from_be_bytes(be_bytes);
148
                Ok(Self {
149
                    internal_version: 0,
150
                    schema_version,
151
                })
152
            }
153
7
            Err(err) => Err(crate::Error::from(err)),
154
        }
155
592
    }
156

            
157
10102
    pub fn to_vec(&self) -> Result<Vec<u8>, crate::Error> {
158
10102
        pot::to_vec(self).map_err(crate::Error::from)
159
10102
    }
160

            
161
10102
    pub fn current_for(schema_version: u64) -> Self {
162
10102
        Self {
163
10102
            internal_version: Self::CURRENT_VERSION,
164
10102
            schema_version,
165
10102
        }
166
10102
    }
167

            
168
10679
    pub fn is_current(&self, schema_version: u64) -> bool {
169
10679
        self.internal_version == Self::CURRENT_VERSION && self.schema_version == schema_version
170
10679
    }
171

            
172
10679
    pub fn cleanup(&self, roots: &Roots<AnyFile>, view: &ViewName) -> Result<(), crate::Error> {
173
10679
        if self.internal_version < 2 {
174
            // omitted entries was removed
175
10101
            roots.delete_tree(format!("view.{:#}.omitted", view))?;
176
578
        }
177
10679
        Ok(())
178
10679
    }
179
}
180

            
181
10102
fn tree_keys<R: nebari::tree::Root>(
182
10102
    tree: &Tree<R, AnyFile>,
183
10102
) -> Result<HashSet<DocumentId>, crate::Error> {
184
10102
    let mut ids = Vec::new();
185
10102
    tree.scan::<Infallible, _, _, _, _>(
186
10102
        &(..),
187
10102
        true,
188
13402
        |_, _, _| ScanEvaluation::ReadData,
189
42412
        |key, _| {
190
42353
            ids.push(key.clone());
191
42353
            ScanEvaluation::Skip
192
42412
        },
193
10102
        |_, _, _| unreachable!(),
194
10102
    )?;
195

            
196
10102
    Ok(ids
197
10102
        .into_iter()
198
42412
        .map(|key| DocumentId::try_from(key.as_slice()))
199
10102
        .collect::<Result<HashSet<_>, bonsaidb_core::Error>>()?)
200
10072
}
201

            
202
impl Keyed<Task> for IntegrityScanner {
    /// The deduplication key for this job: scans with identical
    /// `IntegrityScan` parameters share a single queued task.
    fn key(&self) -> Task {
        Task::IntegrityScan(self.scan.clone())
    }
}
207

            
208
// The reason we use jobs like this is to make sure we can tweak how much is
209
// happening at any given time.
210
//
211
// On the Server level, we'll need to cooperate with all the databases in a
212
// shared pool of workers. So, we need to come up with a design for the view
213
// updaters to work within this limitation.
214
//
215
// Integrity scan is simple: Have a shared structure on Database that keeps track
216
// of all integrity scan results. It can check for an existing value and return,
217
// or make you wait until the job is finished. For views, I suppose the best
218
// that can be done is a similar approach, but the indexer's output is the last
219
// transaction id it synced. When a request comes in, a check can be done if
220
// there are any docs outdated, if so, the client can get the current transaction id
221
// and ask the ViewScanning service manager to wait until that txid is scanned.
222
//
223
// The view can then scan and return the results it finds with confidence it was updated to that time.
224
// If new requests come in while the current batch is being caught up to,