1
use std::{borrow::Cow, collections::HashSet, convert::Infallible, hash::Hash, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{view, CollectionName, Key, ViewName};
5
use nebari::{
6
    io::fs::StdFile,
7
    tree::{KeyEvaluation, Operation, Unversioned, Versioned},
8
    ArcBytes, Tree,
9
};
10

            
11
use super::{
12
    mapper::{Map, Mapper},
13
    view_document_map_tree_name, view_invalidated_docs_tree_name, view_versions_tree_name,
14
};
15
use crate::{
16
    database::{document_tree_name, Database},
17
    jobs::{Job, Keyed},
18
    tasks::Task,
19
    Error,
20
};
21

            
22
/// A background job that verifies a view's persisted state against the current
/// contents of its collection, queuing any out-of-date documents for
/// re-mapping (see [`Job::execute`] below for the scan logic).
#[derive(Debug)]
pub struct IntegrityScanner {
    /// The database whose trees are scanned and updated.
    pub database: Database,
    /// Identifies the view/collection pair this scan covers.
    pub scan: IntegrityScan,
}
27

            
28
17745
/// Parameters identifying a single integrity-scan task. Hash/Eq are derived so
/// this can serve as the job's deduplication key (see `Keyed<Task>` impl in
/// this file).
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct IntegrityScan {
    /// The current version of the view definition; compared against the
    /// version persisted in the view-versions tree during the scan.
    pub view_version: u64,
    /// The name of the database being scanned.
    pub database: Arc<Cow<'static, str>>,
    /// The collection the view indexes.
    pub collection: CollectionName,
    /// The fully qualified name of the view being checked.
    pub view_name: ViewName,
}
35

            
36
#[async_trait]
impl Job for IntegrityScanner {
    type Output = ();
    type Error = Error;

    /// Scans the collection's documents against the view's persisted state.
    ///
    /// If the stored view version differs from `scan.view_version`, every
    /// document is treated as out of date; otherwise only documents missing
    /// from the view's document map are. Out-of-date document ids are written
    /// to the view's invalidated-entries tree (and the stored view version is
    /// refreshed) in a single transaction, after which a [`Mapper`] job is
    /// enqueued to re-map them. Finally the scan is recorded as complete so
    /// other tasks don't repeat it.
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    #[allow(clippy::too_many_lines)]
    async fn execute(&mut self) -> Result<Self::Output, Self::Error> {
        // Versioned tree holding the collection's documents.
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.scan.collection,
                    document_tree_name(&self.scan.collection),
                )?)?;

        // Tree mapping view names to the view version last persisted for them.
        // The TreeRoot is cloned because it's also needed for the transaction
        // below.
        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_versions_tree_name(&self.scan.collection),
        )?;
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;

        // Tree whose keys are the document ids the view has already mapped.
        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.scan.collection,
                    view_document_map_tree_name(&self.scan.view_name),
                )?)?;

        // Tree that accumulates document ids needing (re)mapping.
        let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_invalidated_docs_tree_name(&self.scan.view_name),
        )?;

        // Clone/copy everything the blocking closure needs, since it must be
        // 'static to move onto the blocking thread pool.
        let view_name = self.scan.view_name.clone();
        let view_version = self.scan.view_version;
        let roots = self.database.roots().clone();

        // Nebari tree access is blocking I/O, so run the scan off the async
        // executor. Returns whether any entries were invalidated.
        let needs_update = tokio::task::spawn_blocking::<_, Result<bool, Error>>(move || {
            let document_ids = tree_keys::<u64, Versioned>(&documents)?;
            // The stored version is keyed by the view's name. A missing or
            // undecodable version is treated as out of date.
            let view_is_current_version =
                if let Some(version) = view_versions.get(view_name.to_string().as_bytes())? {
                    if let Ok(version) = u64::from_big_endian_bytes(version.as_slice()) {
                        version == view_version
                    } else {
                        false
                    }
                } else {
                    false
                };

            let missing_entries = if view_is_current_version {
                // Same version: only documents absent from the document map
                // need mapping.
                let stored_document_ids = tree_keys::<u64, Unversioned>(&document_map)?;

                document_ids
                    .difference(&stored_document_ids)
                    .copied()
                    .collect::<HashSet<_>>()
            } else {
                // The view isn't the current version, queue up all documents.
                document_ids
            };

            if !missing_entries.is_empty() {
                // Add all missing entries to the invalidated list. The view
                // mapping job will update them on the next pass.
                //
                // NOTE: `transaction.tree::<_>(n)` indexes into the slice
                // passed here: 0 = invalidated_entries_tree, 1 =
                // view_versions_tree.
                let mut transaction =
                    roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
                let view_versions = transaction.tree::<Unversioned>(1).unwrap();
                // Record the current view version so the next scan can take
                // the cheaper missing-entries-only path.
                view_versions.set(
                    view_name.to_string().as_bytes().to_vec(),
                    view_version.as_big_endian_bytes().unwrap().to_vec(),
                )?;
                let invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
                let mut missing_entries = missing_entries
                    .into_iter()
                    .map(|id| ArcBytes::from(id.to_be_bytes()))
                    .collect::<Vec<_>>();
                // Sorted key order — presumably required by nebari's `modify`;
                // TODO(review): confirm against nebari's documentation.
                missing_entries.sort();
                // Values don't matter for invalidation; only key presence
                // does, so an empty value is stored for each id.
                invalidated_entries.modify(missing_entries, Operation::Set(ArcBytes::default()))?;
                transaction.commit()?;

                return Ok(true);
            }

            Ok(false)
        })
        .await??;

        if needs_update {
            // Kick off (or join an existing) mapping job to process the newly
            // invalidated entries.
            self.database
                .data
                .storage
                .tasks()
                .jobs
                .lookup_or_enqueue(Mapper {
                    database: self.database.clone(),
                    map: Map {
                        database: self.database.data.name.clone(),
                        collection: self.scan.collection.clone(),
                        view_name: self.scan.view_name.clone(),
                    },
                })
                .await;
        }

        // Record the completed check so other tasks can skip rescanning this
        // view (see the design notes at the bottom of this file).
        self.database
            .data
            .storage
            .tasks()
            .mark_integrity_check_complete(
                self.database.data.name.clone(),
                self.scan.collection.clone(),
                self.scan.view_name.clone(),
            )
            .await;

        Ok(())
    }
}
157

            
158
6593
fn tree_keys<K: for<'a> Key<'a> + Hash + Eq + Clone, R: nebari::tree::Root>(
159
6593
    tree: &Tree<R, StdFile>,
160
6593
) -> Result<HashSet<K>, crate::Error> {
161
6593
    let mut ids = Vec::new();
162
6593
    tree.scan::<Infallible, _, _, _, _>(
163
6593
        &(..),
164
6593
        true,
165
11103
        |_, _, _| true,
166
42959
        |key, _| {
167
42930
            ids.push(key.clone());
168
42930
            KeyEvaluation::Skip
169
42959
        },
170
6593
        |_, _, _| unreachable!(),
171
6593
    )?;
172

            
173
6593
    Ok(ids
174
6593
        .into_iter()
175
42959
        .map(|key| K::from_big_endian_bytes(&key).map_err(view::Error::key_serialization))
176
6593
        .collect::<Result<HashSet<_>, view::Error>>()?)
177
6593
}
178

            
179
impl Keyed<Task> for IntegrityScanner {
    /// Identifies this job by its scan parameters; two scanners with equal
    /// `IntegrityScan` values produce the same key.
    fn key(&self) -> Task {
        Task::IntegrityScan(self.scan.clone())
    }
}
184

            
185
// The reason we use jobs like this is to make sure we can tweak how much is
186
// happening at any given time.
187
//
188
// On the Server level, we'll need to cooperate with all the databases in a
189
// shared pool of workers. So, we need to come up with a design for the view
190
// updaters to work within this limitation.
191
//
192
// Integrity scan is simple: Have a shared structure on Database that keeps track
193
// of all integrity scan results. It can check for an existing value and return,
194
// or make you wait until the job is finished. For views, I suppose the best
195
// that can be done is a similar approach, but the indexer's output is the last
196
// transaction id it synced. When a request comes in, a check can be done to
// see whether any docs are outdated; if so, the client can get the current transaction id
198
// and ask the ViewScanning service manager to wait until that txid is scanned.
199
//
200
// The view can then scan and return the results it finds with confidence it was updated to that time.
201
// If new requests come in while the current batch is being caught up to,