1
use std::collections::BTreeMap;
2
use std::fmt::Debug;
3
use std::fs::File;
4
use std::io::Read;
5
use std::path::Path;
6
use std::{self, mem};
7

            
8
use bonsaidb::core::async_trait::async_trait;
9
use bonsaidb::core::connection::{AsyncConnection, AsyncLowLevelConnection};
10
use bonsaidb::core::document::{CollectionDocument, Emit};
11
use bonsaidb::core::keyvalue::Timestamp;
12
use bonsaidb::core::schema::{
13
    Collection, CollectionMapReduce, ReduceResult, Schema, SerializedCollection, SerializedView,
14
    View, ViewMapResult, ViewMappedValue, ViewSchema,
15
};
16
use bonsaidb::core::transaction::{Operation, Transaction};
17
use bonsaidb::local::config::{Builder, Compression, StorageConfiguration};
18
use bonsaidb::local::AsyncDatabase;
19
use clap::{Parser, Subcommand};
20
use futures::{Future, StreamExt};
21
use serde::de::DeserializeOwned;
22
use serde::{Deserialize, Serialize};
23
use time::{Date, Month};
24

            
25
#[derive(Debug, Schema)]
26
#[schema(name = "open-library", collections = [Author, Work, Edition, Rating, ReadingLog])]
27
struct OpenLibrary;
28

            
29
#[async_trait]
30
pub trait LibraryEntity: SerializedCollection<PrimaryKey = String, Contents = Self> {
31
    const ID_PREFIX: &'static str;
32
    fn full_id(id: &str) -> String {
33
        format!("/{}/{}", Self::ID_PREFIX, id)
34
    }
35

            
36
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()>;
37
}
38

            
39
#[derive(Debug, Serialize, Deserialize, Collection)]
40
#[collection(name = "authors", primary_key = String)]
41
struct Author {
42
    #[natural_id]
43
    pub key: String,
44
    pub name: Option<String>,
45
    #[serde(default)]
46
    pub alternate_names: Vec<String>,
47
    pub bio: Option<TypedValue>,
48
    pub birth_date: Option<String>,
49
    pub death_date: Option<String>,
50
    pub location: Option<String>,
51
    pub date: Option<String>,
52
    pub entity_type: Option<String>,
53
    pub fuller_name: Option<String>,
54
    pub personal_name: Option<String>,
55
    pub title: Option<String>,
56
    #[serde(default)]
57
    pub photos: Vec<Option<i64>>,
58
    #[serde(default)]
59
    pub links: Vec<Link>,
60
    pub created: Option<TypedValue>,
61
    pub last_modified: TypedValue,
62
}
63

            
64
#[async_trait]
65
impl LibraryEntity for Author {
66
    const ID_PREFIX: &'static str = "authors";
67

            
68
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
69
        if let Some(name) = &self.name {
70
            println!("Name: {name}");
71
        }
72
        if let Some(bio) = &self.bio {
73
            println!("Biography:\n{}", bio.value())
74
        }
75
        let works = WorksByAuthor::entries_async(database)
76
            .with_key(&self.key)
77
            .query_with_collection_docs()
78
            .await?;
79
        if !works.is_empty() {
80
            println!("Works:");
81
            for work in works.documents.values() {
82
                if let Some(title) = &work.contents.title {
83
                    println!("{}: {}", work.contents.key, title)
84
                }
85
            }
86
        }
87

            
88
        Ok(())
89
    }
90
}
91

            
92
#[derive(Debug, Serialize, Deserialize, Collection)]
93
#[collection(name = "editions", primary_key = String)]
94
#[collection(views = [EditionsByWork])]
95
struct Edition {
96
    #[natural_id]
97
    pub key: String,
98
    pub title: Option<String>,
99
    pub subtitle: Option<String>,
100
    #[serde(default)]
101
    pub authors: Vec<Reference>,
102
    #[serde(default)]
103
    pub works: Vec<Reference>,
104
    #[serde(default)]
105
    pub identifiers: BTreeMap<String, Vec<String>>,
106
    #[serde(default)]
107
    pub isbn_10: Vec<String>,
108
    #[serde(default)]
109
    pub isbn_13: Vec<String>,
110
    #[serde(default)]
111
    pub lccn: Vec<String>,
112
    #[serde(default)]
113
    pub oclc_numbers: Vec<String>,
114
    #[serde(default)]
115
    pub covers: Vec<Option<i64>>,
116
    #[serde(default)]
117
    pub links: Vec<Link>,
118
    pub by_statement: Option<String>,
119
    pub weight: Option<String>,
120
    pub edition_name: Option<String>,
121
    pub number_of_pages: Option<i32>,
122
    pub pagination: Option<String>,
123
    pub physical_dimensions: Option<String>,
124
    pub physical_format: Option<String>,
125
    pub publish_country: Option<String>,
126
    pub publish_date: Option<String>,
127
    #[serde(default)]
128
    pub publish_places: Vec<String>,
129
    #[serde(default)]
130
    pub publishers: Vec<String>,
131
    #[serde(default)]
132
    pub contributions: Vec<String>,
133
    #[serde(default)]
134
    pub dewey_decimal_class: Vec<String>,
135
    #[serde(default)]
136
    pub genres: Vec<String>,
137
    #[serde(default)]
138
    pub lc_classifications: Vec<String>,
139
    #[serde(default)]
140
    pub other_titles: Vec<String>,
141
    #[serde(default)]
142
    pub series: Vec<String>,
143
    #[serde(default)]
144
    pub source_records: Vec<Option<String>>,
145
    #[serde(default)]
146
    pub subjects: Vec<String>,
147
    #[serde(default)]
148
    pub work_titles: Vec<String>,
149
    #[serde(default)]
150
    pub table_of_contents: Vec<serde_json::Value>,
151
    pub description: Option<TypedValue>,
152
    pub first_sentence: Option<TypedValue>,
153
    pub notes: Option<TypedValue>,
154
    pub created: Option<TypedValue>,
155
    pub last_modified: TypedValue,
156
}
157

            
158
#[derive(View, ViewSchema, Debug, Clone)]
159
#[view(name = "by-work", collection = Edition, key = String, value = u32)]
160
struct EditionsByWork;
161

            
162
impl CollectionMapReduce for EditionsByWork {
163
    fn map<'doc>(
164
        &self,
165
        document: CollectionDocument<<Self::View as View>::Collection>,
166
    ) -> ViewMapResult<'doc, Self::View> {
167
        document
168
            .contents
169
            .works
170
            .into_iter()
171
            .map(|work| {
172
                document
173
                    .header
174
                    .emit_key_and_value(work.into_key().replace("/b/", "/books/"), 1)
175
            })
176
            .collect()
177
    }
178

            
179
    fn reduce(
180
        &self,
181
        mappings: &[ViewMappedValue<'_, Self::View>],
182
        _rereduce: bool,
183
    ) -> ReduceResult<Self::View> {
184
        Ok(mappings.iter().map(|map| map.value).sum())
185
    }
186
}
187

            
188
#[async_trait]
189
impl LibraryEntity for Edition {
190
    const ID_PREFIX: &'static str = "books";
191

            
192
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
193
        if let Some(title) = &self.title {
194
            println!("Title: {title}");
195
        }
196
        if let Some(subtitle) = &self.subtitle {
197
            println!("Subtitle: {subtitle}");
198
        }
199
        let works = Work::get_multiple_async(
200
            &self
201
                .works
202
                .iter()
203
                .map(|w| w.to_key().replace("/b/", "/books/"))
204
                .collect::<Vec<_>>(),
205
            database,
206
        )
207
        .await?;
208
        if !works.is_empty() {
209
            println!("Works:");
210
            for work in works {
211
                if let Some(title) = &work.contents.title {
212
                    println!("{}: {}", work.contents.key, title)
213
                }
214
            }
215
        }
216
        Ok(())
217
    }
218
}
219

            
220
#[derive(Debug, Serialize, Deserialize, Collection)]
221
#[collection(name = "works", primary_key = String)]
222
#[collection(views = [WorksByAuthor])]
223
struct Work {
224
    #[natural_id]
225
    pub key: String,
226
    pub title: Option<String>,
227
    pub subtitle: Option<String>,
228
    #[serde(default)]
229
    pub authors: Vec<AuthorRole>,
230
    #[serde(default)]
231
    pub covers: Vec<Option<i64>>,
232
    #[serde(default)]
233
    pub links: Vec<Link>,
234
    pub id: Option<i64>,
235
    #[serde(default)]
236
    pub lc_classifications: Vec<String>,
237
    #[serde(default)]
238
    pub subjects: Vec<String>,
239
    pub first_publish_date: Option<String>,
240
    pub description: Option<TypedValue>,
241
    pub notes: Option<TypedValue>,
242
    pub created: Option<TypedValue>,
243
    pub last_modified: TypedValue,
244
}
245

            
246
#[derive(View, ViewSchema, Debug, Clone)]
247
#[view(name = "by-author", collection = Work, key = String, value = u32)]
248
#[view_schema(version = 1)]
249
struct WorksByAuthor;
250

            
251
impl CollectionMapReduce for WorksByAuthor {
252
    fn map<'doc>(
253
        &self,
254
        document: CollectionDocument<<Self::View as View>::Collection>,
255
    ) -> ViewMapResult<'doc, Self::View> {
256
        document
257
            .contents
258
            .authors
259
            .into_iter()
260
            .filter_map(|role| role.author)
261
            .map(|author| {
262
                document
263
                    .header
264
                    .emit_key_and_value(author.into_key().replace("/a/", "/authors/"), 1)
265
            })
266
            .collect()
267
    }
268

            
269
    fn reduce(
270
        &self,
271
        mappings: &[ViewMappedValue<Self::View>],
272
        _rereduce: bool,
273
    ) -> ReduceResult<Self::View> {
274
        Ok(mappings.iter().map(|map| map.value).sum())
275
    }
276
}
277

            
278
#[async_trait]
279
impl LibraryEntity for Work {
280
    const ID_PREFIX: &'static str = "works";
281

            
282
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
283
        if let Some(title) = &self.title {
284
            println!("Title: {title}");
285
        }
286
        if let Some(subtitle) = &self.subtitle {
287
            println!("Subtitle: {subtitle}");
288
        }
289
        let editions = EditionsByWork::entries_async(database)
290
            .with_key(&self.key)
291
            .query_with_collection_docs()
292
            .await?;
293
        if !editions.is_empty() {
294
            println!("Editions:");
295
            for edition in editions.documents.values() {
296
                if let Some(title) = &edition.contents.title {
297
                    println!("{}: {}", edition.contents.key, title)
298
                }
299
            }
300
        }
301
        Ok(())
302
    }
303
}
304

            
305
#[derive(Serialize, Deserialize, Debug)]
306
#[serde(untagged)]
307
enum TypedValue {
308
    TypeValue { r#type: String, value: String },
309
    Value(String),
310
}
311

            
312
impl TypedValue {
313
    fn value(&self) -> &str {
314
        match self {
315
            TypedValue::TypeValue { value, .. } | TypedValue::Value(value) => value,
316
        }
317
    }
318
}
319

            
320
#[derive(Serialize, Deserialize, Debug)]
321
struct AuthorRole {
322
    pub role: Option<String>,
323
    pub r#as: Option<String>,
324
    pub author: Option<ExternalKey>,
325
}
326

            
327
#[derive(Serialize, Deserialize, Debug)]
328
#[serde(untagged)]
329
enum Reference {
330
    Typed(TypedReference),
331
    Key(String),
332
}
333

            
334
impl Reference {
335
    pub fn to_key(&self) -> &str {
336
        match self {
337
            Reference::Typed(TypedReference { key, .. }) => key.to_key(),
338
            Reference::Key(key) => key,
339
        }
340
    }
341

            
342
    pub fn into_key(self) -> String {
343
        match self {
344
            Reference::Typed(typed) => typed.key.into_key(),
345
            Reference::Key(key) => key,
346
        }
347
    }
348
}
349

            
350
#[derive(Serialize, Deserialize, Debug)]
351
struct TypedReference {
352
    pub r#type: Option<String>,
353
    pub key: ExternalKey,
354
}
355

            
356
#[derive(Serialize, Deserialize, Debug)]
357
#[serde(untagged)]
358
enum ExternalKey {
359
    Tagged { key: String },
360
    Untagged(String),
361
}
362

            
363
impl ExternalKey {
364
    pub fn to_key(&self) -> &str {
365
        match self {
366
            ExternalKey::Tagged { key } => key,
367
            ExternalKey::Untagged(key) => key,
368
        }
369
    }
370

            
371
    fn into_key(self) -> String {
372
        match self {
373
            ExternalKey::Tagged { key } => key,
374
            ExternalKey::Untagged(key) => key,
375
        }
376
    }
377
}
378

            
379
#[derive(Serialize, Deserialize, Debug)]
380
struct Link {
381
    pub title: Option<String>,
382
    pub url: String,
383
}
384

            
385
#[derive(Debug, Serialize, Deserialize, Collection)]
386
#[collection(name = "ratings")]
387
struct Rating {
388
    pub work_key: String,
389
    pub edition_key: String,
390
    pub date: Date,
391
    pub rating: u8,
392
}
393

            
394
impl TryFrom<Vec<String>> for Rating {
395
    type Error = anyhow::Error;
396

            
397
    fn try_from(fields: Vec<String>) -> Result<Self, Self::Error> {
398
        if fields.len() != 4 {
399
            anyhow::bail!("expected 4 fields, got {:?}", fields);
400
        }
401

            
402
        let mut fields = fields.into_iter();
403
        let work_key = fields.next().unwrap();
404
        let edition_key = fields.next().unwrap();
405
        let rating = fields.next().unwrap();
406
        let rating = rating.parse::<u8>()?;
407
        let date = fields.next().unwrap();
408
        let mut date_parts = date.split('-');
409
        let year = date_parts.next().unwrap().to_owned();
410
        let year = year.parse::<i32>()?;
411
        let month = date_parts.next().unwrap().to_owned();
412
        let month = month.parse::<u8>()?;
413
        let month = Month::try_from(month)?;
414
        let day = date_parts.next().unwrap().to_owned();
415
        let day = day.parse::<u8>()?;
416
        let date = Date::from_calendar_date(year, month, day)?;
417

            
418
        Ok(Self {
419
            work_key,
420
            edition_key,
421
            date,
422
            rating,
423
        })
424
    }
425
}
426

            
427
#[derive(Debug, Serialize, Deserialize, Collection)]
428
#[collection(name = "reading-logs")]
429
struct ReadingLog {
430
    pub work_key: String,
431
    pub edition_key: String,
432
    pub date: Date,
433
    pub shelf: String,
434
}
435

            
436
fn parse_tsv(
437
    path: impl AsRef<Path> + Send + Sync,
438
    output: flume::Sender<Vec<String>>,
439
) -> anyhow::Result<()> {
440
    let mut file = File::open(path)?;
441
    let mut buffer = vec![0; 16192];
442
    // let mut file = gzip::Decoder::new(file)?;
443
    let mut current_record = vec![Vec::new()];
444
    loop {
445
        let bytes_read = file.read(&mut buffer)?;
446
        if bytes_read == 0 {
447
            // TODO handle dropping the last record?
448
            break;
449
        }
450
        for &ch in &buffer[..bytes_read] {
451
            match ch {
452
                b'\t' => {
453
                    // Next field
454
                    current_record.push(Vec::new());
455
                }
456
                b'\r' => {}
457
                b'\n' => {
458
                    // Swap an empty record into the current_record, and send
459
                    // the record to the output channel.
460
                    let mut record = vec![Vec::new()];
461
                    mem::swap(&mut record, &mut current_record);
462
                    // Each field should be UTF-8
463
                    let record = record
464
                        .into_iter()
465
                        .map(String::from_utf8)
466
                        .collect::<Result<Vec<String>, _>>()?;
467
                    output.send(record)?;
468
                }
469
                other => {
470
                    current_record.last_mut().unwrap().push(other);
471
                }
472
            }
473
        }
474
    }
475

            
476
    Ok(())
477
}
478

            
479
async fn import_ratings(database: &AsyncDatabase) -> anyhow::Result<()> {
480
    import_from_tsv(
481
        "./examples/open-library/ol_dump_ratings.txt",
482
        database,
483
        |records, database| async move {
484
            let mut tx = Transaction::new();
485
            for record in records {
486
                tx.push(Operation::push_serialized::<Rating>(&Rating::try_from(
487
                    record,
488
                )?)?);
489
            }
490
            let inserted = tx.operations.len();
491
            database.apply_transaction(tx).await?;
492
            Ok(inserted)
493
        },
494
    )
495
    .await
496
}
497

            
498
async fn overwrite_serialized<C: SerializedCollection>(
499
    tx: &mut Transaction,
500
    json: &str,
501
) -> anyhow::Result<()>
502
where
503
    C::Contents: DeserializeOwned,
504
{
505
    match serde_json::from_str::<C::Contents>(json) {
506
        Ok(contents) => {
507
            tx.push(Operation::overwrite_serialized::<C, C::PrimaryKey>(
508
                &C::natural_id(&contents).unwrap(),
509
                &contents,
510
            )?);
511
            Ok(())
512
        }
513
        Err(err) => {
514
            anyhow::bail!("Error parsing json {}: {}", err, json);
515
        }
516
    }
517
}
518

            
519
async fn import_primary_data(database: &AsyncDatabase) -> anyhow::Result<()> {
520
    import_from_tsv(
521
        "./examples/open-library/ol_dump_all.txt",
522
        database,
523
        |records, database| async move {
524
            let mut tx = Transaction::new();
525
            for record in &records {
526
                match record[0].as_str() {
527
                    "/type/author" => overwrite_serialized::<Author>(&mut tx, &record[4]).await?,
528
                    "/type/edition" => overwrite_serialized::<Edition>(&mut tx, &record[4]).await?,
529
                    "/type/work" => overwrite_serialized::<Work>(&mut tx, &record[4]).await?,
530
                    _ => {}
531
                }
532
            }
533
            let inserted = tx.operations.len();
534
            database.apply_transaction(tx).await?;
535
            Ok(inserted)
536
        },
537
    )
538
    .await
539
}
540

            
541
async fn import_from_tsv<
542
    Callback: Fn(Vec<Vec<String>>, AsyncDatabase) -> Fut + 'static,
543
    Fut: Future<Output = anyhow::Result<usize>>,
544
>(
545
    path: &'static str,
546
    database: &AsyncDatabase,
547
    callback: Callback,
548
) -> anyhow::Result<()> {
549
    const CHUNK_SIZE: usize = 500_000;
550
    let (sender, receiver) = flume::bounded(CHUNK_SIZE * 2);
551
    std::thread::spawn(move || parse_tsv(path, sender));
552

            
553
    let mut inserted = 0;
554
    let mut record_stream = receiver.into_stream().chunks(CHUNK_SIZE);
555
    while let Some(records) = record_stream.next().await {
556
        inserted += callback(records, database.clone()).await?;
557
        println!("Imported records: {inserted}");
558
    }
559

            
560
    Ok(())
561
}
562

            
563
#[derive(Debug, Parser)]
564
struct Cli {
565
    #[clap(long, short('z'))]
566
    lz4: bool,
567
    #[clap(subcommand)]
568
    command: Command,
569
}
570

            
571
#[derive(Debug, Subcommand)]
572
enum Command {
573
    Import,
574
    Compact,
575
    Count,
576
    Author { id: String },
577
    Edition { id: String },
578
    Work { id: String },
579
}
580

            
581
async fn get_entity<S>(id: &str, database: &AsyncDatabase) -> anyhow::Result<()>
582
where
583
    S: LibraryEntity<Contents = S> + Debug,
584
{
585
    match S::get_async(&S::full_id(id), database).await? {
586
        Some(doc) => doc.contents.summarize(database).await,
587
        None => {
588
            anyhow::bail!("not found");
589
        }
590
    }
591
}
592

            
593
#[tokio::main]
594
async fn main() -> anyhow::Result<()> {
595
    let args = Cli::parse();
596
    let config = if args.lz4 {
597
        StorageConfiguration::new("open-library-lz4.bonsaidb").default_compression(Compression::Lz4)
598
    } else {
599
        StorageConfiguration::new("open-library.bonsaidb")
600
    };
601
    let db = AsyncDatabase::open::<OpenLibrary>(config).await?;
602
    match args.command {
603
        Command::Import => {
604
            let primary_import = import_primary_data(&db);
605
            let ratings_import = import_ratings(&db);
606
            tokio::try_join!(primary_import, ratings_import)?;
607
            Ok(())
608
        }
609
        Command::Compact => {
610
            println!("Beginning: {:?}", Timestamp::now());
611
            db.compact().await?;
612
            println!("Done: {:?}", Timestamp::now());
613

            
614
            Ok(())
615
        }
616
        Command::Count => {
617
            println!("Total authors: {}", Author::all_async(&db).count().await?);
618
            println!("Total works: {}", Work::all_async(&db).count().await?);
619
            println!("Total editions: {}", Edition::all_async(&db).count().await?);
620
            println!("Total ratings: {}", Rating::all_async(&db).count().await?);
621

            
622
            Ok(())
623
        }
624
        Command::Work { id } => get_entity::<Work>(&id, &db).await,
625
        Command::Author { id } => get_entity::<Author>(&id, &db).await,
626
        Command::Edition { id } => get_entity::<Edition>(&id, &db).await,
627
    }
628
}
629

            
630
/// Runs `main` and panics if it returns an error. Ignored by default.
#[test]
#[ignore]
fn runs() {
    let outcome = main();
    outcome.unwrap()
}