1
use std::collections::BTreeMap;
2
use std::marker::PhantomData;
3
use std::mem::size_of;
4

            
5
#[cfg(feature = "async")]
6
use bonsaidb_core::connection::AsyncConnection;
7
use bonsaidb_core::connection::Connection;
8
use bonsaidb_core::document::{BorrowedDocument, Emit};
9
use bonsaidb_core::key::time::TimestampAsNanoseconds;
10
use bonsaidb_core::key::KeyEncoding;
11
use bonsaidb_core::schema::{
12
    Collection, CollectionName, MapReduce, View, ViewMapResult, ViewMappedValue, ViewSchema,
13
};
14
use bonsaidb_core::transaction::{Operation, Transaction};
15
use derive_where::derive_where;
16
use serde::{Deserialize, Serialize};
17

            
18
use crate::direct::BlockInfo;
19
use crate::schema::file::File;
20
use crate::FileConfig;
21

            
22
#[derive_where(Debug, Default)]
23
pub struct Block<Config>(PhantomData<Config>)
24
where
25
    Config: FileConfig;
26

            
27
impl<Config> Block<Config>
28
where
29
    Config: FileConfig,
30
{
31
16
    pub fn append<Database: Connection>(
32
16
        data: &[u8],
33
16
        file_id: u32,
34
16
        database: &Database,
35
16
    ) -> Result<(), bonsaidb_core::Error> {
36
16
        if !data.is_empty() {
37
13
            let mut tx = Transaction::new();
38
13
            let now = TimestampAsNanoseconds::now();
39
13
            // Verify the file exists as part of appending. If the file was
40
13
            // deleted out from underneath the appender, this will ensure no
41
13
            // blocks are orphaned.
42
13
            tx.push(Operation::check_document_exists::<File<Config>>(&file_id)?);
43

            
44
13
            let block_collection = Self::collection_name();
45
63
            for chunk in data.chunks(Config::BLOCK_SIZE) {
46
63
                let mut block =
47
63
                    Vec::with_capacity(chunk.len() + size_of::<u32>() + size_of::<i64>());
48
63
                block.extend(chunk);
49
63
                block.extend(file_id.to_be_bytes());
50
63
                block.extend(now.representation().to_be_bytes());
51
63
                tx.push(Operation::insert(block_collection.clone(), None, block));
52
63
            }
53

            
54
13
            tx.apply(database)?;
55
3
        }
56
16
        Ok(())
57
16
    }
58

            
59
    #[cfg(feature = "async")]
60
12
    pub async fn append_async<Database: AsyncConnection>(
61
12
        data: &[u8],
62
12
        file_id: u32,
63
12
        database: &Database,
64
12
    ) -> Result<(), bonsaidb_core::Error> {
65
12
        if !data.is_empty() {
66
11
            let mut tx = Transaction::new();
67
11
            let now = TimestampAsNanoseconds::now();
68
11
            // Verify the file exists as part of appending. If the file was
69
11
            // deleted out from underneath the appender, this will ensure no
70
11
            // blocks are orphaned.
71
11
            tx.push(Operation::check_document_exists::<File<Config>>(&file_id)?);
72

            
73
11
            let block_collection = Self::collection_name();
74
61
            for chunk in data.chunks(Config::BLOCK_SIZE) {
75
61
                let mut block =
76
61
                    Vec::with_capacity(chunk.len() + size_of::<u32>() + size_of::<i64>());
77
61
                block.extend(chunk);
78
61
                block.extend(file_id.to_be_bytes());
79
61
                block.extend(now.representation().to_be_bytes());
80
61
                tx.push(Operation::insert(block_collection.clone(), None, block));
81
61
            }
82

            
83
11
            tx.apply_async(database).await?;
84
1
        }
85
12
        Ok(())
86
12
    }
87

            
88
    pub fn load<
89
        'a,
90
        DocumentIds: IntoIterator<Item = &'a PrimaryKey, IntoIter = I> + Send + Sync,
91
        I: Iterator<Item = &'a PrimaryKey> + Send + Sync,
92
        PrimaryKey: KeyEncoding<u64> + 'a,
93
        Database: Connection,
94
    >(
95
        block_ids: DocumentIds,
96
        database: &Database,
97
    ) -> Result<BTreeMap<u64, Vec<u8>>, bonsaidb_core::Error> {
98
19
        database
99
19
            .collection::<Self>()
100
19
            .get_multiple(block_ids)?
101
19
            .into_iter()
102
45
            .map(|block| {
103
45
                let mut contents = block.contents.into_vec();
104
45
                contents.truncate(contents.len() - size_of::<u32>() - size_of::<i64>());
105
45
                block.header.id.deserialize().map(|id| (id, contents))
106
45
            })
107
19
            .collect()
108
19
    }
109

            
110
    #[cfg(feature = "async")]
111
18
    pub async fn load_async<
112
18
        'a,
113
18
        DocumentIds: IntoIterator<Item = &'a PrimaryKey, IntoIter = I> + Send + Sync,
114
18
        I: Iterator<Item = &'a PrimaryKey> + Send + Sync,
115
18
        PrimaryKey: KeyEncoding<u64> + 'a,
116
18
        Database: AsyncConnection,
117
18
    >(
118
18
        block_ids: DocumentIds,
119
18
        database: &Database,
120
18
    ) -> Result<BTreeMap<u64, Vec<u8>>, bonsaidb_core::Error> {
121
18
        database
122
18
            .collection::<Self>()
123
18
            .get_multiple(block_ids)
124
18
            .await?
125
18
            .into_iter()
126
44
            .map(|block| {
127
44
                let mut contents = block.contents.into_vec();
128
44
                contents.truncate(contents.len() - size_of::<u32>() - size_of::<i64>());
129
44
                block.header.id.deserialize().map(|id| (id, contents))
130
44
            })
131
18
            .collect()
132
18
    }
133

            
134
18
    pub(crate) fn for_file<Database: Connection>(
135
18
        file_id: u32,
136
18
        database: &Database,
137
18
    ) -> Result<Vec<BlockInfo>, bonsaidb_core::Error> {
138
18
        let mut blocks = database
139
18
            .view::<ByFile<Config>>()
140
18
            .with_key(&file_id)
141
18
            .query()?
142
18
            .into_iter()
143
101
            .map(|mapping| BlockInfo {
144
101
                header: mapping.source,
145
101
                length: usize::try_from(mapping.value.length).unwrap(),
146
101
                timestamp: mapping.value.timestamp.unwrap(),
147
101
                offset: 0,
148
101
            })
149
18
            .collect::<Vec<_>>();
150
411
        blocks.sort_by(|a, b| a.header.id.cmp(&b.header.id));
151
18
        let mut offset = 0;
152
119
        for block in &mut blocks {
153
101
            block.offset = offset;
154
101
            offset += u64::try_from(block.length).unwrap();
155
101
        }
156
18
        Ok(blocks)
157
18
    }
158

            
159
6
    pub(crate) fn summary_for_file<Database: Connection>(
160
6
        file_id: u32,
161
6
        database: &Database,
162
6
    ) -> Result<BlockAppendInfo, bonsaidb_core::Error> {
163
6
        database
164
6
            .view::<ByFile<Config>>()
165
6
            .with_key(&file_id)
166
6
            .reduce()
167
6
    }
168

            
169
    #[cfg(feature = "async")]
170
6
    pub(crate) async fn summary_for_file_async<Database: AsyncConnection>(
171
6
        file_id: u32,
172
6
        database: &Database,
173
6
    ) -> Result<BlockAppendInfo, bonsaidb_core::Error> {
174
6
        database
175
6
            .view::<ByFile<Config>>()
176
6
            .with_key(&file_id)
177
6
            .reduce()
178
6
            .await
179
6
    }
180

            
181
5
    pub(crate) fn summary_for_ids<'a, Database: Connection, Iter: IntoIterator<Item = &'a u32>>(
182
5
        file_ids: Iter,
183
5
        database: &'a Database,
184
5
    ) -> Result<BlockAppendInfo, bonsaidb_core::Error> {
185
5
        database
186
5
            .view::<ByFile<Config>>()
187
5
            .with_keys(file_ids)
188
5
            .reduce()
189
5
    }
190

            
191
    #[cfg(feature = "async")]
192
5
    pub(crate) async fn summary_for_ids_async<
193
5
        'a,
194
5
        Database: AsyncConnection,
195
5
        Iter: IntoIterator<Item = &'a u32>,
196
5
    >(
197
5
        file_ids: Iter,
198
5
        database: &'a Database,
199
5
    ) -> Result<BlockAppendInfo, bonsaidb_core::Error> {
200
5
        database
201
5
            .view::<ByFile<Config>>()
202
5
            .with_keys(file_ids)
203
5
            .reduce()
204
5
            .await
205
5
    }
206

            
207
    #[cfg(feature = "async")]
208
17
    pub(crate) async fn for_file_async<Database: AsyncConnection>(
209
17
        file_id: u32,
210
17
        database: &Database,
211
17
    ) -> Result<Vec<BlockInfo>, bonsaidb_core::Error> {
212
17
        let mut blocks = database
213
17
            .view::<ByFile<Config>>()
214
17
            .with_key(&file_id)
215
17
            .query()
216
17
            .await?
217
17
            .into_iter()
218
100
            .map(|mapping| BlockInfo {
219
100
                header: mapping.source,
220
100
                length: usize::try_from(mapping.value.length).unwrap(),
221
100
                timestamp: mapping.value.timestamp.unwrap(),
222
100
                offset: 0,
223
100
            })
224
17
            .collect::<Vec<_>>();
225
392
        blocks.sort_by(|a, b| a.header.id.cmp(&b.header.id));
226
17
        let mut offset = 0;
227
117
        for block in &mut blocks {
228
100
            block.offset = offset;
229
100
            offset += u64::try_from(block.length).unwrap();
230
100
        }
231
17
        Ok(blocks)
232
17
    }
233

            
234
    pub fn delete_for_file<Database: Connection>(
235
        file_id: u32,
236
        database: &Database,
237
    ) -> Result<(), bonsaidb_core::Error> {
238
3
        database
239
3
            .view::<ByFile<Config>>()
240
3
            .with_key(&file_id)
241
3
            .delete_docs()?;
242
3
        Ok(())
243
3
    }
244

            
245
    #[cfg(feature = "async")]
246
3
    pub async fn delete_for_file_async<Database: AsyncConnection>(
247
3
        file_id: u32,
248
3
        database: &Database,
249
3
    ) -> Result<(), bonsaidb_core::Error> {
250
3
        database
251
3
            .view::<ByFile<Config>>()
252
3
            .with_key(&file_id)
253
3
            .delete_docs()
254
3
            .await?;
255
3
        Ok(())
256
3
    }
257
}
258

            
259
impl<Config> Collection for Block<Config>
260
where
261
    Config: FileConfig,
262
{
263
    type PrimaryKey = u64;
264

            
265
1058
    fn collection_name() -> CollectionName {
266
1058
        Config::blocks_name()
267
1058
    }
268

            
269
    fn define_views(
270
        schema: &mut bonsaidb_core::schema::Schematic,
271
    ) -> Result<(), bonsaidb_core::Error> {
272
40
        schema.define_view(ByFile::<Config>::default())?;
273

            
274
40
        Ok(())
275
40
    }
276
}
277

            
278
80
#[derive_where(Clone, Debug, Default)]
279
666
#[derive(View, ViewSchema)]
280
#[view(name = "by-file", collection = Block<Config>, key = u32, value = BlockAppendInfo)]
281
#[view(core = bonsaidb_core)]
282
#[view_schema(version = 2)]
283
#[view_schema(core = bonsaidb_core)]
284
struct ByFile<Config>(PhantomData<Config>)
285
where
286
    Config: FileConfig;
287

            
288
impl<Config> MapReduce for ByFile<Config>
289
where
290
    Config: FileConfig,
291
{
292
119
    fn map<'doc>(&self, doc: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
293
119
        let timestamp_offset = doc.contents.len() - size_of::<i64>();
294
119
        let file_id_offset = timestamp_offset - size_of::<u32>();
295
119

            
296
119
        let mut file_id = [0; size_of::<u32>()];
297
119
        file_id.copy_from_slice(&doc.contents[file_id_offset..timestamp_offset]);
298
119
        let file_id = u32::from_be_bytes(file_id);
299
119

            
300
119
        let mut timestamp = [0; size_of::<i64>()];
301
119
        timestamp.copy_from_slice(&doc.contents[timestamp_offset..]);
302
119
        let timestamp = TimestampAsNanoseconds::from_representation(i64::from_be_bytes(timestamp));
303
119

            
304
119
        let length = u64::try_from(file_id_offset).unwrap();
305
119

            
306
119
        doc.header.emit_key_and_value(
307
119
            file_id,
308
119
            BlockAppendInfo {
309
119
                length,
310
119
                timestamp: Some(timestamp),
311
119
            },
312
119
        )
313
119
    }
314

            
315
33
    fn reduce(
316
33
        &self,
317
33
        mappings: &[ViewMappedValue<'_, Self>],
318
33
        _rereduce: bool,
319
33
    ) -> Result<<Self::View as View>::Value, bonsaidb_core::Error> {
320
33
        Ok(BlockAppendInfo {
321
167
            length: mappings.iter().map(|info| info.value.length).sum(),
322
33
            timestamp: mappings
323
33
                .iter()
324
167
                .filter_map(|info| info.value.timestamp)
325
33
                .max(),
326
33
        })
327
33
    }
328
}
329

            
330
1950
#[derive(Clone, Debug, Serialize, Deserialize)]
331
pub struct BlockAppendInfo {
332
    pub length: u64,
333
    pub timestamp: Option<TimestampAsNanoseconds>,
334
}