1
use std::{borrow::Cow, collections::HashSet, convert::Infallible, hash::Hash, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::schema::{view, CollectionName, Key, ViewName};
5
use nebari::{
6
    io::any::AnyFile,
7
    tree::{KeyEvaluation, Operation, Unversioned, Versioned},
8
    ArcBytes, Tree,
9
};
10

            
11
use super::{
12
    mapper::{Map, Mapper},
13
    view_document_map_tree_name, view_invalidated_docs_tree_name, view_versions_tree_name,
14
};
15
use crate::{
16
    database::{document_tree_name, Database},
17
    jobs::{Job, Keyed},
18
    tasks::Task,
19
    Error,
20
};
21

            
22
/// A job that verifies a view's persisted state against its collection's
/// documents, queuing any out-of-date documents for re-mapping.
#[derive(Debug)]
pub struct IntegrityScanner {
    /// The database this scan runs against.
    pub database: Database,
    /// Describes which collection/view to scan and the expected view version.
    pub scan: IntegrityScan,
}
27

            
28
19412
/// Identifies a single view integrity scan. Implements `Hash`/`Eq`/`Clone` so
/// it can serve as a job key (see `Keyed for IntegrityScanner` below), which
/// presumably lets identical scans be deduplicated in the job queue.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct IntegrityScan {
    /// The version of the view as defined by the currently running code.
    pub view_version: u64,
    /// The name of the database being scanned.
    pub database: Arc<Cow<'static, str>>,
    /// The collection the view indexes.
    pub collection: CollectionName,
    /// The fully qualified name of the view being checked.
    pub view_name: ViewName,
}
35

            
36
#[async_trait]
impl Job for IntegrityScanner {
    type Output = ();
    type Error = Error;

    /// Checks whether the view's stored data is complete and current. If any
    /// documents are missing from the view's document map, or the stored view
    /// version doesn't match the code's version, the affected document ids are
    /// written to the invalidated-entries tree and a `Mapper` job is enqueued
    /// to (re)map them. Always marks the integrity check complete at the end.
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    #[allow(clippy::too_many_lines)]
    async fn execute(&mut self) -> Result<Self::Output, Self::Error> {
        // Tree containing all documents in the scanned collection.
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.scan.collection,
                    document_tree_name(&self.scan.collection),
                )?)?;

        // Tree storing the persisted version number of each view. The tree
        // handle is cloned because it is also needed for the transaction below.
        let view_versions_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_versions_tree_name(&self.scan.collection),
        )?;
        let view_versions = self.database.roots().tree(view_versions_tree.clone())?;

        // Tree mapping document ids to this view's entries.
        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.scan.collection,
                    view_document_map_tree_name(&self.scan.view_name),
                )?)?;

        // Tree listing document ids whose view entries need re-mapping.
        let invalidated_entries_tree = self.database.collection_tree::<Unversioned, _>(
            &self.scan.collection,
            view_invalidated_docs_tree_name(&self.scan.view_name),
        )?;

        // Clone the values the blocking closure needs, since it must own them.
        let view_name = self.scan.view_name.clone();
        let view_version = self.scan.view_version;
        let roots = self.database.roots().clone();

        // The nebari tree operations are synchronous, so the scan runs on the
        // blocking thread pool to avoid stalling the async executor.
        let needs_update = tokio::task::spawn_blocking::<_, Result<bool, Error>>(move || {
            let document_ids = tree_keys::<u64, Versioned>(&documents)?;
            // The view is current only if a stored version exists, decodes as
            // a u64, and equals the version compiled into the code.
            let view_is_current_version =
                if let Some(version) = view_versions.get(view_name.to_string().as_bytes())? {
                    if let Ok(version) = u64::from_big_endian_bytes(version.as_slice()) {
                        version == view_version
                    } else {
                        false
                    }
                } else {
                    false
                };

            let missing_entries = if view_is_current_version {
                // Only documents absent from the view's document map need
                // (re)mapping.
                let stored_document_ids = tree_keys::<u64, Unversioned>(&document_map)?;

                document_ids
                    .difference(&stored_document_ids)
                    .copied()
                    .collect::<HashSet<_>>()
            } else {
                // The view isn't the current version, queue up all documents.
                document_ids
            };

            if !missing_entries.is_empty() {
                // Add all missing entries to the invalidated list. The view
                // mapping job will update them on the next pass.
                //
                // NOTE: `transaction.tree(n)` indexes into the slice passed to
                // `transaction` — 0 is invalidated_entries_tree, 1 is
                // view_versions_tree.
                let mut transaction =
                    roots.transaction(&[invalidated_entries_tree, view_versions_tree])?;
                let view_versions = transaction.tree::<Unversioned>(1).unwrap();
                // Persist the current view version alongside the invalidations
                // so future scans can take the cheap difference-only path.
                view_versions.set(
                    view_name.to_string().as_bytes().to_vec(),
                    view_version.as_big_endian_bytes().unwrap().to_vec(),
                )?;
                let invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
                let mut missing_entries = missing_entries
                    .into_iter()
                    .map(|id| ArcBytes::from(id.to_be_bytes()))
                    .collect::<Vec<_>>();
                // Keys are sorted before the batch operation — presumably a
                // requirement of nebari's `modify`; confirm against its docs.
                missing_entries.sort();
                invalidated_entries.modify(missing_entries, Operation::Set(ArcBytes::default()))?;
                transaction.commit()?;

                return Ok(true);
            }

            Ok(false)
        })
        .await??;

        if needs_update {
            // Entries were invalidated: enqueue (or join) a Mapper job to
            // rebuild the view entries for this collection/view.
            self.database
                .data
                .storage
                .tasks()
                .jobs
                .lookup_or_enqueue(Mapper {
                    database: self.database.clone(),
                    map: Map {
                        database: self.database.data.name.clone(),
                        collection: self.scan.collection.clone(),
                        view_name: self.scan.view_name.clone(),
                    },
                })
                .await;
        }

        // Record completion regardless of whether an update was needed, so
        // callers waiting on the integrity check are released.
        self.database
            .data
            .storage
            .tasks()
            .mark_integrity_check_complete(
                self.database.data.name.clone(),
                self.scan.collection.clone(),
                self.scan.view_name.clone(),
            )
            .await;

        Ok(())
    }
}
157

            
158
7252
fn tree_keys<K: for<'a> Key<'a> + Hash + Eq + Clone, R: nebari::tree::Root>(
159
7252
    tree: &Tree<R, AnyFile>,
160
7252
) -> Result<HashSet<K>, crate::Error> {
161
7252
    let mut ids = Vec::new();
162
7252
    tree.scan::<Infallible, _, _, _, _>(
163
7252
        &(..),
164
7252
        true,
165
9556
        |_, _, _| true,
166
32380
        |key, _| {
167
32325
            ids.push(key.clone());
168
32325
            KeyEvaluation::Skip
169
32380
        },
170
7252
        |_, _, _| unreachable!(),
171
7252
    )?;
172

            
173
7252
    Ok(ids
174
7252
        .into_iter()
175
32380
        .map(|key| K::from_big_endian_bytes(&key).map_err(view::Error::key_serialization))
176
7252
        .collect::<Result<HashSet<_>, view::Error>>()?)
177
7252
}
178

            
179
impl Keyed<Task> for IntegrityScanner {
    /// Returns the scan descriptor as this job's key — presumably used by the
    /// job system (see `lookup_or_enqueue`) to coalesce identical scans.
    fn key(&self) -> Task {
        Task::IntegrityScan(self.scan.clone())
    }
}
184

            
185
// The reason we use jobs like this is to make sure we can tweak how much is
186
// happening at any given time.
187
//
188
// On the Server level, we'll need to cooperate with all the databases in a
189
// shared pool of workers. So, we need to come up with a design for the view
190
// updaters to work within this limitation.
191
//
192
// Integrity scan is simple: Have a shared structure on Database that keeps track
193
// of all integrity scan results. It can check for an existing value and return,
194
// or make you wait until the job is finished. For views, I suppose the best
195
// that can be done is a similar approach, but the indexer's output is the last
196
// transaction id it synced. When a request comes in, a check can be done to see
// whether any docs are outdated; if so, the client can get the current transaction id
// and ask the ViewScanning service manager to wait until that txid is scanned.
199
//
200
// The view can then scan and return the results it finds with confidence it was updated to that time.
201
// If new requests come in while the current batch is being caught up to,