1
1
use std::{self, collections::BTreeMap, fmt::Debug, fs::File, io::Read, mem, path::Path};
2

            
3
use bonsaidb::{
4
    core::{
5
        async_trait::async_trait,
6
        connection::{AsyncConnection, AsyncLowLevelConnection},
7
        document::{CollectionDocument, Emit},
8
        keyvalue::Timestamp,
9
        schema::{
10
            Collection, CollectionViewSchema, ReduceResult, Schema, SerializedCollection, View,
11
            ViewMapResult, ViewMappedValue,
12
        },
13
        transaction::{Operation, Transaction},
14
    },
15
    local::{
16
        config::{Builder, Compression, StorageConfiguration},
17
        AsyncDatabase,
18
    },
19
};
20
use clap::{Parser, Subcommand};
21
use futures::{Future, StreamExt};
22
use serde::{de::DeserializeOwned, Deserialize, Serialize};
23
use time::{Date, Month};
24

            
25
#[derive(Debug, Schema)]
26
#[schema(name = "open-library", collections = [Author, Work, Edition, Rating, ReadingLog])]
27
struct OpenLibrary;
28

            
29
#[async_trait]
30
pub trait LibraryEntity: SerializedCollection<PrimaryKey = String, Contents = Self> {
31
    const ID_PREFIX: &'static str;
32
    fn full_id(id: &str) -> String {
33
        format!("/{}/{}", Self::ID_PREFIX, id)
34
    }
35

            
36
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()>;
37
}
38

            
39
#[derive(Debug, Serialize, Deserialize, Collection)]
40
#[collection(name = "authors", primary_key = String, natural_id = |author: &Self| Some(author.key.clone()))]
41
struct Author {
42
    pub key: String,
43
    pub name: Option<String>,
44
    #[serde(default)]
45
    pub alternate_names: Vec<String>,
46
    pub bio: Option<TypedValue>,
47
    pub birth_date: Option<String>,
48
    pub death_date: Option<String>,
49
    pub location: Option<String>,
50
    pub date: Option<String>,
51
    pub entity_type: Option<String>,
52
    pub fuller_name: Option<String>,
53
    pub personal_name: Option<String>,
54
    pub title: Option<String>,
55
    #[serde(default)]
56
    pub photos: Vec<Option<i64>>,
57
    #[serde(default)]
58
    pub links: Vec<Link>,
59
    pub created: Option<TypedValue>,
60
    pub last_modified: TypedValue,
61
}
62

            
63
#[async_trait]
64
impl LibraryEntity for Author {
65
    const ID_PREFIX: &'static str = "authors";
66

            
67
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
68
        if let Some(name) = &self.name {
69
            println!("Name: {}", name);
70
        }
71
        if let Some(bio) = &self.bio {
72
            println!("Biography:\n{}", bio.value())
73
        }
74
        let works = database
75
            .view::<WorksByAuthor>()
76
            .with_key(self.key.clone())
77
            .query_with_collection_docs()
78
            .await?;
79
        if !works.is_empty() {
80
            println!("Works:");
81
            for work in works.documents.values() {
82
                if let Some(title) = &work.contents.title {
83
                    println!("{}: {}", work.contents.key, title)
84
                }
85
            }
86
        }
87

            
88
        Ok(())
89
    }
90
}
91

            
92
#[derive(Debug, Serialize, Deserialize, Collection)]
93
#[collection(name = "editions", primary_key = String, natural_id = |edition: &Self| Some(edition.key.clone()))]
94
#[collection(views = [EditionsByWork])]
95
struct Edition {
96
    pub key: String,
97
    pub title: Option<String>,
98
    pub subtitle: Option<String>,
99
    #[serde(default)]
100
    pub authors: Vec<Reference>,
101
    #[serde(default)]
102
    pub works: Vec<Reference>,
103
    #[serde(default)]
104
    pub identifiers: BTreeMap<String, Vec<String>>,
105
    #[serde(default)]
106
    pub isbn_10: Vec<String>,
107
    #[serde(default)]
108
    pub isbn_13: Vec<String>,
109
    #[serde(default)]
110
    pub lccn: Vec<String>,
111
    #[serde(default)]
112
    pub oclc_numbers: Vec<String>,
113
    #[serde(default)]
114
    pub covers: Vec<Option<i64>>,
115
    #[serde(default)]
116
    pub links: Vec<Link>,
117
    pub by_statement: Option<String>,
118
    pub weight: Option<String>,
119
    pub edition_name: Option<String>,
120
    pub number_of_pages: Option<i32>,
121
    pub pagination: Option<String>,
122
    pub physical_dimensions: Option<String>,
123
    pub physical_format: Option<String>,
124
    pub publish_country: Option<String>,
125
    pub publish_date: Option<String>,
126
    #[serde(default)]
127
    pub publish_places: Vec<String>,
128
    #[serde(default)]
129
    pub publishers: Vec<String>,
130
    #[serde(default)]
131
    pub contributions: Vec<String>,
132
    #[serde(default)]
133
    pub dewey_decimal_class: Vec<String>,
134
    #[serde(default)]
135
    pub genres: Vec<String>,
136
    #[serde(default)]
137
    pub lc_classifications: Vec<String>,
138
    #[serde(default)]
139
    pub other_titles: Vec<String>,
140
    #[serde(default)]
141
    pub series: Vec<String>,
142
    #[serde(default)]
143
    pub source_records: Vec<Option<String>>,
144
    #[serde(default)]
145
    pub subjects: Vec<String>,
146
    #[serde(default)]
147
    pub work_titles: Vec<String>,
148
    #[serde(default)]
149
    pub table_of_contents: Vec<serde_json::Value>,
150
    pub description: Option<TypedValue>,
151
    pub first_sentence: Option<TypedValue>,
152
    pub notes: Option<TypedValue>,
153
    pub created: Option<TypedValue>,
154
    pub last_modified: TypedValue,
155
}
156

            
157
#[derive(View, Debug, Clone)]
158
#[view(name = "by-work", collection = Edition, key = String, value = u32)]
159
struct EditionsByWork;
160

            
161
impl CollectionViewSchema for EditionsByWork {
162
    type View = Self;
163

            
164
    fn map(
165
        &self,
166
        document: CollectionDocument<<Self::View as View>::Collection>,
167
    ) -> ViewMapResult<Self::View> {
168
        document
169
            .contents
170
            .works
171
            .into_iter()
172
            .map(|work| {
173
                document
174
                    .header
175
                    .emit_key_and_value(work.into_key().replace("/b/", "/books/"), 1)
176
            })
177
            .collect()
178
    }
179

            
180
    fn reduce(
181
        &self,
182
        mappings: &[ViewMappedValue<Self::View>],
183
        _rereduce: bool,
184
    ) -> ReduceResult<Self::View> {
185
        Ok(mappings.iter().map(|map| map.value).sum())
186
    }
187
}
188

            
189
#[async_trait]
190
impl LibraryEntity for Edition {
191
    const ID_PREFIX: &'static str = "books";
192

            
193
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
194
        if let Some(title) = &self.title {
195
            println!("Title: {}", title);
196
        }
197
        if let Some(subtitle) = &self.subtitle {
198
            println!("Subtitle: {}", subtitle);
199
        }
200
        let works = Work::get_multiple_async(
201
            self.works
202
                .iter()
203
                .map(|w| w.to_key().replace("/b/", "/books/")),
204
            database,
205
        )
206
        .await?;
207
        if !works.is_empty() {
208
            println!("Works:");
209
            for work in works {
210
                if let Some(title) = &work.contents.title {
211
                    println!("{}: {}", work.contents.key, title)
212
                }
213
            }
214
        }
215
        Ok(())
216
    }
217
}
218

            
219
#[derive(Debug, Serialize, Deserialize, Collection)]
220
#[collection(name = "works", primary_key = String, natural_id = |work: &Self| Some(work.key.clone()))]
221
#[collection(views = [WorksByAuthor])]
222
struct Work {
223
    pub key: String,
224
    pub title: Option<String>,
225
    pub subtitle: Option<String>,
226
    #[serde(default)]
227
    pub authors: Vec<AuthorRole>,
228
    #[serde(default)]
229
    pub covers: Vec<Option<i64>>,
230
    #[serde(default)]
231
    pub links: Vec<Link>,
232
    pub id: Option<i64>,
233
    #[serde(default)]
234
    pub lc_classifications: Vec<String>,
235
    #[serde(default)]
236
    pub subjects: Vec<String>,
237
    pub first_publish_date: Option<String>,
238
    pub description: Option<TypedValue>,
239
    pub notes: Option<TypedValue>,
240
    pub created: Option<TypedValue>,
241
    pub last_modified: TypedValue,
242
}
243

            
244
#[derive(View, Debug, Clone)]
245
#[view(name = "by-author", collection = Work, key = String, value = u32)]
246
struct WorksByAuthor;
247

            
248
impl CollectionViewSchema for WorksByAuthor {
249
    type View = Self;
250

            
251
    fn version(&self) -> u64 {
252
        1
253
    }
254

            
255
    fn map(
256
        &self,
257
        document: CollectionDocument<<Self::View as View>::Collection>,
258
    ) -> ViewMapResult<Self::View> {
259
        document
260
            .contents
261
            .authors
262
            .into_iter()
263
            .filter_map(|role| role.author)
264
            .map(|author| {
265
                document
266
                    .header
267
                    .emit_key_and_value(author.into_key().replace("/a/", "/authors/"), 1)
268
            })
269
            .collect()
270
    }
271

            
272
    fn reduce(
273
        &self,
274
        mappings: &[ViewMappedValue<Self::View>],
275
        _rereduce: bool,
276
    ) -> ReduceResult<Self::View> {
277
        Ok(mappings.iter().map(|map| map.value).sum())
278
    }
279
}
280

            
281
#[async_trait]
282
impl LibraryEntity for Work {
283
    const ID_PREFIX: &'static str = "works";
284

            
285
    async fn summarize(&self, database: &AsyncDatabase) -> anyhow::Result<()> {
286
        if let Some(title) = &self.title {
287
            println!("Title: {}", title);
288
        }
289
        if let Some(subtitle) = &self.subtitle {
290
            println!("Subtitle: {}", subtitle);
291
        }
292
        let editions = database
293
            .view::<EditionsByWork>()
294
            .with_key(self.key.clone())
295
            .query_with_collection_docs()
296
            .await?;
297
        if !editions.is_empty() {
298
            println!("Editions:");
299
            for edition in editions.documents.values() {
300
                if let Some(title) = &edition.contents.title {
301
                    println!("{}: {}", edition.contents.key, title)
302
                }
303
            }
304
        }
305
        Ok(())
306
    }
307
}
308

            
309
#[derive(Serialize, Deserialize, Debug)]
310
#[serde(untagged)]
311
enum TypedValue {
312
    TypeValue { r#type: String, value: String },
313
    Value(String),
314
}
315

            
316
impl TypedValue {
317
    fn value(&self) -> &str {
318
        match self {
319
            TypedValue::TypeValue { value, .. } | TypedValue::Value(value) => value,
320
        }
321
    }
322
}
323

            
324
#[derive(Serialize, Deserialize, Debug)]
325
struct AuthorRole {
326
    pub role: Option<String>,
327
    pub r#as: Option<String>,
328
    pub author: Option<ExternalKey>,
329
}
330

            
331
#[derive(Serialize, Deserialize, Debug)]
332
#[serde(untagged)]
333
enum Reference {
334
    Typed(TypedReference),
335
    Key(String),
336
}
337

            
338
impl Reference {
339
    pub fn to_key(&self) -> &str {
340
        match self {
341
            Reference::Typed(TypedReference { key, .. }) => key.to_key(),
342
            Reference::Key(key) => key,
343
        }
344
    }
345

            
346
    pub fn into_key(self) -> String {
347
        match self {
348
            Reference::Typed(typed) => typed.key.into_key(),
349
            Reference::Key(key) => key,
350
        }
351
    }
352
}
353

            
354
#[derive(Serialize, Deserialize, Debug)]
355
struct TypedReference {
356
    pub r#type: Option<String>,
357
    pub key: ExternalKey,
358
}
359

            
360
#[derive(Serialize, Deserialize, Debug)]
361
#[serde(untagged)]
362
enum ExternalKey {
363
    Tagged { key: String },
364
    Untagged(String),
365
}
366

            
367
impl ExternalKey {
368
    pub fn to_key(&self) -> &str {
369
        match self {
370
            ExternalKey::Tagged { key } => key,
371
            ExternalKey::Untagged(key) => key,
372
        }
373
    }
374

            
375
    fn into_key(self) -> String {
376
        match self {
377
            ExternalKey::Tagged { key } => key,
378
            ExternalKey::Untagged(key) => key,
379
        }
380
    }
381
}
382

            
383
#[derive(Serialize, Deserialize, Debug)]
384
struct Link {
385
    pub title: Option<String>,
386
    pub url: String,
387
}
388

            
389
#[derive(Debug, Serialize, Deserialize, Collection)]
390
#[collection(name = "ratings")]
391
struct Rating {
392
    pub work_key: String,
393
    pub edition_key: String,
394
    pub date: Date,
395
    pub rating: u8,
396
}
397

            
398
impl TryFrom<Vec<String>> for Rating {
399
    type Error = anyhow::Error;
400

            
401
    fn try_from(fields: Vec<String>) -> Result<Self, Self::Error> {
402
        if fields.len() != 4 {
403
            anyhow::bail!("expected 4 fields, got {:?}", fields);
404
        }
405

            
406
        let mut fields = fields.into_iter();
407
        let work_key = fields.next().unwrap();
408
        let edition_key = fields.next().unwrap();
409
        let rating = fields.next().unwrap();
410
        let rating = rating.parse::<u8>()?;
411
        let date = fields.next().unwrap();
412
        let mut date_parts = date.split('-');
413
        let year = date_parts.next().unwrap().to_owned();
414
        let year = year.parse::<i32>()?;
415
        let month = date_parts.next().unwrap().to_owned();
416
        let month = month.parse::<u8>()?;
417
        let month = Month::try_from(month)?;
418
        let day = date_parts.next().unwrap().to_owned();
419
        let day = day.parse::<u8>()?;
420
        let date = Date::from_calendar_date(year, month, day)?;
421

            
422
        Ok(Self {
423
            work_key,
424
            edition_key,
425
            date,
426
            rating,
427
        })
428
    }
429
}
430

            
431
#[derive(Debug, Serialize, Deserialize, Collection)]
432
#[collection(name = "reading-logs")]
433
struct ReadingLog {
434
    pub work_key: String,
435
    pub edition_key: String,
436
    pub date: Date,
437
    pub shelf: String,
438
}
439

            
440
fn parse_tsv(
441
    path: impl AsRef<Path> + Send + Sync,
442
    output: flume::Sender<Vec<String>>,
443
) -> anyhow::Result<()> {
444
    let mut file = File::open(path)?;
445
    let mut buffer = vec![0; 16192];
446
    // let mut file = gzip::Decoder::new(file)?;
447
    let mut current_record = vec![Vec::new()];
448
    loop {
449
        let bytes_read = file.read(&mut buffer)?;
450
        if bytes_read == 0 {
451
            // TODO handle dropping the last record?
452
            break;
453
        }
454
        for &ch in &buffer[..bytes_read] {
455
            match ch {
456
                b'\t' => {
457
                    // Next field
458
                    current_record.push(Vec::new());
459
                }
460
                b'\r' => {}
461
                b'\n' => {
462
                    // Swap an empty record into the current_record, and send
463
                    // the record to the output channel.
464
                    let mut record = vec![Vec::new()];
465
                    mem::swap(&mut record, &mut current_record);
466
                    // Each field should be UTF-8
467
                    let record = record
468
                        .into_iter()
469
                        .map(String::from_utf8)
470
                        .collect::<Result<Vec<String>, _>>()?;
471
                    output.send(record)?;
472
                }
473
                other => {
474
                    current_record.last_mut().unwrap().push(other);
475
                }
476
            }
477
        }
478
    }
479

            
480
    Ok(())
481
}
482

            
483
async fn import_ratings(database: &AsyncDatabase) -> anyhow::Result<()> {
484
    import_from_tsv(
485
        "./examples/open-library/ol_dump_ratings.txt",
486
        database,
487
        |records, database| async move {
488
            let mut tx = Transaction::new();
489
            for record in records {
490
                tx.push(Operation::push_serialized::<Rating>(&Rating::try_from(
491
                    record,
492
                )?)?);
493
            }
494
            let inserted = tx.operations.len();
495
            database.apply_transaction(tx).await?;
496
            Ok(inserted)
497
        },
498
    )
499
    .await
500
}
501

            
502
async fn overwrite_serialized<C: SerializedCollection>(
503
    tx: &mut Transaction,
504
    json: &str,
505
) -> anyhow::Result<()>
506
where
507
    C::Contents: DeserializeOwned,
508
{
509
    match serde_json::from_str::<C::Contents>(json) {
510
        Ok(contents) => {
511
            tx.push(Operation::overwrite_serialized::<C>(
512
                C::natural_id(&contents).unwrap(),
513
                &contents,
514
            )?);
515
            Ok(())
516
        }
517
        Err(err) => {
518
            anyhow::bail!("Error parsing json {}: {}", err, json);
519
        }
520
    }
521
}
522

            
523
async fn import_primary_data(database: &AsyncDatabase) -> anyhow::Result<()> {
524
    import_from_tsv(
525
        "./examples/open-library/ol_dump_all.txt",
526
        database,
527
        |records, database| async move {
528
            let mut tx = Transaction::new();
529
            for record in &records {
530
                match record[0].as_str() {
531
                    "/type/author" => overwrite_serialized::<Author>(&mut tx, &record[4]).await?,
532
                    "/type/edition" => overwrite_serialized::<Edition>(&mut tx, &record[4]).await?,
533
                    "/type/work" => overwrite_serialized::<Work>(&mut tx, &record[4]).await?,
534
                    _ => {}
535
                }
536
            }
537
            let inserted = tx.operations.len();
538
            database.apply_transaction(tx).await?;
539
            Ok(inserted)
540
        },
541
    )
542
    .await
543
}
544

            
545
async fn import_from_tsv<
546
    Callback: Fn(Vec<Vec<String>>, AsyncDatabase) -> Fut + 'static,
547
    Fut: Future<Output = anyhow::Result<usize>>,
548
>(
549
    path: &'static str,
550
    database: &AsyncDatabase,
551
    callback: Callback,
552
) -> anyhow::Result<()> {
553
    const CHUNK_SIZE: usize = 500_000;
554
    let (sender, receiver) = flume::bounded(CHUNK_SIZE * 2);
555
    std::thread::spawn(move || parse_tsv(path, sender));
556

            
557
    let mut inserted = 0;
558
    let mut record_stream = receiver.into_stream().chunks(CHUNK_SIZE);
559
    while let Some(records) = record_stream.next().await {
560
        inserted += callback(records, database.clone()).await?;
561
        println!("Imported records: {}", inserted);
562
    }
563

            
564
    Ok(())
565
}
566

            
567
#[derive(Debug, Parser)]
568
struct Cli {
569
    #[clap(long, short('z'))]
570
    lz4: bool,
571
    #[clap(subcommand)]
572
    command: Command,
573
}
574

            
575
#[derive(Debug, Subcommand)]
576
enum Command {
577
    Import,
578
    Compact,
579
    Count,
580
    Author { id: String },
581
    Edition { id: String },
582
    Work { id: String },
583
}
584

            
585
async fn get_entity<S>(id: &str, database: &AsyncDatabase) -> anyhow::Result<()>
586
where
587
    S: LibraryEntity<Contents = S> + Debug,
588
{
589
    match S::get_async(S::full_id(id), database).await? {
590
        Some(doc) => doc.contents.summarize(database).await,
591
        None => {
592
            anyhow::bail!("not found");
593
        }
594
    }
595
}
596

            
597
#[tokio::main]
598
async fn main() -> anyhow::Result<()> {
599
    let args = Cli::parse();
600
    let config = if args.lz4 {
601
        StorageConfiguration::new("open-library-lz4.bonsaidb").default_compression(Compression::Lz4)
602
    } else {
603
        StorageConfiguration::new("open-library.bonsaidb")
604
    };
605
    let db = AsyncDatabase::open::<OpenLibrary>(config).await?;
606
    match args.command {
607
        Command::Import => {
608
            let primary_import = import_primary_data(&db);
609
            let ratings_import = import_ratings(&db);
610
            tokio::try_join!(primary_import, ratings_import)?;
611
            Ok(())
612
        }
613
        Command::Compact => {
614
            println!("Beginning: {:?}", Timestamp::now());
615
            db.compact().await?;
616
            println!("Done: {:?}", Timestamp::now());
617

            
618
            Ok(())
619
        }
620
        Command::Count => {
621
            println!("Total authors: {}", Author::all_async(&db).count().await?);
622
            println!("Total works: {}", Work::all_async(&db).count().await?);
623
            println!("Total editions: {}", Edition::all_async(&db).count().await?);
624
            println!("Total ratings: {}", Rating::all_async(&db).count().await?);
625

            
626
            Ok(())
627
        }
628
        Command::Work { id } => get_entity::<Work>(&id, &db).await,
629
        Command::Author { id } => get_entity::<Author>(&id, &db).await,
630
        Command::Edition { id } => get_entity::<Edition>(&id, &db).await,
631
    }
632
}
633

            
634
/// Smoke test that runs the example's `main`; ignored by default.
///
/// NOTE(review): `main` calls `Cli::parse()`, which will see the test
/// harness's own command-line arguments rather than a subcommand, so this
/// probably cannot pass as written — TODO confirm.
#[test]
#[ignore]
fn runs() {
    let outcome = main();
    outcome.unwrap();
}