1
//! This example shows a creative use case for map/reduce views: generating
2
//! histograms.
3
//!
4
//! This example uses the [`hdrhistogram`] crate to create a histogram of
5
//! "samples" stored in the [`Samples`] type. The raw sample data is stored in a
6
//! collection with a timestamp (u64) and a `Vec<u64>` of samples.
7
//!
8
//! The [`AsHistogram`] view maps the sample data into a [`SyncHistogram`], and
9
//! this code provides an example of how to implement a custom serializer using
10
//! the [`transmog::Format`] trait. This allows using `SyncHistogram`'s native
11
//! serialization to store the histogram into the view.
12
//!
13
//! All of this combined enables the ability to use the `reduce()` API to
14
//! retrieve server-reduced values in an efficient manner.
15

            
16
use std::ops::Deref;
17

            
18
use bonsaidb::core::document::{CollectionDocument, Emit};
19
use bonsaidb::core::schema::{
20
    Collection, CollectionMapReduce, ReduceResult, SerializedCollection, SerializedView, View,
21
    ViewMappedValue, ViewSchema,
22
};
23
use bonsaidb::core::transmog::{Format, OwnedDeserializer};
24
use bonsaidb::local::config::{Builder, StorageConfiguration};
25
use bonsaidb::local::Database;
26
use hdrhistogram::serialization::{Serializer, V2Serializer};
27
use hdrhistogram::{Histogram, SyncHistogram};
28
use rand::rngs::StdRng;
29
use rand::{Rng, SeedableRng};
30
use serde::{Deserialize, Serialize};
31

            
32
1
fn main() -> Result<(), bonsaidb::local::Error> {
33
1
    let db = Database::open::<Samples>(StorageConfiguration::new("view-histogram.bonsaidb"))?;
34

            
35
1
    println!("inserting 100 new sets of samples");
36
1
    let mut rng = StdRng::from_entropy();
37
100
    for timestamp in 1..100 {
38
        // This inserts a new record, generating a random range that will trend
39
        // upwards as `timestamp` increases.
40
99
        Samples {
41
99
            timestamp,
42
99
            entries: (0..100)
43
9900
                .map(|_| rng.gen_range(50 + timestamp / 2..115 + timestamp))
44
99
                .collect(),
45
99
        }
46
99
        .push_into(&db)?;
47
    }
48
1
    println!("done inserting new samples");
49

            
50
    // We can ask for a histogram of all the data:
51
1
    let total_histogram = AsHistogram::entries(&db).reduce()?;
52
1
    println!(
53
1
        "99th Percentile overall: {} ({} samples)",
54
1
        total_histogram.value_at_quantile(0.99),
55
1
        total_histogram.len()
56
1
    );
57

            
58
    // Or we can request just a specific range:
59
1
    let range_histogram = AsHistogram::entries(&db).with_key_range(10..20).reduce()?;
60
1
    println!(
61
1
        "99th Percentile from 10..20: {} ({} samples)",
62
1
        range_histogram.value_at_quantile(0.99),
63
1
        range_histogram.len()
64
1
    );
65
1
    let range_histogram = AsHistogram::entries(&db).with_key_range(80..100).reduce()?;
66
1
    println!(
67
1
        "99th Percentile from 80..100: {} ({} samples)",
68
1
        range_histogram.value_at_quantile(0.99),
69
1
        range_histogram.len()
70
1
    );
71
1

            
72
1
    Ok(())
73
1
}
74

            
75
/// A set of samples that were taken at a specific time.
76
495
#[derive(Debug, Serialize, Deserialize, Collection)]
77
#[collection(name = "samples", views = [AsHistogram])]
78
pub struct Samples {
79
    /// The timestamp of the samples.
80
    pub timestamp: u64,
81
    /// The raw samples.
82
    pub entries: Vec<u64>,
83
}
84

            
85
/// A view for [`Samples`] which produces a histogram.
86
218
#[derive(Debug, Clone, View, ViewSchema)]
87
#[view(collection = Samples, key = u64, value = SyncHistogram<u64>, name = "as-histogram", serialization = None)]
88
pub struct AsHistogram;
89

            
90
impl CollectionMapReduce for AsHistogram {
91
99
    fn map<'doc>(
92
99
        &self,
93
99
        document: CollectionDocument<<Self::View as View>::Collection>,
94
99
    ) -> bonsaidb::core::schema::ViewMapResult<'doc, Self::View> {
95
99
        let mut histogram = Histogram::new(4).unwrap();
96
9999
        for sample in &document.contents.entries {
97
9900
            histogram.record(*sample).unwrap();
98
9900
        }
99

            
100
99
        document
101
99
            .header
102
99
            .emit_key_and_value(document.contents.timestamp, histogram.into_sync())
103
99
    }
104

            
105
102
    fn reduce(
106
102
        &self,
107
102
        mappings: &[ViewMappedValue<'_, Self::View>],
108
102
        _rereduce: bool,
109
102
    ) -> ReduceResult<Self::View> {
110
102
        let mut mappings = mappings.iter();
111
102
        let mut combined = SyncHistogram::from(
112
102
            mappings
113
102
                .next()
114
102
                .map(|h| h.value.deref().clone())
115
102
                .unwrap_or_else(|| Histogram::new(4).unwrap()),
116
102
        );
117
228
        for map in mappings {
118
126
            combined.add(map.value.deref()).unwrap();
119
126
        }
120
102
        Ok(combined)
121
102
    }
122
}
123

            
124
impl SerializedView for AsHistogram {
125
    type Format = Self;
126

            
127
432
    fn format() -> Self::Format {
128
432
        Self
129
432
    }
130
}
131

            
132
impl Format<'static, SyncHistogram<u64>> for AsHistogram {
133
    type Error = HistogramError;
134

            
135
    fn serialize_into<W: std::io::Write>(
136
        &self,
137
        value: &SyncHistogram<u64>,
138
        mut writer: W,
139
    ) -> Result<(), Self::Error> {
140
201
        V2Serializer::new()
141
201
            .serialize(value, &mut writer)
142
201
            .map_err(HistogramError::Serialization)?;
143
201
        Ok(())
144
201
    }
145
}
146

            
147
impl OwnedDeserializer<SyncHistogram<u64>> for AsHistogram {
148
231
    fn deserialize_from<R: std::io::Read>(
149
231
        &self,
150
231
        mut reader: R,
151
231
    ) -> Result<SyncHistogram<u64>, Self::Error> {
152
231
        hdrhistogram::serialization::Deserializer::new()
153
231
            .deserialize(&mut reader)
154
231
            .map(SyncHistogram::from)
155
231
            .map_err(HistogramError::Deserialization)
156
231
    }
157
}
158

            
159
#[derive(thiserror::Error, Debug)]
160
pub enum HistogramError {
161
    #[error("serialization error: {0}")]
162
    Serialization(#[from] hdrhistogram::serialization::V2SerializeError),
163
    #[error("deserialization error: {0}")]
164
    Deserialization(#[from] hdrhistogram::serialization::DeserializeError),
165
}
166

            
167
impl From<std::io::Error> for HistogramError {
168
    fn from(err: std::io::Error) -> Self {
169
        Self::Deserialization(hdrhistogram::serialization::DeserializeError::from(err))
170
    }
171
}
172

            
173
1
/// Runs the example end-to-end and panics if it returns an error.
#[test]
fn runs() {
    main().unwrap();
}