Skip to content

Commit 77164e0

Browse files
authored
Merge pull request #3 from rsachdeva/chatty3
initial refactoring using only task handles
2 parents 020f92e + bb6c5d7 commit 77164e0

File tree

4 files changed

+37
-49
lines changed

4 files changed

+37
-49
lines changed

chatty-tcp/src/bin/server.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use chatty_tcp::listen::room::serve;
66
use chatty_tcp::listen::state::RoomState;
77
use chatty_types::config::{setup_tracing, Component::Server};
88
use chatty_types::response::ChatResponse;
9-
use std::collections::{HashMap, HashSet};
9+
use std::collections::HashMap;
1010
use std::sync::Arc;
1111
use tokio::net::TcpListener;
1212
use tokio::sync::broadcast;
@@ -27,17 +27,12 @@ pub async fn main() -> Result<()> {
2727
span.in_scope(|| info!("listening on {}", listening_on));
2828

2929
// Set up room state for use
30-
let user_set = Mutex::new(HashSet::new());
3130
// bounded channel
3231
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
3332
// task handles
3433
let task_handles = Mutex::new(HashMap::new());
3534

36-
let room_state = Arc::new(RoomState {
37-
user_set,
38-
tx,
39-
task_handles,
40-
});
35+
let room_state = Arc::new(RoomState { tx, task_handles });
4136

4237
let mut connection_handles = Vec::new();
4338

chatty-tcp/src/listen/command.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,10 @@ pub async fn process_command(
4040
let command: ChatCommand = serde_json::from_str(&line)?;
4141
match command {
4242
ChatCommand::Join(username) => {
43-
let mut users = room_state.user_set.lock().await;
44-
let user_already_exist = users.contains(&username);
43+
let user_already_exist =
44+
room_state.task_handles.lock().await.contains_key(&username);
4545

4646
let chat_response = if !user_already_exist {
47-
users.insert(username.clone());
48-
info!("Users in room after addition: {:?}", users);
49-
info!("Client {} joined as {}", addr, username);
5047
let rx = room_state.tx.subscribe();
5148
let send_task_handle = tokio::spawn(send_from_broadcast_channel_task(
5249
writer.clone(),
@@ -58,6 +55,11 @@ pub async fn process_command(
5855
.lock()
5956
.await
6057
.insert(username.clone(), send_task_handle);
58+
info!(
59+
"Users in room after addition: {:?}",
60+
room_state.task_handles.lock().await.keys()
61+
);
62+
info!("Client {} joined as {}", addr, username);
6163
send_to_broadcast_channel(
6264
ChatResponse::Broadcast(ChatMemo {
6365
username: username.clone(),
@@ -96,10 +98,7 @@ pub async fn process_command(
9698
ChatCommand::Leave(username) => {
9799
remove_username(username.clone(), room_state.clone()).await;
98100
debug!("User {} has left", username);
99-
if let Some(handle) = room_state.task_handles.lock().await.remove(&username) {
100-
info!("Aborting background task for user: {}", username);
101-
handle.abort();
102-
}
101+
103102
debug!("User {} has left so sending broadcast message", username);
104103
send_to_broadcast_channel(
105104
ChatResponse::Broadcast(ChatMemo {
@@ -117,40 +116,45 @@ pub async fn process_command(
117116
}
118117

119118
pub async fn remove_username(username: String, room_state: Arc<RoomState>) {
120-
let mut users = room_state.user_set.lock().await;
121-
users.remove(&username);
119+
let mut lookup = room_state.task_handles.lock().await;
120+
if let Some(handle) = lookup.remove(&username) {
121+
info!("Aborting background task for user: {}", username);
122+
handle.abort();
123+
}
122124
info!("User {} removed from room", username);
123125
// list connected users
124-
let users: Vec<String> = users.iter().cloned().collect();
126+
let users: Vec<String> = lookup.keys().cloned().collect();
125127
info!("Users in room after removal: {:?}", users);
126128
}
127129

128130
#[cfg(test)]
129131
mod tests {
130132
use super::*;
131-
use std::collections::{HashMap, HashSet};
133+
use std::collections::HashMap;
132134
use tokio::sync::broadcast;
135+
use tokio::task::JoinHandle;
133136

134137
#[tokio::test]
135138
async fn test_remove_username() {
136-
let mut user_set = HashSet::new();
137-
user_set.insert("test_user".to_string());
138-
user_set.insert("other_user".to_string());
139+
let mut lookup_initial = HashMap::new();
140+
let dummy_task: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
141+
lookup_initial.insert("test_user".to_string(), dummy_task);
142+
let dummy_task2: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
143+
lookup_initial.insert("other_user".to_string(), dummy_task2);
139144

140145
let (tx, _) = broadcast::channel(100);
141146
let room_state = Arc::new(RoomState {
142-
user_set: Mutex::new(user_set),
143147
tx,
144-
task_handles: Mutex::new(HashMap::new()),
148+
task_handles: Mutex::new(lookup_initial),
145149
});
146150

147151
// Execute removal
148152
remove_username("test_user".to_string(), room_state.clone()).await;
149153

150154
// Verify user was removed
151-
let users = room_state.user_set.lock().await;
152-
assert!(!users.contains("test_user"));
153-
assert!(users.contains("other_user"));
154-
assert_eq!(users.len(), 1);
155+
let lookup = room_state.task_handles.lock().await;
156+
assert!(!lookup.contains_key("test_user"));
157+
assert!(lookup.contains_key("other_user"));
158+
assert_eq!(lookup.len(), 1);
155159
}
156160
}

chatty-tcp/src/listen/state.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use crate::listen::command::RoomError;
22
use chatty_types::response::ChatResponse;
3-
use std::collections::{HashMap, HashSet};
3+
use std::collections::HashMap;
44
use tokio::sync::broadcast;
55
use tokio::sync::Mutex;
66
use tokio::task::JoinHandle;
77

88
type TaskHandleMap = Mutex<HashMap<String, JoinHandle<Result<(), RoomError>>>>;
99

1010
pub struct RoomState {
11-
pub user_set: Mutex<HashSet<String>>,
1211
pub tx: broadcast::Sender<ChatResponse>,
1312
pub task_handles: TaskHandleMap,
1413
}

chatty-tcp/tests/client_server_integration.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use chatty_tcp::listen::state::RoomState;
44
use chatty_types::config::setup_tracing;
55
use chatty_types::config::Component::Server;
66
use chatty_types::response::ChatResponse;
7-
use std::collections::{HashMap, HashSet};
7+
use std::collections::HashMap;
88
use std::sync::Arc;
99
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
1010
use tokio::net::TcpStream;
@@ -25,15 +25,10 @@ fn init_tracing_for_tests() {
2525
async fn single_client() {
2626
init_tracing_for_tests();
2727
// Set up room state
28-
let user_set = Mutex::new(HashSet::new());
2928
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
3029
let task_handles = Mutex::new(HashMap::new());
3130

32-
let room_state = Arc::new(RoomState {
33-
user_set,
34-
tx,
35-
task_handles,
36-
});
31+
let room_state = Arc::new(RoomState { tx, task_handles });
3732

3833
// Start the server in a background task
3934
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -83,8 +78,8 @@ async fn single_client() {
8378

8479
// // Verify user is there
8580
let room_state_for_removal = room_state.clone();
86-
let users = room_state_for_removal.user_set.lock().await;
87-
assert!(users.contains("alone"));
81+
let lookup = room_state_for_removal.task_handles.lock().await;
82+
assert!(lookup.contains_key("alone"));
8883

8984
// leave command
9085
let leave_command = r#"{"Leave":"alone"}"#;
@@ -104,12 +99,7 @@ async fn multiple_clients() {
10499
let task_handles = Mutex::new(HashMap::new());
105100

106101
// Set up room state
107-
let user_set = Mutex::new(HashSet::new());
108-
let room_state = Arc::new(RoomState {
109-
user_set,
110-
tx,
111-
task_handles,
112-
});
102+
let room_state = Arc::new(RoomState { tx, task_handles });
113103
let state = room_state.clone();
114104

115105
// Start the server in a background task
@@ -194,9 +184,9 @@ async fn multiple_clients() {
194184
let expected_message2 = r#"{"Broadcast":{"username":"carl","content":"Left"}}"#;
195185
assert_eq!(broadcast_message, expected_message2);
196186

197-
let user_set = state.user_set.lock().await;
198-
assert_eq!(user_set.len(), 1);
199-
assert!(user_set.contains("david"));
187+
let lookup = state.task_handles.lock().await;
188+
assert_eq!(lookup.len(), 1);
189+
assert!(lookup.contains_key("david"));
200190

201191
// Clean up
202192
server_handle.abort();

0 commit comments

Comments
 (0)