1
use std::{borrow::Cow, collections::HashSet, convert::Infallible, hash::Hash, sync::Arc};
2

            
3
use async_lock::Mutex;
4
use async_trait::async_trait;
5
use bonsaidb_core::{
6
    document::DocumentId,
7
    schema::{CollectionName, ViewName},
8
};
9
use nebari::{
10
    io::any::AnyFile,
11
    tree::{KeyEvaluation, Operation, Unversioned, Versioned},
12
    ArcBytes, Roots, Tree,
13
};
14
use serde::{Deserialize, Serialize};
15

            
16
use super::{
17
    mapper::{Map, Mapper},
18
    view_invalidated_docs_tree_name, view_versions_tree_name,
19
};
20
use crate::{
21
    database::{document_tree_name, Database},
22
    tasks::{handle::Handle, Job, Keyed, Task},
23
    Error,
24
};
25

            
26
#[derive(Debug)]
27
pub struct IntegrityScanner {
28
    pub database: Database,
29
    pub scan: IntegrityScan,
30
}
31

            
32
22195
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
33
pub struct IntegrityScan {
34
    pub view_version: u64,
35
    pub database: Arc<Cow<'static, str>>,
36
    pub collection: CollectionName,
37
    pub view_name: ViewName,
38
}
39

            
40
/// The output of an [`IntegrityScanner`] job.
///
/// `Some` when the scan found the view out of date and looked up or enqueued a
/// mapping job, wrapping that job's handle; `None` when the view was already
/// current. NOTE(review): the inner `Option` presumably lets a waiter take the
/// handle exactly once -- confirm against the code that consumes this value.
pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<Handle<u64, Error>>>>>;
41

            
42
#[async_trait]
43
impl Job for IntegrityScanner {
44
    type Output = OptionalViewMapHandle;
45
    type Error = Error;
46

            
47
25485
    #[cfg_attr(feature = "tracing", tracing::instrument)]
48
    #[allow(clippy::too_many_lines)]
49
8495
    async fn execute(&mut self) -> Result<Self::Output, Self::Error> {
50
8495
        let documents =
51
8495
            self.database
52
8495
                .roots()
53
8495
                .tree(self.database.collection_tree::<Versioned, _>(
54
8495
                    &self.scan.collection,
55
8495
                    document_tree_name(&self.scan.collection),
56
8495
                )?)?;
57

            
58
8495
        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
59
8495
            &self.scan.collection,
60
8495
            view_versions_tree_name(&self.scan.collection),
61
8495
        )?;
62
8495
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;
63

            
64
8495
        let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
65
8495
            &self.scan.collection,
66
8495
            view_invalidated_docs_tree_name(&self.scan.view_name),
67
8495
        )?;
68

            
69
8495
        let view_name = self.scan.view_name.clone();
70
8495
        let view_version = self.scan.view_version;
71
8495
        let roots = self.database.roots().clone();
72

            
73
8495
        let needs_update = tokio::task::spawn_blocking::<_, Result<bool, Error>>(move || {
74
8495
            let version = view_versions
75
8495
                .get(view_name.to_string().as_bytes())?
76
8495
                .and_then(|version| ViewVersion::from_bytes(&version).ok())
77
8495
                .unwrap_or_default();
78
8495
            version.cleanup(&roots, &view_name)?;
79

            
80
8495
            if version.is_current(view_version) {
81
423
                Ok(false)
82
            } else {
83
                // The view isn't the current version, queue up all documents.
84
8072
                let missing_entries = tree_keys::<Versioned>(&documents)?;
85
                // Add all missing entries to the invalidated list. The view
86
                // mapping job will update them on the next pass.
87
8072
                let transaction =
88
8072
                    roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
89
                {
90
8072
                    let mut view_versions = transaction.tree::<Unversioned>(1).unwrap();
91
8072
                    view_versions.set(
92
8072
                        view_name.to_string().as_bytes().to_vec(),
93
8072
                        ViewVersion::current_for(view_version).to_vec()?,
94
                    )?;
95
8072
                    let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
96
8072
                    let mut missing_entries = missing_entries
97
8072
                        .into_iter()
98
28248
                        .map(|id| ArcBytes::from(id.to_vec()))
99
8072
                        .collect::<Vec<_>>();
100
8072
                    missing_entries.sort();
101
8072
                    invalidated_entries
102
8072
                        .modify(missing_entries, Operation::Set(ArcBytes::default()))?;
103
                }
104
8072
                transaction.commit()?;
105

            
106
8072
                Ok(true)
107
            }
108
8495
        })
109
8339
        .await??;
110

            
111
8495
        let task = if needs_update {
112
            Some(Arc::new(Mutex::new(Some(
113
8072
                self.database
114
8072
                    .data
115
8072
                    .storage
116
8072
                    .tasks()
117
8072
                    .jobs
118
8072
                    .lookup_or_enqueue(Mapper {
119
8072
                        database: self.database.clone(),
120
8072
                        map: Map {
121
8072
                            database: self.database.data.name.clone(),
122
8072
                            collection: self.scan.collection.clone(),
123
8072
                            view_name: self.scan.view_name.clone(),
124
8072
                        },
125
8072
                    })
126
                    .await,
127
            ))))
128
        } else {
129
423
            None
130
        };
131

            
132
8495
        self.database
133
8495
            .data
134
8495
            .storage
135
8495
            .tasks()
136
8495
            .mark_integrity_check_complete(
137
8495
                self.database.data.name.clone(),
138
8495
                self.scan.collection.clone(),
139
8495
                self.scan.view_name.clone(),
140
8495
            )
141
            .await;
142

            
143
8495
        Ok(task)
144
16990
    }
145
}
146

            
147
8072
#[derive(Serialize, Deserialize, Debug, Default)]
148
pub struct ViewVersion {
149
    internal_version: u8,
150
    schema_version: u64,
151
}
152

            
153
impl ViewVersion {
154
    const CURRENT_VERSION: u8 = 2;
155
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
156
438
        match pot::from_slice(bytes) {
157
431
            Ok(version) => Ok(version),
158
7
            Err(err) if matches!(err, pot::Error::NotAPot) && bytes.len() == 8 => {
159
                let mut be_bytes = [0_u8; 8];
160
                be_bytes.copy_from_slice(bytes);
161
                let schema_version = u64::from_be_bytes(be_bytes);
162
                Ok(Self {
163
                    internal_version: 0,
164
                    schema_version,
165
                })
166
            }
167
7
            Err(err) => Err(crate::Error::from(err)),
168
        }
169
438
    }
170

            
171
8072
    pub fn to_vec(&self) -> Result<Vec<u8>, crate::Error> {
172
8072
        pot::to_vec(self).map_err(crate::Error::from)
173
8072
    }
174

            
175
8072
    pub fn current_for(schema_version: u64) -> Self {
176
8072
        Self {
177
8072
            internal_version: Self::CURRENT_VERSION,
178
8072
            schema_version,
179
8072
        }
180
8072
    }
181

            
182
8495
    pub fn is_current(&self, schema_version: u64) -> bool {
183
8495
        self.internal_version == Self::CURRENT_VERSION && self.schema_version == schema_version
184
8495
    }
185

            
186
8495
    pub fn cleanup(&self, roots: &Roots<AnyFile>, view: &ViewName) -> Result<(), crate::Error> {
187
8495
        if self.internal_version < 2 {
188
            // omitted entries was removed
189
8071
            roots.delete_tree(format!("view.{:#}.omitted", view))?;
190
424
        }
191
8495
        Ok(())
192
8495
    }
193
}
194

            
195
8072
fn tree_keys<R: nebari::tree::Root>(
196
8072
    tree: &Tree<R, AnyFile>,
197
8072
) -> Result<HashSet<DocumentId>, crate::Error> {
198
8072
    let mut ids = Vec::new();
199
8072
    tree.scan::<Infallible, _, _, _, _>(
200
8072
        &(..),
201
8072
        true,
202
8904
        |_, _, _| true,
203
28248
        |key, _| {
204
28245
            ids.push(key.clone());
205
28245
            KeyEvaluation::Skip
206
28248
        },
207
8072
        |_, _, _| unreachable!(),
208
8072
    )?;
209

            
210
8072
    Ok(ids
211
8072
        .into_iter()
212
28248
        .map(|key| DocumentId::try_from(key.as_slice()))
213
8072
        .collect::<Result<HashSet<_>, bonsaidb_core::Error>>()?)
214
8072
}
215

            
216
impl Keyed<Task> for IntegrityScanner {
217
8524
    fn key(&self) -> Task {
218
8524
        Task::IntegrityScan(self.scan.clone())
219
8524
    }
220
}
221

            
222
// The reason we use jobs like this is to make sure we can tweak how much is
// happening at any given time.
//
// On the Server level, we'll need to cooperate with all the databases in a
// shared pool of workers. So, we need to come up with a design for the view
// updaters to work within this limitation.
//
// The integrity scan is simple: have a shared structure on Database that keeps
// track of all integrity scan results. It can check for an existing value and
// return, or make you wait until the job is finished. For views, I suppose the
// best that can be done is a similar approach, but the indexer's output is the
// last transaction id it synced. When a request comes in, a check can be done
// to see whether any docs are outdated; if so, the client can get the current
// transaction id and ask the ViewScanning service manager to wait until that
// txid is scanned.
//
// The view can then scan and return the results it finds with confidence it was
// updated to that time.
238
// If new requests come in while the current batch is being caught up to,