1
use std::time::{Duration, Instant};
2

            
3
use bonsaidb::client::url::Url;
4
use bonsaidb::client::{AsyncClient, AsyncRemoteDatabase};
5
use bonsaidb::core::actionable::Permissions;
6
use bonsaidb::core::admin::{Admin, PermissionGroup, ADMIN_DATABASE_NAME};
7
use bonsaidb::core::circulate::flume;
8
use bonsaidb::core::keyvalue::AsyncKeyValue;
9
use bonsaidb::core::permissions::bonsai::{BonsaiAction, ServerAction};
10
use bonsaidb::core::permissions::Statement;
11
use bonsaidb::core::schema::{InsertError, SerializedCollection};
12
use bonsaidb::core::test_util::{BasicSchema, HarnessTest, TestDirectory};
13
use bonsaidb::local::config::Builder;
14
use bonsaidb::server::fabruic::Certificate;
15
use bonsaidb::server::test_util::{initialize_basic_server, BASIC_SERVER_NAME};
16
use bonsaidb::server::{DefaultPermissions, Server, ServerConfiguration};
17
use bonsaidb_core::connection::{Authentication, AuthenticationMethod, SensitiveString};
18
use once_cell::sync::Lazy;
19
use rand::distributions::Alphanumeric;
20
use rand::Rng;
21
use tokio::sync::Mutex;
22

            
23
const INCOMPATIBLE_PROTOCOL_VERSION: &str = "otherprotocol";
24

            
25
108
async fn initialize_shared_server() -> Certificate {
26
108
    static CERTIFICATE: Lazy<Mutex<Option<Certificate>>> = Lazy::new(|| Mutex::new(None));
27
108
    drop(env_logger::try_init());
28
108
    let mut certificate = CERTIFICATE.lock().await;
29
108
    if certificate.is_none() {
30
1
        let (sender, receiver) = flume::bounded(1);
31
1
        std::thread::spawn(|| run_shared_server(sender));
32

            
33
1
        *certificate = Some(receiver.recv_async().await.unwrap());
34
1
        // Give the server time to start listening
35
1
        tokio::time::sleep(Duration::from_millis(1000)).await;
36
107
    }
37

            
38
108
    certificate.clone().unwrap()
39
108
}
40

            
41
1
fn run_shared_server(certificate_sender: flume::Sender<Certificate>) -> anyhow::Result<()> {
42
1
    let rt = tokio::runtime::Runtime::new()?;
43
1
    rt.block_on(async move {
44
1
        let directory = TestDirectory::new("shared-server");
45
15
        let server = initialize_basic_server(directory.as_ref()).await.unwrap();
46
1
        certificate_sender
47
1
            .send(
48
1
                server
49
1
                    .certificate_chain()
50
2
                    .await
51
1
                    .unwrap()
52
1
                    .into_end_entity_certificate(),
53
1
            )
54
1
            .unwrap();
55
1

            
56
1
        #[cfg(feature = "websockets")]
57
1
        {
58
1
            let task_server = server.clone();
59
1
            tokio::spawn(async move {
60
1
                task_server
61
1
                    .listen_for_websockets_on("localhost:6001", false)
62
72
                    .await
63
                    .unwrap();
64
1
            });
65
1
        }
66
1

            
67
67
        server.listen_on(6000).await.unwrap();
68
1
    });
69
1

            
70
1
    Ok(())
71
1
}
72

            
73
#[cfg(feature = "websockets")]
74
mod websockets {
75
    use bonsaidb_client::{BlockingClient, BlockingRemoteDatabase};
76
    use tokio::runtime::Runtime;
77

            
78
    use super::*;
79

            
80
    struct WebsocketTestHarness {
81
        client: AsyncClient,
82
        url: Url,
83
        db: AsyncRemoteDatabase,
84
    }
85

            
86
    impl WebsocketTestHarness {
87
35
        pub async fn new(test: HarnessTest) -> anyhow::Result<Self> {
88
35
            use bonsaidb_core::connection::AsyncStorageConnection;
89
35

            
90
35
            initialize_shared_server().await;
91
35
            let url = Url::parse("ws://localhost:6001")?;
92
35
            let client = AsyncClient::new(url.clone())?;
93

            
94
35
            let dbname = format!("websockets-{test}");
95
35
            client
96
35
                .create_database::<BasicSchema>(&dbname, false)
97
35
                .await?;
98
35
            let db = client.database::<BasicSchema>(&dbname).await?;
99

            
100
35
            Ok(Self { client, url, db })
101
35
        }
102

            
103
3
        pub const fn server_name() -> &'static str {
104
3
            "websocket"
105
3
        }
106

            
107
3
        pub fn server(&self) -> &AsyncClient {
108
3
            &self.client
109
3
        }
110

            
111
34
        pub async fn connect(&self) -> anyhow::Result<AsyncRemoteDatabase> {
112
34
            Ok(self.db.clone())
113
34
        }
114

            
115
        #[allow(dead_code)] // We will want this in the future but it's currently unused
116
        pub async fn connect_with_permissions(
117
            &self,
118
            permissions: Vec<Statement>,
119
            label: &str,
120
        ) -> anyhow::Result<AsyncRemoteDatabase> {
121
            let client = AsyncClient::new(self.url.clone())?;
122
            assume_permissions(client, label, self.db.name(), permissions).await
123
        }
124

            
125
30
        pub async fn shutdown(&self) -> anyhow::Result<()> {
126
30
            Ok(())
127
30
        }
128
    }
129

            
130
46
    bonsaidb_core::define_async_connection_test_suite!(WebsocketTestHarness);
131

            
132
10
    bonsaidb_core::define_async_pubsub_test_suite!(WebsocketTestHarness);
133
162
    bonsaidb_core::define_async_kv_test_suite!(WebsocketTestHarness);
134

            
135
    struct BlockingWebsocketTestHarness {
136
        client: BlockingClient,
137
        // url: Url,
138
        db: BlockingRemoteDatabase,
139
    }
140

            
141
    impl BlockingWebsocketTestHarness {
142
36
        pub fn new(test: HarnessTest) -> anyhow::Result<Self> {
143
            use bonsaidb_core::connection::StorageConnection;
144
36
            let runtime = Runtime::new()?;
145
36
            runtime.block_on(initialize_shared_server());
146
36
            let url = Url::parse("ws://localhost:6001")?;
147
36
            let client = BlockingClient::new(url)?;
148

            
149
36
            let dbname = format!("blocking-websockets-{test}");
150
36
            client.create_database::<BasicSchema>(&dbname, false)?;
151
36
            let db = client.database::<BasicSchema>(&dbname)?;
152

            
153
36
            Ok(Self { client, db })
154
36
        }
155

            
156
3
        pub const fn server_name() -> &'static str {
157
3
            "websocket-blocking"
158
3
        }
159

            
160
3
        pub fn server(&self) -> &BlockingClient {
161
3
            &self.client
162
3
        }
163

            
164
35
        pub fn connect(&self) -> anyhow::Result<BlockingRemoteDatabase> {
165
35
            Ok(self.db.clone())
166
35
        }
167

            
168
        // #[allow(dead_code)] // We will want this in the future but it's currently unused
169
        // pub  fn connect_with_permissions(
170
        //     &self,
171
        //     permissions: Vec<Statement>,
172
        //     label: &str,
173
        // ) -> anyhow::Result<RemoteDatabase> {
174
        //     let client = Client::new(self.url.clone())?;
175
        //     assume_permissions(client, label, self.db.name(), permissions)
176
        // }
177

            
178
31
        pub fn shutdown(&self) -> anyhow::Result<()> {
179
31
            Ok(())
180
31
        }
181
    }
182

            
183
1
    #[tokio::test]
184
1
    async fn incompatible_client_version() -> anyhow::Result<()> {
185
1
        let certificate = initialize_shared_server().await;
186
1

            
187
1
        let url = Url::parse("ws://localhost:6001")?;
188
1
        let client = AsyncClient::build(url.clone())
189
1
            .with_certificate(certificate.clone())
190
1
            .with_protocol_version(INCOMPATIBLE_PROTOCOL_VERSION)
191
1
            .build()?;
192
1

            
193
1
        check_incompatible_client(client).await
194
1
    }
195

            
196
24
    bonsaidb_core::define_blocking_connection_test_suite!(BlockingWebsocketTestHarness);
197

            
198
5
    bonsaidb_core::define_blocking_pubsub_test_suite!(BlockingWebsocketTestHarness);
199
149
    bonsaidb_core::define_blocking_kv_test_suite!(BlockingWebsocketTestHarness);
200
}
201

            
202
mod bonsai {
203
    use super::*;
204
    struct BonsaiTestHarness {
205
        client: AsyncClient,
206
        url: Url,
207
        certificate: Certificate,
208
        db: AsyncRemoteDatabase,
209
    }
210

            
211
    impl BonsaiTestHarness {
212
35
        pub async fn new(test: HarnessTest) -> anyhow::Result<Self> {
213
            use bonsaidb_core::connection::AsyncStorageConnection;
214
35
            let certificate = initialize_shared_server().await;
215

            
216
35
            let url = Url::parse(&format!(
217
35
                "bonsaidb://localhost:6000?server={BASIC_SERVER_NAME}"
218
35
            ))?;
219
35
            let client = AsyncClient::build(url.clone())
220
35
                .with_certificate(certificate.clone())
221
35
                .build()?;
222

            
223
35
            let dbname = format!("bonsai-{test}");
224
35
            client
225
35
                .create_database::<BasicSchema>(&dbname, false)
226
35
                .await?;
227
35
            let db = client.database::<BasicSchema>(&dbname).await?;
228

            
229
35
            Ok(Self {
230
35
                client,
231
35
                url,
232
35
                certificate,
233
35
                db,
234
35
            })
235
35
        }
236

            
237
3
        pub fn server_name() -> &'static str {
238
3
            "bonsai"
239
3
        }
240

            
241
3
        pub fn server(&self) -> &'_ AsyncClient {
242
3
            &self.client
243
3
        }
244

            
245
34
        pub async fn connect(&self) -> anyhow::Result<AsyncRemoteDatabase> {
246
34
            Ok(self.db.clone())
247
34
        }
248

            
249
        #[allow(dead_code)] // We will want this in the future but it's currently unused
250
        pub async fn connect_with_permissions(
251
            &self,
252
            statements: Vec<Statement>,
253
            label: &str,
254
        ) -> anyhow::Result<AsyncRemoteDatabase> {
255
            let client = AsyncClient::build(self.url.clone())
256
                .with_certificate(self.certificate.clone())
257
                .build()?;
258
            assume_permissions(client, label, self.db.name(), statements).await
259
        }
260

            
261
30
        pub async fn shutdown(&self) -> anyhow::Result<()> {
262
30
            Ok(())
263
30
        }
264
    }
265

            
266
1
    #[tokio::test]
267
1
    async fn incompatible_client_version() -> anyhow::Result<()> {
268
1
        let certificate = initialize_shared_server().await;
269
1

            
270
1
        let url = Url::parse(&format!(
271
1
            "bonsaidb://localhost:6000?server={BASIC_SERVER_NAME}",
272
1
        ))?;
273
1
        let client = AsyncClient::build(url.clone())
274
1
            .with_certificate(certificate.clone())
275
1
            .with_protocol_version(INCOMPATIBLE_PROTOCOL_VERSION)
276
1
            .build()?;
277
1

            
278
1
        check_incompatible_client(client).await
279
1
    }
280

            
281
46
    bonsaidb_core::define_async_connection_test_suite!(BonsaiTestHarness);
282
10
    bonsaidb_core::define_async_pubsub_test_suite!(BonsaiTestHarness);
283
162
    bonsaidb_core::define_async_kv_test_suite!(BonsaiTestHarness);
284
}
285

            
286
2
async fn check_incompatible_client(client: AsyncClient) -> anyhow::Result<()> {
287
2
    use bonsaidb_core::connection::AsyncStorageConnection;
288
2
    match client
289
2
        .database::<()>("a database")
290
        .await?
291
2
        .set_numeric_key("a", 1_u64)
292
2
        .await
293
    {
294
2
        Err(bonsaidb_core::Error::Other { error, .. }) => {
295
2
            assert!(
296
2
                error.contains("protocol version"),
297
                "unexpected error: {error:?}",
298
            );
299
        }
300
        other => unreachable!(
301
            "Unexpected result with invalid protocol version: {:?}",
302
            other
303
        ),
304
    }
305

            
306
2
    Ok(())
307
2
}
308

            
309
#[allow(dead_code)] // We will want this in the future but it's currently unused
310
async fn assume_permissions(
311
    connection: AsyncClient,
312
    label: &str,
313
    database_name: &str,
314
    statements: Vec<Statement>,
315
) -> anyhow::Result<AsyncRemoteDatabase> {
316
    use bonsaidb_core::connection::AsyncStorageConnection;
317
    let username = format!("{database_name}-{label}");
318
    let password = SensitiveString(
319
        rand::thread_rng()
320
            .sample_iter(&Alphanumeric)
321
            .take(8)
322
            .map(char::from)
323
            .collect(),
324
    );
325
    match connection.create_user(&username).await {
326
        Ok(user_id) => {
327
            connection
328
                .set_user_password(&username, password.clone())
329
                .await
330
                .unwrap();
331

            
332
            // Create an administrators permission group, or get its ID if it already existed.
333
            let admin = connection.database::<Admin>(ADMIN_DATABASE_NAME).await?;
334
            let administrator_group_id = match (PermissionGroup {
335
                name: String::from(label),
336
                statements,
337
            }
338
            .push_into_async(&admin)
339
            .await)
340
            {
341
                Ok(doc) => doc.header.id,
342
                Err(InsertError {
343
                    error:
344
                        bonsaidb_core::Error::UniqueKeyViolation {
345
                            existing_document, ..
346
                        },
347
                    ..
348
                }) => existing_document.id.deserialize()?,
349
                Err(other) => anyhow::bail!(other),
350
            };
351

            
352
            // Make our user a member of the administrators group.
353
            connection
354
                .add_permission_group_to_user(user_id, administrator_group_id)
355
                .await
356
                .unwrap();
357
        }
358
        Err(bonsaidb_core::Error::UniqueKeyViolation { .. }) => {}
359
        Err(other) => anyhow::bail!(other),
360
    };
361

            
362
    connection
363
        .authenticate(Authentication::password(username, password)?)
364
        .await
365
        .unwrap();
366

            
367
    Ok(connection.database::<BasicSchema>(database_name).await?)
368
}
369

            
370
1
#[tokio::test]
371
1
async fn authenticated_permissions_test() -> anyhow::Result<()> {
372
1
    use bonsaidb_core::connection::AsyncStorageConnection;
373
1
    let database_path = TestDirectory::new("authenticated-permissions");
374
1
    let server = Server::open(
375
1
        ServerConfiguration::new(&database_path)
376
1
            .default_permissions(Permissions::from(
377
1
                Statement::for_any()
378
1
                    .allowing(&BonsaiAction::Server(ServerAction::Connect))
379
1
                    .allowing(&BonsaiAction::Server(ServerAction::Authenticate(
380
1
                        AuthenticationMethod::PasswordHash,
381
1
                    ))),
382
1
            ))
383
1
            .authenticated_permissions(DefaultPermissions::AllowAll),
384
1
    )
385
3
    .await?;
386
10
    server.install_self_signed_certificate(false).await?;
387
1
    let certificate = server
388
1
        .certificate_chain()
389
2
        .await?
390
1
        .into_end_entity_certificate();
391
1

            
392
1
    server.create_user("ecton").await?;
393
1
    server
394
1
        .set_user_password("ecton", SensitiveString::from("hunter2"))
395
1
        .await?;
396
1
    tokio::spawn(async move {
397
5
        server.listen_on(6002).await?;
398
1
        Result::<(), anyhow::Error>::Ok(())
399
1
    });
400
1
    // Give the server time to listen
401
1
    tokio::time::sleep(Duration::from_millis(10)).await;
402
1

            
403
1
    let url = Url::parse("bonsaidb://localhost:6002")?;
404
1
    let client = AsyncClient::build(url)
405
1
        .with_certificate(certificate)
406
1
        .build()?;
407
1
    match client.create_user("otheruser").await {
408
1
        Err(bonsaidb_core::Error::PermissionDenied(_)) => {}
409
1
        other => unreachable!(
410
            "should not have permission to create another user before logging in: {other:?}"
411
        ),
412
1
    }
413
1

            
414
1
    let authenticated_client = client
415
1
        .authenticate(Authentication::password(
416
1
            "ecton",
417
1
            SensitiveString(String::from("hunter2")),
418
1
        )?)
419
1
        .await
420
1
        .unwrap();
421
1
    authenticated_client
422
1
        .create_user("otheruser")
423
1
        .await
424
1
        .expect("should be able to create user after logging in");
425
1

            
426
1
    Ok(())
427
1
}
428

            
429
1
#[tokio::test]
430
1
async fn client_disconnection() -> anyhow::Result<()> {
431
1
    use bonsaidb_core::connection::AsyncStorageConnection;
432
1
    let database_path = TestDirectory::new("client_disconnection");
433
1
    let server = Server::open(
434
1
        ServerConfiguration::new(&database_path).default_permissions(Permissions::allow_all()),
435
1
    )
436
3
    .await?;
437
10
    server.install_self_signed_certificate(false).await?;
438
1
    let certificate = server
439
1
        .certificate_chain()
440
2
        .await?
441
1
        .into_end_entity_certificate();
442
1

            
443
1
    tokio::spawn({
444
1
        let server = server.clone();
445
1
        async move {
446
5
            server.listen_on(6003).await?;
447
1
            Result::<(), anyhow::Error>::Ok(())
448
1
        }
449
1
    });
450
1
    // Give the server time to listen
451
1
    tokio::time::sleep(Duration::from_millis(10)).await;
452
1

            
453
1
    let url = Url::parse("bonsaidb://localhost:6003")?;
454
1
    let client = AsyncClient::build(url)
455
1
        .with_certificate(certificate)
456
1
        .build()?;
457
1
    // We need to call any API to ensure the client connects.
458
1
    let _result = client.create_user("otheruser").await;
459
1

            
460
1
    let connected_client = server.connected_clients();
461
1
    assert_eq!(connected_client.len(), 1);
462
1
    drop(client);
463
1

            
464
1
    // Wait for the connected client to observe the disconnection
465
1
    let start = Instant::now();
466
3
    while connected_client[0].connected() && start.elapsed() < Duration::from_secs(2) {
467
2
        tokio::task::yield_now().await;
468
1
    }
469
1
    assert!(!connected_client[0].connected());
470
1

            
471
1
    Ok(())
472
1
}