1
1
//! This example shows a creative use case for map/reduce views: generating
2
//! histograms.
3
//!
4
//! This example uses the [`hdrhistogram`] crate to create a histogram of
5
//! "samples" stored in the [`Samples`] type. The raw sample data is stored in a
6
//! collection with a timestamp (u64) and a `Vec<u64>` of samples.
7
//!
8
//! The [`AsHistogram`] view maps the sample data into a [`SyncHistogram`], and
9
//! this code provides an example of how to implement a custom serializer using
10
//! the [`transmog::Format`] trait. This allows using `SyncHistogram`'s native
11
//! serialization to store the histogram into the view.
12
//!
13
//! All of this combined makes it possible to use the `reduce()` API to
14
//! retrieve server-reduced values in an efficient manner.
15

            
16
use std::ops::Deref;
17

            
18
use bonsaidb::{
19
    core::{
20
        connection::AsyncConnection,
21
        document::{CollectionDocument, Emit},
22
        schema::{
23
            view::CollectionViewSchema, Collection, ReduceResult, SerializedCollection,
24
            SerializedView, View, ViewMappedValue,
25
        },
26
        transmog::{Format, OwnedDeserializer},
27
    },
28
    local::{
29
        config::{Builder, StorageConfiguration},
30
        AsyncDatabase,
31
    },
32
};
33
use hdrhistogram::{
34
    serialization::{Serializer, V2Serializer},
35
    Histogram, SyncHistogram,
36
};
37
use rand::{rngs::StdRng, Rng, SeedableRng};
38
use serde::{Deserialize, Serialize};
39

            
40
#[tokio::main]
41
1
async fn main() -> Result<(), bonsaidb::local::Error> {
42
1
    let db = AsyncDatabase::open::<Samples>(StorageConfiguration::new("view-histogram.bonsaidb"))
43
1
        .await?;
44

            
45
1
    println!("inserting 100 new sets of samples");
46
1
    let mut rng = StdRng::from_entropy();
47
100
    for timestamp in 1..100 {
48
        // This inserts a new record, generating a random range that will trend
49
        // upwards as `timestamp` increases.
50
99
        Samples {
51
99
            timestamp,
52
99
            entries: (0..100)
53
9900
                .map(|_| rng.gen_range(50 + timestamp / 2..115 + timestamp))
54
99
                .collect(),
55
99
        }
56
99
        .push_into_async(&db)
57
99
        .await?;
58
    }
59
1
    println!("done inserting new samples");
60

            
61
    // We can ask for a histogram of all the data:
62
1
    let total_histogram = db.view::<AsHistogram>().reduce().await?;
63
1
    println!(
64
1
        "99th Percentile overall: {} ({} samples)",
65
1
        total_histogram.value_at_quantile(0.99),
66
1
        total_histogram.len()
67
1
    );
68

            
69
    // Or we can request just a specific range:
70
1
    let range_histogram = db
71
1
        .view::<AsHistogram>()
72
1
        .with_key_range(10..20)
73
1
        .reduce()
74
1
        .await?;
75
1
    println!(
76
1
        "99th Percentile from 10..20: {} ({} samples)",
77
1
        range_histogram.value_at_quantile(0.99),
78
1
        range_histogram.len()
79
1
    );
80
1
    let range_histogram = db
81
1
        .view::<AsHistogram>()
82
1
        .with_key_range(80..100)
83
1
        .reduce()
84
1
        .await?;
85
1
    println!(
86
1
        "99th Percentile from 80..100: {} ({} samples)",
87
1
        range_histogram.value_at_quantile(0.99),
88
1
        range_histogram.len()
89
1
    );
90
1

            
91
1
    Ok(())
92
1
}
93

            
94
/// A set of samples that were taken at a specific time.
95
495
#[derive(Debug, Serialize, Deserialize, Collection)]
96
#[collection(name = "samples", views = [AsHistogram])]
97
pub struct Samples {
98
    /// The timestamp of the samples.
99
    pub timestamp: u64,
100
    /// The raw samples.
101
    pub entries: Vec<u64>,
102
}
103

            
104
/// A view for [`Samples`] which produces a histogram.
105
218
#[derive(Debug, Clone, View)]
106
#[view(collection = Samples, key = u64, value = SyncHistogram<u64>, name = "as-histogram", serialization = None)]
107
pub struct AsHistogram;
108

            
109
impl CollectionViewSchema for AsHistogram {
110
    type View = Self;
111

            
112
99
    fn map(
113
99
        &self,
114
99
        document: CollectionDocument<<Self::View as View>::Collection>,
115
99
    ) -> bonsaidb::core::schema::ViewMapResult<Self::View> {
116
99
        let mut histogram = Histogram::new(4).unwrap();
117
9999
        for sample in &document.contents.entries {
118
9900
            histogram.record(*sample).unwrap();
119
9900
        }
120

            
121
99
        document
122
99
            .header
123
99
            .emit_key_and_value(document.contents.timestamp, histogram.into_sync())
124
99
    }
125

            
126
102
    fn reduce(
127
102
        &self,
128
102
        mappings: &[ViewMappedValue<Self::View>],
129
102
        _rereduce: bool,
130
102
    ) -> ReduceResult<Self::View> {
131
102
        let mut mappings = mappings.iter();
132
102
        let mut combined = SyncHistogram::from(
133
102
            mappings
134
102
                .next()
135
102
                .map(|h| h.value.deref().clone())
136
102
                .unwrap_or_else(|| Histogram::new(4).unwrap()),
137
102
        );
138
228
        for map in mappings {
139
126
            combined.add(map.value.deref()).unwrap();
140
126
        }
141
102
        Ok(combined)
142
102
    }
143
}
144

            
145
impl SerializedView for AsHistogram {
146
    type Format = Self;
147

            
148
432
    fn format() -> Self::Format {
149
432
        Self
150
432
    }
151
}
152

            
153
impl Format<'static, SyncHistogram<u64>> for AsHistogram {
154
    type Error = HistogramError;
155

            
156
    fn serialize_into<W: std::io::Write>(
157
        &self,
158
        value: &SyncHistogram<u64>,
159
        mut writer: W,
160
    ) -> Result<(), Self::Error> {
161
201
        V2Serializer::new()
162
201
            .serialize(value, &mut writer)
163
201
            .map_err(HistogramError::Serialization)?;
164
201
        Ok(())
165
201
    }
166
}
167

            
168
impl OwnedDeserializer<SyncHistogram<u64>> for AsHistogram {
169
231
    fn deserialize_from<R: std::io::Read>(
170
231
        &self,
171
231
        mut reader: R,
172
231
    ) -> Result<SyncHistogram<u64>, Self::Error> {
173
231
        hdrhistogram::serialization::Deserializer::new()
174
231
            .deserialize(&mut reader)
175
231
            .map(SyncHistogram::from)
176
231
            .map_err(HistogramError::Deserialization)
177
231
    }
178
}
179

            
180
#[derive(thiserror::Error, Debug)]
181
pub enum HistogramError {
182
    #[error("serialization error: {0}")]
183
    Serialization(#[from] hdrhistogram::serialization::V2SerializeError),
184
    #[error("deserialization error: {0}")]
185
    Deserialization(#[from] hdrhistogram::serialization::DeserializeError),
186
}
187

            
188
impl From<std::io::Error> for HistogramError {
189
    fn from(err: std::io::Error) -> Self {
190
        Self::Deserialization(hdrhistogram::serialization::DeserializeError::from(err))
191
    }
192
}
193

            
194
1
/// Smoke test: the whole example runs to completion without error.
#[test]
fn runs() -> Result<(), bonsaidb::local::Error> {
    main()
}