1
use std::borrow::{Borrow, Cow};
2
use std::marker::PhantomData;
3

            
4
#[cfg(feature = "async")]
5
use bonsaidb_core::connection::AsyncConnection;
6
use bonsaidb_core::connection::{Bound, BoundRef, Connection, RangeRef};
7
use bonsaidb_core::document::{CollectionDocument, Emit, Header};
8
use bonsaidb_core::key::time::TimestampAsNanoseconds;
9
use bonsaidb_core::key::{
10
    ByteSource, CompositeKeyDecoder, CompositeKeyEncoder, CompositeKeyError, CompositeKind,
11
    IntoPrefixRange, Key, KeyEncoding, KeyKind,
12
};
13
use bonsaidb_core::schema::view::map::ViewMappings;
14
use bonsaidb_core::schema::{
15
    Collection, CollectionMapReduce, CollectionName, DefaultSerialization, SerializedCollection,
16
    View, ViewMapResult,
17
};
18
use bonsaidb_core::transaction::{Operation, Transaction};
19
use bonsaidb_macros::ViewSchema;
20
use bonsaidb_utils::next_string_sequence;
21
use derive_where::derive_where;
22
use serde::{Deserialize, Serialize};
23

            
24
use crate::direct::BlockInfo;
25
use crate::schema::block::Block;
26
use crate::{BonsaiFiles, Error, FileConfig, Statistics, Truncate};
27

            
28
16
#[derive_where(Debug, Clone)]
29
306
#[derive(Serialize, Deserialize)]
30
pub struct File<Config = BonsaiFiles>
31
where
32
    Config: FileConfig,
33
{
34
    pub path: Option<String>,
35
    pub name: String,
36
    pub created_at: TimestampAsNanoseconds,
37
    pub metadata: Config::Metadata,
38

            
39
    #[serde(skip)]
40
    #[derive_where(skip)]
41
    _name: PhantomData<Config>,
42
}
43

            
44
impl<Config> File<Config>
45
where
46
    Config: FileConfig,
47
{
48
13
    pub fn create_file<Database: Connection>(
49
13
        mut path: Option<String>,
50
13
        name: String,
51
13
        contents: &[u8],
52
13
        metadata: Config::Metadata,
53
13
        database: &Database,
54
13
    ) -> Result<CollectionDocument<Self>, Error> {
55
13
        if name.contains('/') || name.is_empty() {
56
1
            return Err(Error::InvalidName);
57
12
        }
58

            
59
        // Force path to end with a /
60
12
        if let Some(path) = path.as_mut() {
61
6
            if path.bytes().last() != Some(b'/') {
62
5
                path.push('/');
63
5
            }
64
6
        }
65

            
66
12
        let now = TimestampAsNanoseconds::now();
67
12
        let file = File {
68
12
            path,
69
12
            name,
70
12
            created_at: now,
71
12
            metadata,
72
12
            _name: PhantomData,
73
12
        }
74
12
        .push_into(database)?;
75
11
        Block::<Config>::append(contents, file.header.id, database)?;
76
11
        Ok(file)
77
13
    }
78

            
79
    /// Async counterpart of `create_file`: creates a file called `name` inside
    /// `path`, appends `contents` as its data, and returns the inserted
    /// document.
    ///
    /// When `path` is provided it is normalized to end with `/` before being
    /// stored.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidName`] if `name` is empty or contains a `/`.
    /// Errors from inserting the document or appending the contents are
    /// propagated.
    #[cfg(feature = "async")]
    pub async fn create_file_async<Database: AsyncConnection>(
        path: Option<String>,
        name: String,
        contents: &[u8],
        metadata: Config::Metadata,
        database: &Database,
    ) -> Result<CollectionDocument<Self>, Error> {
        if name.is_empty() || name.contains('/') {
            return Err(Error::InvalidName);
        }

        // Stored directories always carry a trailing separator.
        let path = path.map(|mut directory| {
            if !directory.ends_with('/') {
                directory.push('/');
            }
            directory
        });

        let document = Self {
            path,
            name,
            created_at: TimestampAsNanoseconds::now(),
            metadata,
            _name: PhantomData,
        }
        .push_into_async(database)
        .await?;
        Block::<Config>::append_async(contents, document.header.id, database).await?;
        Ok(document)
    }
111

            
112
13
    pub fn find<Database: Connection>(
113
13
        mut path: &str,
114
13
        database: &Database,
115
13
    ) -> Result<Option<CollectionDocument<Self>>, Error> {
116
13
        if path.is_empty() {
117
            return Err(Error::InvalidPath);
118
13
        }
119
13

            
120
13
        // If the search is for a directory, the name is of the last component.
121
13
        // Remove the trailing slash if it's present
122
13
        if path.as_bytes()[path.len() - 1] == b'/' {
123
            path = &path[..path.len() - 1];
124
13
        }
125

            
126
13
        let key = if let Some(separator_index) = path.rfind('/') {
127
13
            let (path, name) = path.split_at(separator_index + 1);
128
13
            FileKey::Full {
129
13
                path: Cow::Borrowed(if path.is_empty() { "/" } else { path }),
130
13
                name: Cow::Borrowed(name),
131
            }
132
        } else {
133
            FileKey::Full {
134
                path: Cow::Borrowed("/"),
135
                name: Cow::Borrowed(path),
136
            }
137
        };
138
        Ok(
139
            convert_mappings_to_documents(
140
13
                database.view::<ByPath<Config>>().with_key(&key).query()?,
141
            )
142
13
            .into_iter()
143
13
            .next(),
144
        )
145
13
    }
146

            
147
    /// Async counterpart of `find`: looks up the file stored at `path` through
    /// the `ByPath` view.
    ///
    /// A single trailing `/` is ignored. The text after the last remaining `/`
    /// is used as the file name and everything up to and including that `/` as
    /// the directory; a `path` without any `/` is searched in the `/`
    /// directory. Returns `Ok(None)` when the view has no matching entry.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidPath`] if `path` is empty. Query errors are
    /// propagated.
    #[cfg(feature = "async")]
    pub async fn find_async<Database: AsyncConnection>(
        path: &str,
        database: &Database,
    ) -> Result<Option<CollectionDocument<Self>>, Error> {
        if path.is_empty() {
            return Err(Error::InvalidPath);
        }

        // When searching for a directory, its name is the last component, so a
        // trailing separator is dropped first.
        let path = path.strip_suffix('/').unwrap_or(path);

        // The directory half of the split keeps its separator, so it always
        // contains at least the `/` itself.
        let (directory, name) = match path.rfind('/') {
            Some(separator_index) => path.split_at(separator_index + 1),
            None => ("/", path),
        };
        let key = FileKey::Full {
            path: Cow::Borrowed(directory),
            name: Cow::Borrowed(name),
        };

        let mappings = database
            .view::<ByPath<Config>>()
            .with_key(&key)
            .query()
            .await?;
        Ok(convert_mappings_to_documents(mappings).into_iter().next())
    }
184

            
185
4
    pub fn list_path_contents<Database: Connection>(
186
4
        path: &str,
187
4
        database: &Database,
188
4
    ) -> Result<Vec<CollectionDocument<Self>>, bonsaidb_core::Error> {
189
4
        Ok(convert_mappings_to_documents(
190
4
            database
191
4
                .view::<ByPath<Config>>()
192
4
                .with_key_prefix(&FileKey::ExactPath {
193
4
                    start: Box::new(FileKey::ExactPathPart { path, start: true }),
194
4
                    end: Box::new(FileKey::ExactPathPart { path, start: false }),
195
4
                })
196
4
                .query()?,
197
        ))
198
4
    }
199

            
200
    /// Async counterpart of `list_path_contents`: queries the `ByPath` view
    /// with the exact-path key range built from `path` and returns the
    /// matching files as documents.
    ///
    /// # Errors
    ///
    /// Query errors are propagated.
    #[cfg(feature = "async")]
    pub async fn list_path_contents_async<Database: AsyncConnection>(
        path: &str,
        database: &Database,
    ) -> Result<Vec<CollectionDocument<Self>>, bonsaidb_core::Error> {
        let range = FileKey::ExactPath {
            start: Box::new(FileKey::ExactPathPart { path, start: true }),
            end: Box::new(FileKey::ExactPathPart { path, start: false }),
        };
        let mappings = database
            .view::<ByPath<Config>>()
            .with_key_prefix(&range)
            .query()
            .await?;
        Ok(convert_mappings_to_documents(mappings))
    }
216

            
217
4
    pub fn list_recursive_path_contents<Database: Connection>(
218
4
        path: &str,
219
4
        database: &Database,
220
4
    ) -> Result<Vec<CollectionDocument<Self>>, bonsaidb_core::Error> {
221
4
        Ok(convert_mappings_to_documents(
222
4
            database
223
4
                .view::<ByPath<Config>>()
224
4
                .with_key_prefix(&FileKey::RecursivePath {
225
4
                    start: Box::new(FileKey::RecursivePathPart { path, start: true }),
226
4
                    end: Box::new(FileKey::RecursivePathPart { path, start: false }),
227
4
                })
228
4
                .query()?,
229
        ))
230
4
    }
231

            
232
    /// Async counterpart of `list_recursive_path_contents`: queries the
    /// `ByPath` view with the recursive key range built from `path` and
    /// returns the matching files as documents.
    ///
    /// # Errors
    ///
    /// Query errors are propagated.
    #[cfg(feature = "async")]
    pub async fn list_recursive_path_contents_async<Database: AsyncConnection>(
        path: &str,
        database: &Database,
    ) -> Result<Vec<CollectionDocument<Self>>, bonsaidb_core::Error> {
        let range = FileKey::RecursivePath {
            start: Box::new(FileKey::RecursivePathPart { path, start: true }),
            end: Box::new(FileKey::RecursivePathPart { path, start: false }),
        };
        let mappings = database
            .view::<ByPath<Config>>()
            .with_key_prefix(&range)
            .query()
            .await?;
        Ok(convert_mappings_to_documents(mappings))
    }
248

            
249
5
    pub fn summarize_recursive_path_contents<Database: Connection>(
250
5
        path: &str,
251
5
        database: &Database,
252
5
    ) -> Result<Statistics, bonsaidb_core::Error> {
253
5
        let ids = database
254
5
            .view::<ByPath<Config>>()
255
5
            .with_key_prefix(&FileKey::RecursivePath {
256
5
                start: Box::new(FileKey::RecursivePathPart { path, start: true }),
257
5
                end: Box::new(FileKey::RecursivePathPart { path, start: false }),
258
5
            })
259
5
            .query()?
260
5
            .iter()
261
6
            .map(|mapping| mapping.source.id)
262
5
            .collect::<Vec<u32>>();
263
5
        let append_info = Block::<Config>::summary_for_ids(&ids, database)?;
264
5
        Ok(Statistics {
265
5
            total_bytes: append_info.length,
266
5
            last_appended_at: append_info.timestamp,
267
5
            file_count: ids.len(),
268
5
        })
269
5
    }
270

            
271
    /// Async counterpart of `summarize_recursive_path_contents`: summarizes
    /// the files matched by the recursive key range built from `path`.
    ///
    /// The ids of the matching view entries are handed to
    /// `Block::summary_for_ids_async`; the returned [`Statistics`] carries
    /// that summary's length and timestamp together with the number of
    /// matched files.
    ///
    /// # Errors
    ///
    /// Errors from the view query or the block summary are propagated.
    #[cfg(feature = "async")]
    pub async fn summarize_recursive_path_contents_async<Database: AsyncConnection>(
        path: &str,
        database: &Database,
    ) -> Result<Statistics, bonsaidb_core::Error> {
        // The key is built inline so that nothing besides `ids` stays alive
        // across the second await below.
        let ids: Vec<u32> = database
            .view::<ByPath<Config>>()
            .with_key_prefix(&FileKey::RecursivePath {
                start: Box::new(FileKey::RecursivePathPart { path, start: true }),
                end: Box::new(FileKey::RecursivePathPart { path, start: false }),
            })
            .query()
            .await?
            .into_iter()
            .map(|entry| entry.source.id)
            .collect();
        let file_count = ids.len();
        let summary = Block::<Config>::summary_for_ids_async(&ids, database).await?;
        Ok(Statistics {
            total_bytes: summary.length,
            last_appended_at: summary.timestamp,
            file_count,
        })
    }
294

            
295
3
    pub fn truncate<Database: Connection>(
296
3
        file: &CollectionDocument<Self>,
297
3
        new_length: u64,
298
3
        from: Truncate,
299
3
        database: &Database,
300
3
    ) -> Result<(), bonsaidb_core::Error> {
301
3
        let tx = Self::create_truncate_transaction(
302
3
            Block::<Config>::for_file(file.header.id, database)?,
303
3
            new_length,
304
3
            from,
305
3
        );
306
3

            
307
3
        tx.apply(database)?;
308
3
        Ok(())
309
3
    }
310

            
311
6
    fn create_truncate_transaction(
312
6
        mut blocks: Vec<BlockInfo>,
313
6
        new_length: u64,
314
6
        from: Truncate,
315
6
    ) -> Transaction {
316
6
        let total_length: u64 = blocks
317
6
            .iter()
318
38
            .map(|b| u64::try_from(b.length).unwrap())
319
6
            .sum();
320
6
        let mut tx = Transaction::new();
321
6
        if let Some(mut bytes_to_remove) = total_length.checked_sub(new_length) {
322
6
            let block_collection = Config::blocks_name();
323
36
            while bytes_to_remove > 0 && !blocks.is_empty() {
324
30
                let offset = match from {
325
24
                    Truncate::RemovingStart => 0,
326
6
                    Truncate::RemovingEnd => blocks.len() - 1,
327
                };
328
30
                let block_length = u64::try_from(blocks[offset].length).unwrap();
329
30
                if block_length <= bytes_to_remove {
330
30
                    tx.push(Operation::delete(
331
30
                        block_collection.clone(),
332
30
                        Header::try_from(blocks[offset].header)
333
30
                            .expect("u64 serialization can't fail"),
334
30
                    ));
335
30
                    blocks.remove(offset);
336
30
                    bytes_to_remove -= block_length;
337
30
                } else {
338
                    // Partial removal. For now, we're just not going to support
339
                    // partial removes. This is just purely to keep things simple.
340
                    break;
341
                }
342
            }
343
        }
344
6
        tx
345
6
    }
346

            
347
    /// Async counterpart of `truncate`: shrinks `file` towards `new_length`
    /// bytes by deleting whole blocks from the side selected by `from`.
    ///
    /// The blocks of the file are loaded, turned into a delete transaction by
    /// `create_truncate_transaction`, and that transaction is applied to
    /// `database`.
    ///
    /// # Errors
    ///
    /// Errors from loading the blocks or applying the transaction are
    /// propagated.
    #[cfg(feature = "async")]
    pub async fn truncate_async<Database: AsyncConnection>(
        file: &CollectionDocument<Self>,
        new_length: u64,
        from: Truncate,
        database: &Database,
    ) -> Result<(), bonsaidb_core::Error> {
        let blocks = Block::<Config>::for_file_async(file.header.id, database).await?;
        let transaction = Self::create_truncate_transaction(blocks, new_length, from);

        transaction.apply_async(database).await?;
        Ok(())
    }
363
}
364

            
365
impl<Config> Collection for File<Config>
366
where
367
    Config: FileConfig,
368
{
369
    type PrimaryKey = u32;
370

            
371
879
    fn collection_name() -> CollectionName {
372
879
        Config::files_name()
373
879
    }
374

            
375
    fn define_views(
376
        schema: &mut bonsaidb_core::schema::Schematic,
377
    ) -> Result<(), bonsaidb_core::Error> {
378
40
        schema.define_view(ByPath::<Config>::default())?;
379

            
380
40
        Ok(())
381
40
    }
382
}
383

            
384
// Marker impl: `File` uses the serialization associated with
// `DefaultSerialization` rather than a hand-written one.
impl<Config: FileConfig> DefaultSerialization for File<Config> {}
385

            
386
80
#[derive_where(Clone, Debug, Default)]
387
490
#[derive(View, ViewSchema)]
388
#[view(name = "by-path", collection = File<Config>, key = OwnedFileKey, value = (TimestampAsNanoseconds, Config::Metadata))]
389
#[view(core = bonsaidb_core)]
390
#[view_schema(version = 3, policy = Unique, core = bonsaidb_core)]
391
pub struct ByPath<Config>(PhantomData<Config>)
392
where
393
    Config: FileConfig;
394

            
395
impl<Config> CollectionMapReduce for ByPath<Config>
396
where
397
    Config: FileConfig,
398
{
399
30
    fn map<'doc>(&self, doc: CollectionDocument<File<Config>>) -> ViewMapResult<'doc, Self> {
400
30
        doc.header.emit_key_and_value(
401
30
            OwnedFileKey(FileKey::Full {
402
30
                path: Cow::Owned(doc.contents.path.unwrap_or_else(|| String::from("/"))),
403
30
                name: Cow::Owned(doc.contents.name),
404
30
            }),
405
30
            (doc.contents.created_at, doc.contents.metadata),
406
30
        )
407
30
    }
408
}
409

            
410
impl<'a> PartialEq<FileKey<'a>> for OwnedFileKey {
411
    fn eq(&self, other: &FileKey<'a>) -> bool {
412
        self.0.eq(other)
413
    }
414
}
415

            
416
impl<'a> IntoPrefixRange<'a, OwnedFileKey> for FileKey<'a> {
417
26
    fn to_prefix_range(&'a self) -> RangeRef<'a, OwnedFileKey, Self> {
418
26
        match self {
419
26
            FileKey::ExactPath { start, end } | FileKey::RecursivePath { start, end } => RangeRef {
420
26
                start: BoundRef::borrowed(Bound::Included(start)),
421
26
                end: BoundRef::borrowed(Bound::Excluded(end)),
422
26
            },
423
            FileKey::Full { .. }
424
            | FileKey::ExactPathPart { .. }
425
            | FileKey::RecursivePathPart { .. } => {
426
                unreachable!()
427
            }
428
        }
429
26
    }
430
}
431

            
432
39
fn convert_mappings_to_documents<Config: FileConfig>(
433
39
    mappings: ViewMappings<ByPath<Config>>,
434
39
) -> Vec<CollectionDocument<File<Config>>> {
435
39
    let mut docs = Vec::with_capacity(mappings.len());
436
70
    for mapping in mappings {
437
31
        if let OwnedFileKey(FileKey::Full { path, name }) = mapping.key {
438
            docs.push(CollectionDocument {
439
31
                header: mapping.source,
440
31
                contents: File {
441
31
                    path: match path.into_owned() {
442
31
                        path if path == "/" => None,
443
30
                        other => Some(other),
444
                    },
445
31
                    name: name.into_owned(),
446
31
                    created_at: mapping.value.0,
447
31
                    metadata: mapping.value.1,
448
31
                    _name: PhantomData,
449
                },
450
            });
451
        }
452
    }
453

            
454
39
    docs
455
39
}
456

            
457
#[derive(Debug, Clone, PartialEq)]
458
pub struct OwnedFileKey(FileKey<'static>);
459

            
460
impl<'a> Borrow<FileKey<'a>> for OwnedFileKey {
461
    fn borrow(&self) -> &FileKey<'a> {
462
        &self.0
463
    }
464
}
465

            
466
/// Key used with the `ByPath` view.
///
/// `Full` is the form stored in the view. The other variants only exist to
/// describe key ranges for prefix queries.
#[derive(Debug, Clone, PartialEq)]
enum FileKey<'a> {
    /// A complete key, encoded as the path followed by the name.
    Full {
        /// Directory of the file.
        path: Cow<'a, str>,
        /// Name of the file.
        name: Cow<'a, str>,
    },
    /// Range built from two `ExactPathPart` bounds. It is only converted into
    /// a range and never encoded itself.
    ExactPath {
        /// Included lower bound.
        start: Box<FileKey<'a>>,
        /// Excluded upper bound.
        end: Box<FileKey<'a>>,
    },
    /// One bound of an `ExactPath` range: `path` (with a trailing `/` added
    /// when missing) followed by a marker byte.
    ExactPathPart {
        /// Directory to match.
        path: &'a str,
        /// `true` encodes the marker byte `0`, `false` encodes `1`.
        start: bool,
    },
    /// Range built from two `RecursivePathPart` bounds. It is only converted
    /// into a range and never encoded itself.
    RecursivePath {
        /// Included lower bound.
        start: Box<FileKey<'a>>,
        /// Excluded upper bound.
        end: Box<FileKey<'a>>,
    },
    /// One bound of a `RecursivePath` range.
    RecursivePathPart {
        /// Directory prefix to match.
        path: &'a str,
        /// `true` encodes `path` itself, `false` encodes
        /// `next_string_sequence(path)`.
        start: bool,
    },
}
489

            
490
impl<'k> Key<'k> for OwnedFileKey {
491
    const CAN_OWN_BYTES: bool = false;
492

            
493
101
    fn from_ord_bytes<'e>(bytes: ByteSource<'k, 'e>) -> Result<Self, Self::Error> {
494
101
        let mut decoder = CompositeKeyDecoder::default_for(bytes);
495

            
496
101
        let path = Cow::Owned(decoder.decode::<String>()?);
497
101
        let name = Cow::Owned(decoder.decode::<String>()?);
498
101
        decoder.finish()?;
499

            
500
101
        Ok(Self(FileKey::Full { path, name }))
501
101
    }
502
}
503

            
504
impl KeyEncoding<Self> for OwnedFileKey {
505
    type Error = CompositeKeyError;
506

            
507
    const LENGTH: Option<usize> = None;
508

            
509
    fn describe<Visitor>(visitor: &mut Visitor)
510
    where
511
        Visitor: bonsaidb_core::key::KeyVisitor,
512
    {
513
        FileKey::describe(visitor);
514
    }
515

            
516
54
    fn as_ord_bytes(&self) -> Result<std::borrow::Cow<'_, [u8]>, Self::Error> {
517
54
        self.0.as_ord_bytes()
518
54
    }
519
}
520

            
521
impl<'fk> KeyEncoding<OwnedFileKey> for FileKey<'fk> {
522
    type Error = CompositeKeyError;
523

            
524
    const LENGTH: Option<usize> = None;
525

            
526
    fn describe<Visitor>(visitor: &mut Visitor)
527
    where
528
        Visitor: bonsaidb_core::key::KeyVisitor,
529
    {
530
        visitor.visit_composite(
531
            CompositeKind::Struct(Cow::Borrowed("bonsaidb_files::schema::file::FileKey")),
532
            2,
533
        );
534
        visitor.visit_type(KeyKind::String);
535
        visitor.visit_type(KeyKind::String);
536
    }
537

            
538
147
    fn as_ord_bytes(&self) -> Result<std::borrow::Cow<'_, [u8]>, Self::Error> {
539
147
        match self {
540
95
            FileKey::Full { path, name } => {
541
95
                let mut encoder = CompositeKeyEncoder::default();
542
95
                encoder.encode(&path)?;
543
95
                encoder.encode(&name)?;
544
95
                Ok(Cow::Owned(encoder.finish()))
545
            }
546
16
            FileKey::ExactPathPart { path, start } => {
547
16
                let mut bytes = Vec::new();
548
16
                // The path needs to end with a /. Rather than force an allocation to
549
16
                // append it to a string before calling encode_composite_key, we're
550
16
                // manually encoding the key taking this adjustment into account.
551
16

            
552
16
                bytes.extend(path.bytes());
553
16

            
554
16
                if !path.as_bytes().ends_with(b"/") {
555
4
                    bytes.push(b'/');
556
12
                }
557
                // Variable encoding adds a null byte at the end of the string, we can
558
                // use this padding byte to create our exclusive range
559
16
                if *start {
560
8
                    bytes.push(0);
561
8
                } else {
562
8
                    bytes.push(1);
563
8
                }
564
16
                Ok(Cow::Owned(bytes))
565
            }
566
36
            FileKey::RecursivePathPart { path, start } => {
567
36
                let mut encoder = CompositeKeyEncoder::default();
568
36
                if *start {
569
18
                    encoder.encode(path)?;
570
                } else {
571
18
                    let next = next_string_sequence(path);
572
18
                    encoder.encode(&next)?;
573
                }
574
36
                Ok(Cow::Owned(encoder.finish()))
575
            }
576
            FileKey::ExactPath { .. } | FileKey::RecursivePath { .. } => unreachable!(),
577
        }
578
147
    }
579
}