//! This example shows a creative use case for map/reduce views: generating
//! histograms.
//!
//! This example uses the [`hdrhistogram`] crate to create a histogram of
//! "samples" stored in the [`Samples`] type. The raw sample data is stored in a
//! collection with a timestamp (u64) and a `Vec<u64>` of samples.
//!
//! The [`AsHistogram`] view maps the sample data into a [`SyncHistogram`], and
//! this code provides an example of how to implement a custom serializer using
//! the [`transmog::Format`] trait. This allows using `SyncHistogram`'s native
//! serialization to store the histogram into the view.
//!
//! All of this combined enables the ability to use the `reduce()` API to
//! retrieve server-reduced values in an efficient manner.

            
16
use std::ops::Deref;
17

            
18
use bonsaidb::{
19
    core::{
20
        connection::Connection,
21
        document::CollectionDocument,
22
        schema::{
23
            view::CollectionViewSchema, Collection, ReduceResult, SerializedView, View,
24
            ViewMappedValue,
25
        },
26
        transmog::{Format, OwnedDeserializer},
27
    },
28
    local::{
29
        config::{Builder, StorageConfiguration},
30
        Database,
31
    },
32
};
33
use hdrhistogram::{
34
    serialization::{Serializer, V2Serializer},
35
    Histogram, SyncHistogram,
36
};
37
use rand::{rngs::StdRng, Rng, SeedableRng};
38
use serde::{Deserialize, Serialize};
39

            
40
#[tokio::main]
41
1
async fn main() -> Result<(), bonsaidb::local::Error> {
42
1
    let db =
43
19
        Database::open::<Samples>(StorageConfiguration::new("view-histogram.bonsaidb")).await?;
44

            
45
1
    println!("inserting 100 new sets of samples");
46
1
    let mut rng = StdRng::from_entropy();
47
100
    for timestamp in 1..100 {
48
        // This inserts a new record, generating a random range that will trend
49
        // upwards as `timestamp` increases.
50
99
        db.collection::<Samples>()
51
99
            .push(&Samples {
52
99
                timestamp,
53
99
                entries: (0..100)
54
9900
                    .map(|_| rng.gen_range(50 + timestamp / 2..115 + timestamp))
55
99
                    .collect(),
56
99
            })
57
99
            .await?;
58
    }
59
1
    println!("done inserting new samples");
60

            
61
    // We can ask for a histogram of all the data:
62
2
    let total_histogram = db.view::<AsHistogram>().reduce().await?;
63
1
    println!(
64
1
        "99th Percentile overall: {} ({} samples)",
65
1
        total_histogram.value_at_quantile(0.99),
66
1
        total_histogram.len()
67
1
    );
68

            
69
    // Or we can request just a specific range:
70
1
    let range_histogram = db
71
1
        .view::<AsHistogram>()
72
1
        .with_key_range(10..20)
73
1
        .reduce()
74
1
        .await?;
75
1
    println!(
76
1
        "99th Percentile from 10..20: {} ({} samples)",
77
1
        range_histogram.value_at_quantile(0.99),
78
1
        range_histogram.len()
79
1
    );
80
1
    let range_histogram = db
81
1
        .view::<AsHistogram>()
82
1
        .with_key_range(80..100)
83
1
        .reduce()
84
        .await?;
85
1
    println!(
86
1
        "99th Percentile from 80..100: {} ({} samples)",
87
1
        range_histogram.value_at_quantile(0.99),
88
1
        range_histogram.len()
89
1
    );
90
1

            
91
1
    Ok(())
92
1
}
93

            
94
/// A set of samples that were taken at a specific time.
95
495
#[derive(Debug, Serialize, Deserialize, Collection)]
96
#[collection(name = "samples", views = [AsHistogram])]
97
pub struct Samples {
98
    /// The timestamp of the samples.
99
    pub timestamp: u64,
100
    /// The raw samples.
101
    pub entries: Vec<u64>,
102
}
103

            
104
/// A view for [`Samples`] which produces a histogram.
105
215
#[derive(Debug, Clone, View)]
106
#[view(collection = Samples, key = u64, value = SyncHistogram<u64>, name = "as-histogram", serialization = None)]
107
pub struct AsHistogram;
108

            
109
impl CollectionViewSchema for AsHistogram {
110
    type View = Self;
111

            
112
99
    fn map(
113
99
        &self,
114
99
        document: CollectionDocument<<Self::View as View>::Collection>,
115
99
    ) -> bonsaidb::core::schema::ViewMapResult<Self::View> {
116
99
        let mut histogram = Histogram::new(4).unwrap();
117
9999
        for sample in &document.contents.entries {
118
9900
            histogram.record(*sample).unwrap();
119
9900
        }
120

            
121
99
        Ok(document.emit_key_and_value(document.contents.timestamp, histogram.into_sync()))
122
99
    }
123

            
124
102
    fn reduce(
125
102
        &self,
126
102
        mappings: &[ViewMappedValue<Self::View>],
127
102
        _rereduce: bool,
128
102
    ) -> ReduceResult<Self::View> {
129
102
        let mut mappings = mappings.iter();
130
102
        let mut combined = SyncHistogram::from(
131
102
            mappings
132
102
                .next()
133
102
                .map(|h| h.value.deref().clone())
134
102
                .unwrap_or_else(|| Histogram::new(4).unwrap()),
135
102
        );
136
228
        for map in mappings {
137
126
            combined.add(map.value.deref()).unwrap();
138
126
        }
139
102
        Ok(combined)
140
102
    }
141
}
142

            
143
impl SerializedView for AsHistogram {
144
    type Format = Self;
145

            
146
432
    fn format() -> Self::Format {
147
432
        Self
148
432
    }
149
}
150

            
151
impl Format<'static, SyncHistogram<u64>> for AsHistogram {
152
    type Error = HistogramError;
153

            
154
    fn serialize_into<W: std::io::Write>(
155
        &self,
156
        value: &SyncHistogram<u64>,
157
        mut writer: W,
158
    ) -> Result<(), Self::Error> {
159
201
        V2Serializer::new()
160
201
            .serialize(value, &mut writer)
161
201
            .map_err(HistogramError::Serialization)?;
162
201
        Ok(())
163
201
    }
164
}
165

            
166
impl OwnedDeserializer<SyncHistogram<u64>> for AsHistogram {
167
231
    fn deserialize_from<R: std::io::Read>(
168
231
        &self,
169
231
        mut reader: R,
170
231
    ) -> Result<SyncHistogram<u64>, Self::Error> {
171
231
        hdrhistogram::serialization::Deserializer::new()
172
231
            .deserialize(&mut reader)
173
231
            .map(SyncHistogram::from)
174
231
            .map_err(HistogramError::Deserialization)
175
231
    }
176
}
177

            
178
#[derive(thiserror::Error, Debug)]
179
pub enum HistogramError {
180
    #[error("serialization error: {0}")]
181
    Serialization(#[from] hdrhistogram::serialization::V2SerializeError),
182
    #[error("deserialization error: {0}")]
183
    Deserialization(#[from] hdrhistogram::serialization::DeserializeError),
184
}
185

            
186
impl From<std::io::Error> for HistogramError {
187
    fn from(err: std::io::Error) -> Self {
188
        Self::Deserialization(hdrhistogram::serialization::DeserializeError::from(err))
189
    }
190
}
191

            
192
1
/// Smoke test: the whole example must run to completion without returning an error.
#[test]
fn runs() {
    let outcome = main();
    outcome.unwrap();
}