1
//! Tests breaking a connection and detecting it on the Client.
2

            
3
use std::sync::Arc;
4
use std::time::Duration;
5

            
6
use bonsaidb::client::url::Url;
7
use bonsaidb::client::AsyncClient;
8
use bonsaidb::core::test_util::{Basic, TestDirectory};
9
use bonsaidb::local::config::Builder;
10
use bonsaidb::server::{DefaultPermissions, Server, ServerConfiguration};
11
use bonsaidb_core::connection::{AsyncStorageConnection, HasSession, SensitiveString};
12
use bonsaidb_core::schema::SerializedCollection;
13
use bonsaidb_server::BonsaiListenConfig;
14
use futures::Future;
15

            
16
1
#[tokio::test]
17
#[cfg(feature = "websockets")]
18
1
async fn websockets() -> anyhow::Result<()> {
19
1
    test_sessions(
20
1
        "sessions-ws.bonsaidb",
21
1
        "ws://localhost:12345",
22
2
        |server| async move {
23
2
            server
24
2
                .listen_for_websockets_on("0.0.0.0:12345", false)
25
3
                .await
26
1
                .unwrap();
27
2
        },
28
1
    )
29
23
    .await
30
1
}
31

            
32
1
#[tokio::test]
33
1
async fn quic() -> anyhow::Result<()> {
34
1
    test_sessions(
35
1
        "sessions-quic.bonsaidb",
36
1
        "bonsaidb://localhost:12346",
37
2
        |server| async move {
38
2
            server
39
2
                .listen_on(BonsaiListenConfig::from(12346).reuse_address(true))
40
11
                .await
41
1
                .unwrap();
42
2
        },
43
1
    )
44
23
    .await
45
1
}
46

            
47
2
async fn test_sessions<F, Fut>(dir_name: &str, connect_addr: &str, listen: F) -> anyhow::Result<()>
48
2
where
49
2
    F: Fn(Server) -> Fut + Send + Sync + 'static,
50
2
    Fut: Future<Output = ()> + Send + 'static,
51
2
{
52
2
    drop(env_logger::try_init());
53
2
    println!("here");
54
2
    let dir = TestDirectory::new(dir_name);
55
2
    let server = Server::open(
56
2
        ServerConfiguration::new(&dir)
57
2
            .default_permissions(DefaultPermissions::AllowAll)
58
2
            .with_schema::<Basic>()?,
59
    )
60
6
    .await?;
61
2
    println!("Installing cert");
62
20
    server.install_self_signed_certificate(false).await?;
63

            
64
2
    let user_id = server.create_user("ecton").await?;
65
2
    server
66
2
        .set_user_password(user_id, SensitiveString::from("hunter2"))
67
2
        .await?;
68

            
69
2
    let certificate = server
70
2
        .certificate_chain()
71
4
        .await?
72
2
        .into_end_entity_certificate();
73
2

            
74
2
    let reboot = Arc::new(tokio::sync::Notify::new());
75
2
    let rebooted = Arc::new(tokio::sync::Notify::new());
76
2

            
77
2
    tokio::spawn({
78
2
        let reboot = reboot.clone();
79
2
        let rebooted = rebooted.clone();
80
2
        async move {
81
2
            // Start listening
82
2
            println!("Listening");
83
2

            
84
2
            tokio::spawn(listen(server.clone()));
85
2

            
86
2
            // Wait for the client to signal that we can disconnect.
87
2
            reboot.notified().await;
88
2
            println!("Server Shutting down.");
89
2
            // Completely shut the server down and restart it.
90
2
            server.shutdown(None).await.unwrap();
91
2
            drop(server);
92
2
            println!("Server shut down.");
93
2
            // Give time for the endpoint to completley close.
94
2
            // TODO this is stupid
95
2
            tokio::time::sleep(Duration::from_millis(500)).await;
96
2
            let server = Server::open(
97
2
                ServerConfiguration::new(&dir)
98
2
                    .default_permissions(DefaultPermissions::AllowAll)
99
2
                    .with_schema::<Basic>()
100
2
                    .unwrap(),
101
2
            )
102
6
            .await
103
2
            .unwrap();
104
2

            
105
2
            println!("Server listening again.");
106
2
            rebooted.notify_one();
107
6
            listen(server).await;
108
2
        }
109
2
    });
110

            
111
2
    let client = AsyncClient::build(Url::parse(connect_addr)?)
112
2
        .with_certificate(certificate.clone())
113
2
        .build()?;
114

            
115
2
    println!("Authenticating.");
116
2
    let authenticated = client
117
2
        .authenticate_with_password("ecton", SensitiveString::from("hunter2"))
118
2
        .await?;
119
2
    println!("Creating db");
120
2
    let db = client.create_database::<Basic>("basic", true).await?;
121
    // Verify we have a session at this point.
122
2
    authenticated
123
2
        .session()
124
2
        .unwrap()
125
2
        .id
126
2
        .expect("session should be present");
127
2

            
128
2
    reboot.notify_one();
129
2
    rebooted.notified().await;
130
    // Give the listener a moment to become established.
131
2
    tokio::time::sleep(Duration::from_millis(500)).await;
132
2
    println!("Continuing client.");
133
2

            
134
2
    // Get a disconnection error
135
2
    assert!(Basic::get_async(&0, &db).await.is_err());
136
2
    println!("Reconnecting");
137
2
    // Reconnect
138
2
    assert!(Basic::get_async(&0, &db).await.unwrap().is_none());
139
2
    println!("Checking session");
140
2
    // Verify the client recognizes it was de-authenticated
141
2
    assert!(client.session().unwrap().id.is_none());
142

            
143
2
    println!("Done");
144
2
    Ok(())
145
2
}