1
#[cfg(feature = "async")]
2
use std::collections::BTreeMap;
3
use std::collections::VecDeque;
4
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
5
use std::marker::PhantomData;
6
#[cfg(feature = "async")]
7
use std::task::Poll;
8

            
9
use bonsaidb_core::connection::Connection;
10
use bonsaidb_core::document::{CollectionDocument, CollectionHeader};
11
use bonsaidb_core::key::time::TimestampAsNanoseconds;
12
use bonsaidb_core::schema::SerializedCollection;
13
#[cfg(feature = "async")]
14
use bonsaidb_core::{circulate::flume, connection::AsyncConnection};
15
use derive_where::derive_where;
16
#[cfg(feature = "async")]
17
use futures::{future::BoxFuture, ready, FutureExt};
18
#[cfg(feature = "async")]
19
use tokio::io::AsyncWriteExt;
20

            
21
use crate::schema::block::BlockAppendInfo;
22
use crate::schema::{self};
23
use crate::{BonsaiFiles, Error, FileConfig, Statistics, Truncate};
24

            
25
/// A handle to a file stored in a database.
#[derive_where(Debug, Clone)]
pub struct File<Database: Clone, Config: FileConfig = BonsaiFiles> {
    // The stored collection document backing this handle.
    doc: CollectionDocument<schema::file::File<Config>>,
    // The database connection wrapper (`Blocking` or `Async`). Excluded from
    // the `Debug` output via `derive_where(skip(Debug))`.
    #[derive_where(skip(Debug))]
    database: Database,
}
32

            
33
impl<Database, Config> PartialEq for File<Database, Config>
where
    Database: Clone,
    Config: FileConfig,
{
    fn eq(&self, other: &Self) -> bool {
        // Equality compares only the document headers; the database
        // connection is intentionally not part of the comparison.
        self.doc.header == other.doc.header
    }
}
42

            
43
/// A blocking database connection.
///
/// Newtype wrapper used as the `Database` parameter of [`File`] when working
/// with a blocking [`Connection`].
#[derive(Clone)]
pub struct Blocking<Database: Connection>(Database);
46

            
47
/// An async database connection.
///
/// Newtype wrapper used as the `Database` parameter of [`File`] when working
/// with an [`AsyncConnection`].
#[cfg(feature = "async")]
#[derive(Clone)]
pub struct Async<Database: AsyncConnection>(Database);
51

            
52
impl<Database, Config> File<Blocking<Database>, Config>
53
where
54
    Database: Connection + Clone,
55
    Config: FileConfig,
56
{
57
13
    fn new_file(
58
13
        path: Option<String>,
59
13
        name: String,
60
13
        contents: &[u8],
61
13
        metadata: Config::Metadata,
62
13
        database: Database,
63
13
    ) -> Result<Self, Error> {
64
13
        Ok(Self {
65
13
            doc: schema::file::File::create_file(path, name, contents, metadata, &database)?,
66
11
            database: Blocking(database),
67
        })
68
13
    }
69

            
70
2
    pub(crate) fn get(id: u32, database: &Database) -> Result<Option<Self>, bonsaidb_core::Error> {
71
2
        schema::file::File::<Config>::get(&id, database).map(|doc| {
72
2
            doc.map(|doc| Self {
73
2
                doc,
74
2
                database: Blocking(database.clone()),
75
2
            })
76
2
        })
77
2
    }
78

            
79
13
    pub(crate) fn load(path: &str, database: &Database) -> Result<Option<Self>, Error> {
80
13
        schema::file::File::<Config>::find(path, database).map(|opt| {
81
13
            opt.map(|doc| Self {
82
8
                doc,
83
8
                database: Blocking(database.clone()),
84
13
            })
85
13
        })
86
13
    }
87

            
88
4
    pub(crate) fn list(path: &str, database: &Database) -> Result<Vec<Self>, bonsaidb_core::Error> {
89
4
        schema::file::File::<Config>::list_path_contents(path, database).map(|vec| {
90
4
            vec.into_iter()
91
4
                .map(|doc| Self {
92
2
                    doc,
93
2
                    database: Blocking(database.clone()),
94
4
                })
95
4
                .collect()
96
4
        })
97
4
    }
98

            
99
4
    pub(crate) fn list_recursive(
100
4
        path: &str,
101
4
        database: &Database,
102
4
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
103
4
        schema::file::File::<Config>::list_recursive_path_contents(path, database).map(|vec| {
104
4
            vec.into_iter()
105
6
                .map(|doc| Self {
106
6
                    doc,
107
6
                    database: Blocking(database.clone()),
108
6
                })
109
4
                .collect()
110
4
        })
111
4
    }
112

            
113
5
    pub(crate) fn stats_for_path(
114
5
        path: &str,
115
5
        database: &Database,
116
5
    ) -> Result<Statistics, bonsaidb_core::Error> {
117
5
        schema::file::File::<Config>::summarize_recursive_path_contents(path, database)
118
5
    }
119

            
120
    /// Return all direct descendents of this file. For example, consider this
121
    /// list of files:
122
    ///
123
    /// - /top-level
124
    /// - /top-level/sub-level
125
    /// - /top-level/sub-level/file.txt
126
    ///
127
    /// If this instance were `/top-level`, this function would return
128
    /// `sub-level` but not `sub-level/file.txt`.
129
    pub fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
130
        schema::file::File::<Config>::list_path_contents(&self.path(), &self.database.0).map(
131
            |docs| {
132
                docs.into_iter()
133
                    .map(|doc| Self {
134
                        doc,
135
                        database: self.database.clone(),
136
                    })
137
                    .collect()
138
            },
139
        )
140
    }
141

            
142
    /// Moves this file to a new location. If `new_path` ends with a `/`, the
143
    /// file will be moved to that path with its name preserved. Otherwise, the
144
    /// file will be renamed as part of the move.
145
    ///
146
    /// For example, moving `/a/file.txt` to `/b/` will result in the full path
147
    /// being `/b/file.txt`. Moving `/a/file.txt` to `/b/new-name.txt` will
148
    /// result in the full path being `/b/new-name.txt`.
149
2
    pub fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
150
2
        if !new_path.as_bytes().starts_with(b"/") {
151
            return Err(Error::InvalidPath);
152
2
        }
153
2

            
154
2
        let mut doc = self.update_document_for_move(new_path);
155
2
        doc.update(&self.database.0)?;
156
2
        self.doc = doc;
157
2
        Ok(())
158
2
    }
159

            
160
    /// Renames this file to the new name.
161
1
    pub fn rename(&mut self, new_name: String) -> Result<(), Error> {
162
1
        if new_name.as_bytes().contains(&b'/') {
163
            return Err(Error::InvalidName);
164
1
        }
165
1

            
166
1
        // Prevent mutating self until after the database is updated.
167
1
        let mut doc = self.doc.clone();
168
1
        doc.contents.name = new_name;
169
1
        doc.update(&self.database.0)?;
170
1
        self.doc = doc;
171
1
        Ok(())
172
1
    }
173

            
174
    /// Deletes the file.
175
    pub fn delete(&self) -> Result<(), Error> {
176
3
        schema::block::Block::<Config>::delete_for_file(self.doc.header.id, &self.database.0)?;
177
3
        self.doc.delete(&self.database.0)?;
178
3
        Ok(())
179
3
    }
180

            
181
6
    fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
182
6
        &mut self,
183
6
        callback: F,
184
6
    ) -> Result<T, bonsaidb_core::Error> {
185
6
        let metadata =
186
6
            schema::block::Block::<Config>::summary_for_file(self.doc.header.id, &self.database.0)?;
187

            
188
6
        Ok(callback(metadata))
189
6
    }
190

            
191
    /// Returns the length of the file.
192
    #[allow(clippy::missing_panics_doc)]
193
4
    pub fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
194
4
        self.map_block_metadata(|metadata| metadata.length)
195
4
    }
196

            
197
    /// Returns true if this file contains no data.
198
    #[allow(clippy::missing_panics_doc)]
199
    pub fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
200
        Ok(self.len()? == 0)
201
    }
202

            
203
    /// Returns the timestamp of the last append to the file. This function
204
    /// returns 0 when the file is empty, even if the file was previously
205
    /// written to.
206
    #[allow(clippy::missing_panics_doc)]
207
2
    pub fn last_appended_at(
208
2
        &mut self,
209
2
    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
210
2
        self.map_block_metadata(|metadata| metadata.timestamp)
211
2
    }
212

            
213
    /// Returns the contents of the file, which allows random and buffered
214
    /// access to the file stored in the database.
215
    ///
216
    /// The default buffer size is ten times
217
    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
218
15
    pub fn contents(&self) -> Result<Contents<Blocking<Database>, Config>, bonsaidb_core::Error> {
219
15
        let blocks = schema::block::Block::<Config>::for_file(self.id(), &self.database.0)?;
220
15
        Ok(Contents {
221
15
            database: self.database.clone(),
222
15
            blocks,
223
15
            loaded: VecDeque::default(),
224
15
            current_block: 0,
225
15
            offset: 0,
226
15
            buffer_size: Config::BLOCK_SIZE * 10,
227
15
            #[cfg(feature = "async")]
228
15
            async_blocks: None,
229
15
            _config: PhantomData,
230
15
        })
231
15
    }
232

            
233
    /// Truncates the file, removing data from either the start or end of the
234
    /// file until the file is within
235
    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE) of `new_length`.
236
    /// Truncating currently will not split a block, causing the resulting
237
    /// length to not always match the length requested.
238
    ///
239
    /// If `new_length` is 0 and this call succeeds, the file's length is
240
    /// guaranteed to be 0.
241
3
    pub fn truncate(&self, new_length: u64, from: Truncate) -> Result<(), bonsaidb_core::Error> {
242
3
        schema::file::File::<Config>::truncate(&self.doc, new_length, from, &self.database.0)
243
3
    }
244

            
245
    /// Appends `data` to the end of the file. The data will be split into
246
    /// chunks no larger than [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE)
247
    /// when stored in the database.
248
5
    pub fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
249
5
        schema::block::Block::<Config>::append(data, self.doc.header.id, &self.database.0)
250
5
    }
251

            
252
    /// Returns a writer that will buffer writes to the end of the file.
253
3
    pub fn append_buffered(&mut self) -> BufferedAppend<'_, Config, Database> {
254
3
        BufferedAppend {
255
3
            file: self,
256
3
            buffer: Vec::new(),
257
3
            _config: PhantomData,
258
3
        }
259
3
    }
260

            
261
    /// Stores changes to the metadata of this document.
262
1
    pub fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
263
1
        self.doc.update(&self.database.0)
264
1
    }
265
}
266

            
267
#[cfg(feature = "async")]
impl<Database, Config> File<Async<Database>, Config>
where
    Database: AsyncConnection + Clone,
    Config: FileConfig,
{
    /// Creates the file in the database and returns a handle to it.
    async fn new_file_async(
        path: Option<String>,
        name: String,
        contents: &[u8],
        metadata: Config::Metadata,
        database: Database,
    ) -> Result<Self, Error> {
        let doc = schema::file::File::create_file_async(path, name, contents, metadata, &database)
            .await?;
        Ok(Self {
            doc,
            database: Async(database),
        })
    }

    /// Looks the file up by its unique `id`.
    pub(crate) async fn get_async(
        id: u32,
        database: &Database,
    ) -> Result<Option<Self>, bonsaidb_core::Error> {
        let doc = schema::file::File::<Config>::get_async(&id, database).await?;
        Ok(doc.map(|doc| Self {
            doc,
            database: Async(database.clone()),
        }))
    }

    /// Loads the file stored at `path`, if one exists.
    pub(crate) async fn load_async(path: &str, database: &Database) -> Result<Option<Self>, Error> {
        let doc = schema::file::File::<Config>::find_async(path, database).await?;
        Ok(doc.map(|doc| Self {
            doc,
            database: Async(database.clone()),
        }))
    }

    /// Lists the files contained directly within `path`.
    pub(crate) async fn list_async(
        path: &str,
        database: &Database,
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
        let docs = schema::file::File::<Config>::list_path_contents_async(path, database).await?;
        Ok(docs
            .into_iter()
            .map(|doc| Self {
                doc,
                database: Async(database.clone()),
            })
            .collect())
    }

    /// Lists all files contained within `path`, recursing into sub-paths.
    pub(crate) async fn list_recursive_async(
        path: &str,
        database: &Database,
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
        let docs =
            schema::file::File::<Config>::list_recursive_path_contents_async(path, database)
                .await?;
        Ok(docs
            .into_iter()
            .map(|doc| Self {
                doc,
                database: Async(database.clone()),
            })
            .collect())
    }

    /// Summarizes all files contained within `path`, recursively.
    pub(crate) async fn stats_for_path_async(
        path: &str,
        database: &Database,
    ) -> Result<Statistics, bonsaidb_core::Error> {
        schema::file::File::<Config>::summarize_recursive_path_contents_async(path, database).await
    }

    /// Return all direct descendents of this file. For example, consider this
    /// list of files:
    ///
    /// - /top-level
    /// - /top-level/sub-level
    /// - /top-level/sub-level/file.txt
    ///
    /// If this instance were `/top-level`, this function would return
    /// `sub-level` but not `sub-level/file.txt`.
    pub async fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
        let docs =
            schema::file::File::<Config>::list_path_contents_async(&self.path(), &self.database.0)
                .await?;
        Ok(docs
            .into_iter()
            .map(|doc| Self {
                doc,
                database: self.database.clone(),
            })
            .collect())
    }

    /// Moves this file to a new location. If `new_path` ends with a `/`, the
    /// file will be moved to that path with its name preserved. Otherwise, the
    /// file will be renamed as part of the move.
    ///
    /// For example, moving `/a/file.txt` to `/b/` will result in the full path
    /// being `/b/file.txt`. Moving `/a/file.txt` to `/b/new-name.txt` will
    /// result in the full path being `/b/new-name.txt`.
    pub async fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
        // Destinations must always be absolute.
        if !new_path.starts_with('/') {
            return Err(Error::InvalidPath);
        }

        // Only mutate self after the database accepts the update.
        let mut updated = self.update_document_for_move(new_path);
        updated.update_async(&self.database.0).await?;
        self.doc = updated;
        Ok(())
    }

    /// Renames this file to the new name.
    pub async fn rename(&mut self, new_name: String) -> Result<(), Error> {
        // Names may not contain path separators.
        if new_name.contains('/') {
            return Err(Error::InvalidName);
        }

        // Only mutate self after the database accepts the update.
        let mut updated = self.doc.clone();
        updated.contents.name = new_name;
        updated.update_async(&self.database.0).await?;
        self.doc = updated;
        Ok(())
    }

    /// Deletes the file.
    pub async fn delete(&self) -> Result<(), Error> {
        // Remove the data blocks before removing the file document itself.
        schema::block::Block::<Config>::delete_for_file_async(self.doc.header.id, &self.database.0)
            .await?;
        self.doc.delete_async(&self.database.0).await?;
        Ok(())
    }

    // Loads this file's block summary and maps it through `callback`.
    async fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
        &mut self,
        callback: F,
    ) -> Result<T, bonsaidb_core::Error> {
        let summary = schema::block::Block::<Config>::summary_for_file_async(
            self.doc.header.id,
            &self.database.0,
        )
        .await?;

        Ok(callback(summary))
    }

    /// Returns the length of the file.
    #[allow(clippy::missing_panics_doc)]
    pub async fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
        self.map_block_metadata(|summary| summary.length).await
    }

    /// Returns true if this file contains no data.
    #[allow(clippy::missing_panics_doc)]
    pub async fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
        self.len().await.map(|length| length == 0)
    }

    /// Returns the timestamp of the last append to the file. This function
    /// returns 0 when the file is empty, even if the file was previously
    /// written to.
    #[allow(clippy::missing_panics_doc)]
    pub async fn last_appended_at(
        &mut self,
    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
        self.map_block_metadata(|summary| summary.timestamp).await
    }

    /// Returns the contents of the file, which allows random and buffered
    /// access to the file stored in the database.
    ///
    /// The default buffer size is ten times
    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
    pub async fn contents(
        &self,
    ) -> Result<Contents<Async<Database>, Config>, bonsaidb_core::Error> {
        let blocks =
            schema::block::Block::<Config>::for_file_async(self.id(), &self.database.0).await?;
        Ok(Contents {
            database: self.database.clone(),
            blocks,
            loaded: VecDeque::new(),
            current_block: 0,
            offset: 0,
            buffer_size: Config::BLOCK_SIZE * 10,
            #[cfg(feature = "async")]
            async_blocks: None,
            _config: PhantomData,
        })
    }

    /// Truncates the file, removing data from either the start or end of the
    /// file until the file is within
    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE) of `new_length`.
    /// Truncating currently will not split a block, causing the resulting
    /// length to not always match the length requested.
    ///
    /// If `new_length` is 0 and this call succeeds, the file's length is
    /// guaranteed to be 0.
    pub async fn truncate(
        &self,
        new_length: u64,
        from: Truncate,
    ) -> Result<(), bonsaidb_core::Error> {
        schema::file::File::<Config>::truncate_async(&self.doc, new_length, from, &self.database.0)
            .await
    }

    /// Appends `data` to the end of the file. The data will be split into
    /// chunks no larger than [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE)
    /// when stored in the database.
    pub async fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
        schema::block::Block::<Config>::append_async(data, self.doc.header.id, &self.database.0)
            .await
    }

    /// Returns a writer that will buffer writes to the end of the file.
    pub fn append_buffered(&mut self) -> AsyncBufferedAppend<'_, Config, Database> {
        AsyncBufferedAppend {
            file: self,
            buffer: Vec::new(),
            flush_future: None,
            _config: PhantomData,
        }
    }

    /// Stores changes to the metadata of this document.
    pub async fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
        self.doc.update_async(&self.database.0).await
    }
}
511

            
512
impl<Database, Config> File<Database, Config>
513
where
514
    Database: Clone,
515
    Config: FileConfig,
516
{
517
    /// Returns the unique id of this file. The file id is only unique within a
518
    /// single database and [`FileConfig`].
519
33
    pub fn id(&self) -> u32 {
520
33
        self.doc.header.id
521
33
    }
522

            
523
    /// Returns the path containing this file. For example, if the full path to
524
    /// the file is `/some-path/file.txt`, this function will return
525
    /// `/some-path/`.
526
8
    pub fn containing_path(&self) -> &str {
527
8
        self.doc.contents.path.as_deref().unwrap_or("/")
528
8
    }
529

            
530
    /// Returns the name of this file.
531
10
    pub fn name(&self) -> &str {
532
10
        &self.doc.contents.name
533
10
    }
534

            
535
    /// Returns the absolute path of this file.
536
2
    pub fn path(&self) -> String {
537
2
        let containing_path = self.containing_path();
538
2
        let ends_in_slash = self.containing_path().ends_with('/');
539
2
        let mut full_path = String::with_capacity(
540
2
            containing_path.len() + usize::from(!ends_in_slash) + self.name().len(),
541
2
        );
542
2
        full_path.push_str(containing_path);
543
2
        if !ends_in_slash {
544
            full_path.push('/');
545
2
        }
546
2
        full_path.push_str(self.name());
547
2

            
548
2
        full_path
549
2
    }
550

            
551
    /// Returns the timestamp the file was created at.
552
    pub fn created_at(&self) -> TimestampAsNanoseconds {
553
        self.doc.contents.created_at
554
    }
555

            
556
    /// Returns the metadata for this file.
557
6
    pub fn metadata(&self) -> &Config::Metadata {
558
6
        &self.doc.contents.metadata
559
6
    }
560

            
561
    /// Returns mutable access metadata for this file. Modifying the metadata
562
    /// will not update it in the database. Be sure to call `update_metadata()`
563
    /// or another operation that persists the file.
564
2
    pub fn metadata_mut(&mut self) -> &mut Config::Metadata {
565
2
        &mut self.doc.contents.metadata
566
2
    }
567

            
568
4
    fn update_document_for_move(
569
4
        &self,
570
4
        new_path: &str,
571
4
    ) -> CollectionDocument<schema::file::File<Config>> {
572
4
        let mut doc = self.doc.clone();
573
4
        if new_path.as_bytes().ends_with(b"/") {
574
2
            if new_path.len() > 1 {
575
2
                doc.contents.path = Some(new_path.to_string());
576
2
            } else {
577
                doc.contents.path = None;
578
            }
579
2
        } else {
580
2
            let (path, name) = new_path.rsplit_once('/').unwrap();
581
2
            doc.contents.path = (!path.is_empty()).then(|| path.to_string());
582
2
            doc.contents.name = name.to_string();
583
2
        }
584

            
585
        // Force path to end in a slash
586
4
        if let Some(path) = doc.contents.path.as_mut() {
587
4
            if path.bytes().last() != Some(b'/') {
588
2
                path.push('/');
589
2
            }
590
        }
591

            
592
4
        doc
593
4
    }
594
}
595

            
596
/// A builder to create a [`File`].
#[derive(Debug, Clone)]
#[must_use]
pub struct FileBuilder<'a, Config>
where
    Config: FileConfig,
{
    // The containing path, if any. `None` places the file at the root.
    path: Option<String>,
    // The file's name.
    name: String,
    // The file's initial contents; defaults to an empty slice.
    contents: &'a [u8],
    // Config-defined metadata stored alongside the file.
    metadata: Config::Metadata,
    _config: PhantomData<Config>,
}
609

            
610
impl<'a, Config: FileConfig> FileBuilder<'a, Config> {
611
24
    pub(crate) fn new<NameOrPath: AsRef<str>>(
612
24
        name_or_path: NameOrPath,
613
24
        metadata: Config::Metadata,
614
24
    ) -> Self {
615
24
        let mut name_or_path = name_or_path.as_ref();
616
24
        let (path, name) = if name_or_path.starts_with('/') {
617
            // Trim the trailing / if there is one.
618
9
            if name_or_path.ends_with('/') && name_or_path.len() > 1 {
619
                name_or_path = &name_or_path[..name_or_path.len() - 1];
620
9
            }
621
9
            let (path, name) = name_or_path.rsplit_once('/').unwrap();
622
9
            let path = match path {
623
9
                "" | "/" => None,
624
8
                other => Some(other.to_string()),
625
            };
626
9
            (path, name.to_string())
627
        } else {
628
15
            (None, name_or_path.to_string())
629
        };
630
24
        Self {
631
24
            path,
632
24
            name,
633
24
            contents: b"",
634
24
            metadata,
635
24
            _config: PhantomData,
636
24
        }
637
24
    }
638

            
639
    /// Creates this file at `path`. This does not change the file's name
640
    /// specified when creating the builder.
641
4
    pub fn at_path<Path: Into<String>>(mut self, path: Path) -> Self {
642
4
        self.path = Some(path.into());
643
4
        self
644
4
    }
645

            
646
    /// Sets the file's initial contents.
647
19
    pub fn contents(mut self, contents: &'a [u8]) -> Self {
648
19
        self.contents = contents;
649
19
        self
650
19
    }
651

            
652
    /// Sets the file's initial metadata.
653
2
    pub fn metadata(mut self, metadata: Config::Metadata) -> Self {
654
2
        self.metadata = metadata;
655
2
        self
656
2
    }
657

            
658
    /// Creates the file and returns a handle to the created file.
659
13
    pub fn create<Database: Connection + Clone>(
660
13
        self,
661
13
        database: &Database,
662
13
    ) -> Result<File<Blocking<Database>, Config>, Error> {
663
13
        File::new_file(
664
13
            self.path,
665
13
            self.name,
666
13
            self.contents,
667
13
            self.metadata,
668
13
            database.clone(),
669
13
        )
670
13
    }
671

            
672
    /// Creates the file and returns a handle to the created file.
673
    #[cfg(feature = "async")]
674
11
    pub async fn create_async<Database: bonsaidb_core::connection::AsyncConnection + Clone>(
675
11
        self,
676
11
        database: &Database,
677
11
    ) -> Result<File<Async<Database>, Config>, Error> {
678
11
        File::new_file_async(
679
11
            self.path,
680
11
            self.name,
681
11
            self.contents,
682
11
            self.metadata,
683
11
            database.clone(),
684
11
        )
685
18
        .await
686
11
    }
687
}
688

            
689
/// Buffered access to the contents of a [`File`].
#[must_use]
pub struct Contents<Database: Clone, Config: FileConfig> {
    // Connection wrapper used to load blocks on demand.
    database: Database,
    // Metadata for the file's blocks.
    blocks: Vec<BlockInfo>,
    // Block contents currently buffered in memory.
    loaded: VecDeque<LoadedBlock>,
    // Index of the block the reader is currently positioned in.
    current_block: usize,
    // Current read offset. NOTE(review): whether this is relative to the
    // current block or the start of the file is not visible in this chunk —
    // confirm against the `Read`/`Seek` implementations.
    offset: usize,
    // How many bytes to buffer; initialized to `Config::BLOCK_SIZE * 10` by
    // `File::contents()`.
    buffer_size: usize,
    // Background block-loading state used by the async reader.
    #[cfg(feature = "async")]
    async_blocks: Option<AsyncBlockTask>,
    _config: PhantomData<Config>,
}
702

            
703
// State for loading blocks in the background when reading asynchronously.
#[cfg(feature = "async")]
struct AsyncBlockTask {
    // Receives loaded block contents, keyed by u64 — presumably block ids;
    // verify against the task that fulfills these requests.
    block_receiver:
        flume::r#async::RecvFut<'static, Result<BTreeMap<u64, Vec<u8>>, std::io::Error>>,
    // Whether a batch of blocks has already been requested.
    requested: bool,
    // Sends the ids of the blocks that should be loaded next.
    request_sender: flume::Sender<Vec<u64>>,
}
710

            
711
impl<Database: Clone, Config: FileConfig> Clone for Contents<Database, Config> {
    // Clones the reader's position and block metadata, but not its buffers:
    // the clone starts with nothing loaded and no async task, so it will
    // fetch block data independently.
    fn clone(&self) -> Self {
        Self {
            database: self.database.clone(),
            blocks: self.blocks.clone(),
            // Deliberately empty: buffered contents are not carried over.
            loaded: VecDeque::new(),
            current_block: self.current_block,
            offset: self.offset,
            buffer_size: self.buffer_size,
            // Any in-flight loading task stays with the original.
            #[cfg(feature = "async")]
            async_blocks: None,
            _config: PhantomData,
        }
    }
}
726

            
727
// A single block's contents held in the `Contents::loaded` buffer.
#[derive(Clone)]
struct LoadedBlock {
    // Index identifying this block — assigned by `load_blocks` via
    // `enumerate()`; confirm whether it is absolute or batch-relative.
    index: usize,
    // The block's raw bytes.
    contents: Vec<u8>,
}
732

            
733
impl<Database: Connection + Clone, Config: FileConfig> Contents<Blocking<Database>, Config> {
    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
    /// read, this returns the entire contents.
    pub fn to_vec(&self) -> std::io::Result<Vec<u8>> {
        // The clone keeps the current position but discards buffered blocks,
        // so reading it does not disturb `self`.
        self.clone().into_vec()
    }

    /// Returns the remaining contents as a string. If no bytes have been read,
    /// this returns the entire contents.
    pub fn to_string(&self) -> std::io::Result<String> {
        // Invalid UTF-8 is reported as an `InvalidData` I/O error.
        String::from_utf8(self.to_vec()?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
    /// read, this returns the entire contents.
    #[allow(clippy::missing_panics_doc)] // Not reachable
    pub fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
        // Reserve for the full file length up front to avoid reallocating
        // while reading.
        let mut contents = Vec::with_capacity(usize::try_from(self.len()).unwrap());
        self.read_to_end(&mut contents)?;
        Ok(contents)
    }

    /// Returns the remaining contents as a string. If no bytes have been read,
    /// this returns the entire contents.
    pub fn into_string(self) -> std::io::Result<String> {
        String::from_utf8(self.into_vec()?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Replaces the cached blocks with the next batch fetched from the
    /// database, sized according to `buffer_size` (see `next_blocks`).
    fn load_blocks(&mut self) -> std::io::Result<()> {
        self.loaded.clear();
        for (index, (_, contents)) in
            schema::block::Block::<Config>::load(&self.next_blocks(), &self.database.0)
                .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?
                .into_iter()
                .enumerate()
        {
            self.loaded.push_back(LoadedBlock {
                // Blocks are returned in request order, so their indexes are
                // sequential starting at `current_block`.
                index: self.current_block + index,
                contents,
            });
        }

        Ok(())
    }
}
780

            
781
#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > Contents<Async<Database>, Config>
{
    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
    /// read, this returns the entire contents.
    pub async fn to_vec(&self) -> std::io::Result<Vec<u8>> {
        // The clone keeps the current position but discards buffered blocks
        // and the background fetch task, so reading it does not disturb
        // `self`.
        self.clone().into_vec().await
    }

    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
    /// read, this returns the entire contents.
    #[allow(clippy::missing_panics_doc)] // Not reachable
    pub async fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
        // Reserve for the full file length up front, then read until EOF.
        // Previously this sized a `read_exact` by `len()`, which fails with
        // `UnexpectedEof` when bytes were already read before calling this
        // (fewer than `len()` bytes remain). `read_to_end` matches the
        // blocking implementation's semantics.
        let mut contents = Vec::with_capacity(usize::try_from(self.len()).unwrap());
        <Self as tokio::io::AsyncReadExt>::read_to_end(&mut self, &mut contents).await?;
        Ok(contents)
    }

    /// Returns the remaining contents as a string. If no bytes have been read,
    /// this returns the entire contents.
    pub async fn to_string(&self) -> std::io::Result<String> {
        // Invalid UTF-8 is reported as an `InvalidData` I/O error.
        String::from_utf8(self.to_vec().await?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Returns the remaining contents as a string. If no bytes have been read,
    /// this returns the entire contents.
    pub async fn into_string(self) -> std::io::Result<String> {
        String::from_utf8(self.into_vec().await?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Spawns the background block-loading task if it hasn't been spawned
    /// yet. The task serves load requests until its request channel closes,
    /// which happens when `async_blocks` is dropped.
    fn spawn_block_fetching_task(&mut self) {
        if self.async_blocks.is_none() {
            // Spawn the task
            let (block_sender, block_receiver) = flume::unbounded();
            let (request_sender, request_receiver) = flume::unbounded();

            let task_database = self.database.0.clone();
            tokio::task::spawn(async move {
                // Serve load requests until the reader goes away.
                while let Ok(doc_ids) = request_receiver.recv_async().await {
                    let blocks =
                        schema::block::Block::<Config>::load_async(&doc_ids, &task_database)
                            .await
                            .map_err(|err| std::io::Error::new(ErrorKind::Other, err));
                    if block_sender.send(blocks).is_err() {
                        break;
                    }
                }
            });

            self.async_blocks = Some(AsyncBlockTask {
                block_receiver: block_receiver.into_recv_async(),
                request_sender,
                requested: false,
            });
        }
    }

    /// Polls the background task for block data.
    ///
    /// Returns `Ok(true)` when blocks were received or a new request was
    /// sent, and `Ok(false)` when there are no further blocks to request
    /// (end of file).
    fn fetch_blocks(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<bool, std::io::Error>> {
        if self.async_blocks.as_mut().unwrap().requested {
            // A request is in flight; wait for the task's response.
            match ready!(self
                .async_blocks
                .as_mut()
                .unwrap()
                .block_receiver
                .poll_unpin(cx))
            {
                Ok(Ok(blocks)) => {
                    self.async_blocks.as_mut().unwrap().requested = false;
                    for (index, (_, contents)) in blocks.into_iter().enumerate() {
                        // Responses arrive in request order, so indexes are
                        // sequential starting at `current_block`.
                        let loaded_block = LoadedBlock {
                            index: self.current_block + index,
                            contents,
                        };
                        self.loaded.push_back(loaded_block);
                    }
                    Poll::Ready(Ok(true))
                }
                // The database lookup itself failed.
                Ok(Err(db_err)) => Poll::Ready(Err(std::io::Error::new(ErrorKind::Other, db_err))),
                // The channel to the background task closed unexpectedly.
                Err(flume_error) => {
                    Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, flume_error)))
                }
            }
        } else {
            let blocks = self.next_blocks();
            if blocks.is_empty() {
                return Poll::Ready(Ok(false));
            }
            self.loaded.clear();
            self.async_blocks.as_mut().unwrap().requested = true;
            if let Err(err) = self
                .async_blocks
                .as_mut()
                .unwrap()
                .request_sender
                .send(blocks)
            {
                return Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, err)));
            }

            Poll::Ready(Ok(true))
        }
    }
}
892

            
893
impl<Database: Clone, Config: FileConfig> Contents<Database, Config> {
    /// Returns the ids of the next batch of blocks to request, starting at
    /// `current_block` and greedily adding blocks until including another
    /// would exceed `buffer_size`.
    ///
    /// At least one block is always included, even if that block alone is
    /// larger than `buffer_size`.
    fn next_blocks(&self) -> Vec<u64> {
        let mut last_block = self.current_block;
        let mut requesting_size = 0;
        for index in self.current_block..self.blocks.len() {
            let size_if_requested = self.blocks[index].length.saturating_add(requesting_size);
            if size_if_requested > self.buffer_size {
                break;
            }

            requesting_size = size_if_requested;
            last_block = index;
        }

        // NOTE(review): this slice assumes `current_block` is a valid index;
        // at end-of-file (`current_block == blocks.len()`) it would panic.
        // Callers appear to guard via the NeedBlocks checks — confirm.
        self.blocks[self.current_block..=last_block]
            .iter()
            .map(|info| info.header.id)
            .collect()
    }

    /// Sets the maximum buffer size in bytes and returns `self`. When buffering
    /// reads from the database, requests will be made to fill at-most
    /// `size_in_bytes` of memory.
    pub fn with_buffer_size(mut self, size_in_bytes: usize) -> Self {
        self.buffer_size = size_in_bytes;
        self
    }

    /// Returns the total length of the file.
    #[allow(clippy::missing_panics_doc)] // Not reachable
    #[must_use]
    pub fn len(&self) -> u64 {
        // The length is the last block's offset plus its length; no blocks
        // means an empty file.
        self.blocks
            .last()
            .map(|b| b.offset + u64::try_from(b.length).unwrap())
            .unwrap_or_default()
    }

    /// Returns true if the file's length is 0.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // A single zero-length block also counts as empty.
        self.blocks.is_empty() || (self.blocks.len() == 1 && self.blocks[0].length == 0)
    }

    /// Returns the timestamp that the last data was written to the file.
    /// Returns None if the file is empty.
    #[must_use]
    pub fn last_appended_at(&self) -> Option<TimestampAsNanoseconds> {
        self.blocks.last().map(|b| b.timestamp)
    }

    /// Attempts to return the current block's contents from the cache without
    /// touching the database, advancing past the block on success.
    fn non_blocking_read_block(&mut self) -> NonBlockingBlockReadResult {
        let block = self.loaded.pop_front();

        if let Some(mut block) = block {
            // Only use the cached block if it is the one we're positioned at;
            // stale entries are discarded and re-fetched.
            if block.index == self.current_block {
                self.current_block += 1;
                if self.offset > 0 {
                    // A prior partial read or seek consumed the front of this
                    // block; return only the unread remainder.
                    block.contents.splice(..self.offset, []);
                    self.offset = 0;
                }
                return NonBlockingBlockReadResult::ReadBlock(block.contents);
            }
        }

        // We need to load blocks. We need to ensure we aren't in an EOF
        // position.
        let is_last_block = self.current_block + 1 == self.blocks.len();
        if self.current_block < self.blocks.len()
            || (is_last_block && self.offset < self.blocks.last().unwrap().length)
        {
            return NonBlockingBlockReadResult::NeedBlocks;
        }

        NonBlockingBlockReadResult::Eof
    }

    /// Feeds cached block bytes to `read_callback` until it consumes some
    /// bytes, more data is required from the database, or EOF is reached.
    ///
    /// `read_callback` receives the unread slice of the current block and
    /// returns how many bytes it consumed; a return of 0 advances to the next
    /// cached block.
    fn non_blocking_read<F: FnMut(&[u8]) -> usize>(
        &mut self,
        mut read_callback: F,
    ) -> NonBlockingReadResult {
        loop {
            if self.loaded.is_empty() || self.loaded.front().unwrap().index != self.current_block {
                // Nothing cached for the current position. Distinguish
                // "need more blocks" from true EOF.
                let is_last_block = self.current_block + 1 == self.blocks.len();

                if self.current_block < self.blocks.len()
                    || (is_last_block && self.offset < self.blocks.last().unwrap().length)
                {
                    return NonBlockingReadResult::NeedBlocks;
                }

                return NonBlockingReadResult::Eof;
            }
            while let Some(block) = self.loaded.front() {
                let read_length = read_callback(&block.contents[self.offset..]);
                if read_length > 0 {
                    self.offset += read_length;
                    return NonBlockingReadResult::ReadBytes(read_length);
                }

                // The callback consumed nothing; treat the current block as
                // exhausted and advance to the next cached block.
                self.loaded.pop_front();
                self.offset = 0;
                self.current_block += 1;
            }
        }
    }
}

            
// Outcome of a cache-only attempt to read a whole block.
enum NonBlockingBlockReadResult {
    // The cache has no data for the current position; fetch more blocks.
    NeedBlocks,
    // A full block's (remaining) contents.
    ReadBlock(Vec<u8>),
    // The end of the file has been reached.
    Eof,
}

            
// Outcome of a cache-only attempt to read bytes via a callback.
enum NonBlockingReadResult {
    // The cache has no data for the current position; fetch more blocks.
    NeedBlocks,
    // The callback consumed this many bytes.
    ReadBytes(usize),
    // The end of the file has been reached.
    Eof,
}

            
impl<Database: Connection + Clone, Config: FileConfig> Read
    for Contents<Blocking<Database>, Config>
{
44
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
61
        loop {
64
            match self.non_blocking_read(|block| {
62
                let bytes_to_read = buf.len().min(block.len());
62
                buf[..bytes_to_read].copy_from_slice(&block[..bytes_to_read]);
62
                bytes_to_read
64
            }) {
33
                NonBlockingReadResult::ReadBytes(bytes) => return Ok(bytes),
11
                NonBlockingReadResult::Eof => return Ok(0),
17
                NonBlockingReadResult::NeedBlocks => self.load_blocks()?,
            }
        }
44
    }
}

            
impl<Database: Clone, Config: FileConfig> Seek for Contents<Database, Config> {
    /// Seeks by updating `current_block` and `offset`; no database access
    /// occurs. Seeking past the end clamps to the end of the file.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        // Translate `pos` into an absolute byte position.
        let seek_to = match pos {
            SeekFrom::Start(offset) => offset,
            SeekFrom::End(from_end) => {
                if from_end < 0 {
                    // NOTE(review): this subtraction underflows (panicking in
                    // debug builds) if seeking before the start of the file —
                    // confirm intended behavior.
                    self.len() - u64::try_from(from_end.saturating_abs()).unwrap()
                } else {
                    // Seek to the end
                    self.len()
                }
            }
            SeekFrom::Current(from_current) => {
                if self.blocks.is_empty() {
                    return Ok(0);
                }

                // Current absolute position = current block's start offset
                // plus the offset within it.
                // NOTE(review): `blocks[self.current_block]` can be past the
                // last block after reading to EOF, and the conversions panic
                // on negative results — confirm callers cannot hit these.
                u64::try_from(
                    i64::try_from(
                        self.blocks[self.current_block].offset
                            + u64::try_from(self.offset).unwrap(),
                    )
                    .unwrap()
                        + from_current,
                )
                .unwrap()
            }
        };
        // Locate the block whose byte range contains `seek_to`.
        if let Some((index, block)) = self
            .blocks
            .iter()
            .enumerate()
            .find(|b| b.1.offset + u64::try_from(b.1.length).unwrap() > seek_to)
        {
            self.current_block = index;
            self.offset = usize::try_from(seek_to - block.offset).unwrap();
            Ok(seek_to)
        } else if let Some(last_block) = self.blocks.last() {
            // Set to the end of the file
            self.current_block = self.blocks.len() - 1;
            self.offset = last_block.length;
            Ok(last_block.offset + u64::try_from(last_block.length).unwrap())
        } else {
            // Empty
            self.current_block = 0;
            self.offset = 0;
            Ok(0)
        }
    }
}

            
#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > tokio::io::AsyncSeek for Contents<Async<Database>, Config>
{
    /// Begins a seek. Seeking only mutates in-memory position state, so this
    /// delegates synchronously to the blocking [`Seek`] implementation.
    fn start_seek(mut self: std::pin::Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        self.seek(position).map(|_| ())
    }

    /// Reports the current byte position. Always ready, because `start_seek`
    /// completed the work synchronously.
    fn poll_complete(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<std::io::Result<u64>> {
        if self.blocks.is_empty() {
            Poll::Ready(Ok(0))
        } else if self.current_block < self.blocks.len() {
            // Position is the current block's start plus the offset into it.
            Poll::Ready(Ok(
                self.blocks[self.current_block].offset + u64::try_from(self.offset).unwrap()
            ))
        } else {
            // Past the last block: report end of file.
            Poll::Ready(Ok(self.len()))
        }
    }
}

            
#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > tokio::io::AsyncRead for Contents<Async<Database>, Config>
{
    /// Copies cached bytes into `buf`, delegating to the background task via
    /// `fetch_blocks` when more data is needed.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Lazily start the background block-fetching task on first read.
        self.spawn_block_fetching_task();
        loop {
            match self.non_blocking_read(|block| {
                // Copy as much of the current block as fits in `buf`.
                let bytes_to_read = buf.remaining().min(block.len());
                buf.put_slice(&block[..bytes_to_read]);
                bytes_to_read
            }) {
                NonBlockingReadResult::NeedBlocks => match self.fetch_blocks(cx) {
                    Poll::Ready(Ok(true)) => continue,
                    Poll::Pending => return Poll::Pending,
                    // `false` means nothing left to request: EOF.
                    Poll::Ready(Ok(false)) => return Poll::Ready(Ok(())),
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                },
                NonBlockingReadResult::ReadBytes(bytes) => {
                    // Keep reading until the destination is full or no bytes
                    // were produced.
                    if bytes == 0 || buf.remaining() == 0 {
                        return Poll::Ready(Ok(()));
                    }
                }
                NonBlockingReadResult::Eof => return Poll::Ready(Ok(())),
            }
        }
    }
}

            
impl<Database: Connection + Clone, Config: FileConfig> Iterator
    for Contents<Blocking<Database>, Config>
{
    type Item = std::io::Result<Vec<u8>>;

            
4
    fn next(&mut self) -> Option<Self::Item> {
6
        loop {
6
            match self.non_blocking_read_block() {
3
                NonBlockingBlockReadResult::ReadBlock(bytes) => return Some(Ok(bytes)),
1
                NonBlockingBlockReadResult::Eof => return None,
2
                NonBlockingBlockReadResult::NeedBlocks => match self.load_blocks() {
2
                    Ok(()) => {}
                    Err(err) => return Some(Err(err)),
                },
            }
        }
4
    }
}
#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Unpin + Clone + 'static,
        Config: FileConfig,
    > futures::Stream for Contents<Async<Database>, Config>
{
    type Item = std::io::Result<Vec<u8>>;

    /// Yields the file's contents one block at a time, fetching batches via
    /// the background task as needed. Ends the stream at end of file.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Lazily start the background block-fetching task on first poll.
        self.spawn_block_fetching_task();
        loop {
            match self.non_blocking_read_block() {
                NonBlockingBlockReadResult::NeedBlocks => match self.fetch_blocks(cx) {
                    Poll::Ready(Ok(true)) => continue,
                    Poll::Pending => return Poll::Pending,
                    // Nothing left to request: end the stream.
                    Poll::Ready(Ok(false)) => return Poll::Ready(None),
                    Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
                },
                NonBlockingBlockReadResult::ReadBlock(block) => {
                    return Poll::Ready(Some(Ok(block)))
                }
                NonBlockingBlockReadResult::Eof => return Poll::Ready(None),
            }
        }
    }
}

            
10
// Location and identity of a single block within a file.
#[derive(Clone)]
pub(crate) struct BlockInfo {
    // Byte offset of this block from the start of the file.
    pub offset: u64,
    // Length of this block's contents in bytes.
    pub length: usize,
    // When this block was appended.
    pub timestamp: TimestampAsNanoseconds,
    // The block document's header; `header.id` is used to load its contents.
    pub header: CollectionHeader<u64>,
}

            
/// A buffered [`std::io::Write`] and [`std::io::Seek`] implementor for a
/// [`File`].
// NOTE(review): no `Seek` implementation for this type appears in this file;
// confirm the doc reference above.
pub struct BufferedAppend<'a, Config: FileConfig, Database: Connection + Clone> {
    // The file being appended to.
    file: &'a mut File<Blocking<Database>, Config>,
    // Bytes accepted by `write` but not yet appended to the database.
    pub(crate) buffer: Vec<u8>,
    _config: PhantomData<Config>,
}

            
impl<'a, Config: FileConfig, Database: Connection + Clone> BufferedAppend<'a, Config, Database> {
    /// Sets the size of the buffer. For optimal use, this should be a multiple
    /// of [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
    ///
    /// If any data is already buffered, it will be flushed before the buffer is
    /// resized.
1
    pub fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
1
        if self.buffer.capacity() > 0 {
            self.flush()?;
1
        }
1
        self.buffer = Vec::with_capacity(capacity);
1
        Ok(())
1
    }
}

            
impl<'a, Config: FileConfig, Database: Connection + Clone> Write
    for BufferedAppend<'a, Config, Database>
{
131
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
131
        if self.buffer.capacity() == 0 {
2
            const ONE_MEGABYTE: usize = 1024 * 1024;
2
            // By default, reserve the largest multiple of BLOCK_SIZE that is
2
            // less than or equal to 1 megabyte.
2
            self.buffer
2
                .reserve_exact(ONE_MEGABYTE / Config::BLOCK_SIZE * Config::BLOCK_SIZE);
130
        } else if self.buffer.capacity() == self.buffer.len() {
1
            self.flush()?;
128
        }

            
131
        if data.is_empty() {
            Ok(0)
        } else {
131
            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
131
            self.buffer.extend(&data[..bytes_to_write]);
131
            Ok(bytes_to_write)
        }
131
    }

            
    fn flush(&mut self) -> std::io::Result<()> {
5
        self.file
5
            .append(&self.buffer)
5
            .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?;
5
        self.buffer.clear();
5
        Ok(())
5
    }
}

            
impl<'a, Config: FileConfig, Database: Connection + Clone> Drop
    for BufferedAppend<'a, Config, Database>
{
    /// Performs a best-effort flush of any buffered data. Errors during drop
    /// are intentionally discarded; call `flush()` explicitly to observe them.
    fn drop(&mut self) {
        drop(self.flush());
    }
}

            
/// A buffered [`tokio::io::AsyncWrite`] and [`std::io::Seek`] implementor for a
/// [`File`].
// NOTE(review): no `Seek` implementation for this type appears in this file;
// confirm the doc reference above.
#[cfg(feature = "async")]
pub struct AsyncBufferedAppend<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
{
    // The file being appended to.
    file: &'a mut File<Async<Database>, Config>,
    // Bytes accepted by `poll_write` but not yet appended to the database.
    pub(crate) buffer: Vec<u8>,
    // The in-progress flush operation, if one has been started.
    flush_future: Option<BoxFuture<'a, Result<(), std::io::Error>>>,
    _config: PhantomData<Config>,
}

            
#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
    AsyncBufferedAppend<'a, Config, Database>
{
    /// Sets the size of the buffer. For optimal use, this should be a multiple
    /// of [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
    ///
    /// If any data is already buffered, it will be flushed before the buffer is
    /// resized.
    pub async fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
        // A buffer that was never allocated cannot hold data, so flushing is
        // only necessary once a capacity exists.
        if self.buffer.capacity() > 0 {
            self.flush().await?;
        }
        self.buffer = Vec::with_capacity(capacity);
        Ok(())
    }
}

            
#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> tokio::io::AsyncWrite
    for AsyncBufferedAppend<'a, Config, Database>
{
    /// Buffers `data`, flushing to the database when the buffer fills.
    ///
    /// Returns the number of bytes buffered, which may be less than
    /// `data.len()` when the buffer runs out of room.
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        data: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        if self.buffer.capacity() == 0 {
            const ONE_MEGABYTE: usize = 1024 * 1024;
            // By default, reserve the largest multiple of BLOCK_SIZE that is
            // less than or equal to 1 megabyte. If BLOCK_SIZE exceeds one
            // megabyte, reserve a single block: previously this reserved zero
            // bytes, which made every write return `Ok(0)` and caused
            // `write_all` to loop forever.
            let whole_blocks = (ONE_MEGABYTE / Config::BLOCK_SIZE).max(1);
            self.buffer.reserve_exact(whole_blocks * Config::BLOCK_SIZE);
        }

        if self.flush_future.is_some() {
            // A flush is in progress; it must finish before more data can be
            // buffered.
            if let Err(err) = ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
                return Poll::Ready(Err(err));
            }
        } else if self.buffer.capacity() == self.buffer.len() {
            // Buffer is full; start a flush and wait for it to complete.
            match ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
                Ok(_) => {}
                Err(err) => {
                    return Poll::Ready(Err(err));
                }
            }
        }

        if data.is_empty() {
            Poll::Ready(Ok(0))
        } else {
            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
            self.buffer.extend(&data[..bytes_to_write]);
            Poll::Ready(Ok(bytes_to_write))
        }
    }

    /// Drives an in-progress flush to completion, or starts a new flush that
    /// appends the buffered bytes to the file.
    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        if let Some(flush_future) = &mut self.flush_future {
            // Resume the flush a previous call started.
            let result = ready!(flush_future.poll_unpin(cx));
            self.flush_future = None;
            Poll::Ready(result)
        } else if self.buffer.is_empty() {
            // Nothing buffered: flushing is a no-op.
            Poll::Ready(Ok(()))
        } else {
            let file = self.file.clone();

            // Swap the full buffer out, leaving an empty buffer with the same
            // capacity for subsequent writes.
            let mut buffer = Vec::with_capacity(self.buffer.capacity());
            std::mem::swap(&mut buffer, &mut self.buffer);

            let mut flush_task = async move {
                file.append(&buffer)
                    .await
                    .map_err(|err| std::io::Error::new(ErrorKind::Other, err))
            }
            .boxed();
            // Only retain the future while it is pending. Previously a future
            // that completed on this first poll was still stored, so a later
            // `poll_flush` would poll an already-completed future (a panic).
            match flush_task.poll_unpin(cx) {
                Poll::Ready(result) => Poll::Ready(result),
                Poll::Pending => {
                    self.flush_future = Some(flush_task);
                    Poll::Pending
                }
            }
        }
    }

    /// Shutting down only requires flushing; there is no additional
    /// termination state.
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        self.poll_flush(cx)
    }
}

            
#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> Drop
    for AsyncBufferedAppend<'a, Config, Database>
{
    /// Flushes any unflushed bytes on the tokio runtime, since `drop` cannot
    /// `.await` the flush directly.
    fn drop(&mut self) {
        if !self.buffer.is_empty() {
            // A partially-polled flush cannot be resumed from here; buffered
            // bytes alongside a stored flush future mean the caller abandoned
            // an in-progress flush.
            assert!(
                self.flush_future.is_none(),
                "flush() was started but not completed before dropped"
            );
            let mut buffer = Vec::new();
            std::mem::swap(&mut buffer, &mut self.buffer);
            let mut file = self.file.clone();

            // Move the unflushed bytes into a temporary writer and flush it
            // as a spawned task.
            // NOTE(review): `Handle::current()` panics if this drop runs
            // outside of a tokio runtime — confirm that is acceptable.
            tokio::runtime::Handle::current().spawn(async move {
                drop(
                    AsyncBufferedAppend {
                        file: &mut file,
                        buffer,
                        flush_future: None,
                        _config: PhantomData,
                    }
                    .flush()
                    .await,
                );
            });
        }
    }
}