Skip to content

Commit

Permalink
day19 challenge task2 simple room based chat server
Browse files Browse the repository at this point in the history
  • Loading branch information
m4salah committed Dec 30, 2023
1 parent c905497 commit b479a9b
Showing 1 changed file with 148 additions and 81 deletions.
229 changes: 148 additions & 81 deletions src/handlers/day19.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[allow(dead_code, unused_variables)]
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
fmt::Display,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
};

use axum::{
Expand All @@ -15,53 +18,103 @@ use axum::{
Router,
};
use futures::{stream::StreamExt, SinkExt};
use serde_json::json;
use tokio::sync::broadcast;

// Our shared state
struct AppState {
/// Keys are the name of the channel
views: Arc<AtomicUsize>,
rooms: Arc<RwLock<HashMap<usize, RoomState>>>,
}

#[derive(Clone)]
struct RoomState {
rooms: Mutex<HashMap<u64, Room>>,
/// Previously stored in AppState
user_set: HashSet<String>,
/// Previously created in main.
tx: broadcast::Sender<Tweet>,
}

impl RoomState {
fn new() -> Self {
Self {
// Track usernames per room
user_set: HashSet::new(),
// Create a new channel for every room
tx: broadcast::channel(1000).0,
}
}
fn insert_user(&mut self, username: String) {
self.user_set.insert(username);
}
}

#[allow(dead_code)]
struct Room {
id: u64,
users: Mutex<HashSet<String>>,
tx: broadcast::Sender<String>,
#[derive(Clone, Debug)]
struct Tweet {
message: TweetInput,
user: String,
}

#[allow(dead_code)]
struct RoomMessage {
total_view: u64,
username: String,
message: String,
impl Display for Tweet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(
json!({"user": self.user, "message": self.message.message})
.to_string()
.as_str(),
)
}
}

impl Tweet {
fn new(user: String, message: TweetInput) -> Self {
Self { message, user }
}
}

impl AppState {
fn new() -> Self {
Self {
views: Arc::new(AtomicUsize::new(0)),
rooms: Arc::new(RwLock::new(HashMap::new())),
}
}
fn inc_views(&self) {
self.views.fetch_add(1, Ordering::Relaxed);
}

fn get_views(&self) -> usize {
self.views.load(Ordering::Relaxed)
}

fn reset_views(&self) {
self.views.store(0, Ordering::Relaxed);
}
}

pub fn router() -> Router {
let room_messages_state = Arc::new(RoomState {
rooms: Mutex::new(HashMap::new()),
});
let app_state = Arc::new(AppState::new());
Router::new()
.route("/19/health", get(|| async { StatusCode::OK }))
.route("/19/ws/ping", get(ping_ws))
.route("/19/reset", post(ping_ws))
.route("/19/views", get(ping_ws))
.route("/19/reset", post(reset_views))
.route("/19/views", get(view_count))
.route("/19/ws/room/:room_id/user/:username", get(connect_to_room))
.with_state(room_messages_state)
.with_state(app_state)
}

async fn ping_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
tracing::info!("client connected");
ws.on_upgrade(handle_socket)
ws.on_upgrade(handle_ping_socket)
}

async fn handle_socket(mut socket: WebSocket) {
async fn handle_ping_socket(mut socket: WebSocket) {
while let Some(msg) = socket.recv().await {
let msg = if let Ok(msg) = msg {
msg
} else {
// client disconnected
return;
};
tracing::info!("client send msg: {:?}", msg.clone());
if let Ok(message_text) = msg.to_text() {
if message_text == "serve" {
while let Some(Ok(msg)) = socket.recv().await {
Expand All @@ -76,80 +129,94 @@ async fn handle_socket(mut socket: WebSocket) {
}
}

async fn view_count(State(app_state): State<Arc<AppState>>) -> impl IntoResponse {
app_state.get_views().to_string()
}

async fn reset_views(State(app_state): State<Arc<AppState>>) -> impl IntoResponse {
app_state.reset_views();
}

async fn connect_to_room(
ws: WebSocketUpgrade,
Path((room_id, username)): Path<(u64, String)>,
State(room_state): State<Arc<RoomState>>,
Path((room_id, username)): Path<(usize, String)>,
State(app_state): State<Arc<AppState>>,
) -> impl IntoResponse {
tracing::info!("user: {} connected to room {room_id}", username.clone());
ws.on_upgrade(move |socket| connect_to_room_handler(socket, room_state, room_id, username))
ws.on_upgrade(move |socket| connect_to_room_handler(socket, app_state, room_id, username))
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TweetInput {
message: String,
}

async fn connect_to_room_handler(
stream: WebSocket,
room_state: Arc<RoomState>,
room_id: u64,
state: Arc<AppState>,
room_id: usize,
username: String,
) {
// By splitting we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();

let mut state_room = room_state.rooms.lock().unwrap();
let (tx, _rx) = broadcast::channel(100);
let room = if let Some(r) = state_room.get_mut(&room_id) {
r.users.lock().unwrap().insert(username.clone());
r
} else {
let mut new_hashset = HashSet::new();
new_hashset.insert(username.clone());
let new_room = Room {
id: room_id,
users: Mutex::new(new_hashset),
tx,
};
state_room.insert(room_id, new_room);
state_room.get(&room_id).unwrap()
// Get the room channel to send the message to it
let room_sender = {
// If the room already exists in the app state
// get the sender from it.
// else:
// we insert the new room, and add the user to it.
let mut rooms = state.rooms.write().unwrap();
let room_state = rooms.entry(room_id).or_insert_with(RoomState::new);
room_state.insert_user(username.clone());
let room_sender = room_state.tx.clone();
room_sender
};
let mut rx = room.tx.subscribe();

// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
#[allow(dead_code, unused_variables)]
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
if sender.send(Message::Text(msg)).await.is_err() {
break;

// create room receiver to subscribe to any new message sent to the room channel
let mut room_receiver = room_sender.subscribe();

// spawn new task listening to any message sent from the current connected client
// if there is new message sent from the current client, i send this message to
// the room channel.
let mut send_task = {
let username = username.clone();
tokio::spawn(async move {
let room_sender = room_sender.clone();
while let Some(Ok(Message::Text(msg))) = receiver.next().await {
if let Ok(tweet_input) = serde_json::from_str::<TweetInput>(msg.as_str()) {
if tweet_input.message.len() <= 128 {
room_sender
.send(Tweet::new(username.clone(), tweet_input))
.unwrap();
}
}
}
}
});

// Clone things we want to pass (move) to the receiving task.
let tx = room.tx.clone();
let name = username.clone();

// Spawn a task that takes messages from the websocket, prepends the user
// name, and sends them to all broadcast subscribers.
#[allow(dead_code, unused_variables)]
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
// Add username before message.
let _ = tx.send(format!("{name}: {text}"));
}
});
})
};

// If any one of the tasks run to completion, we abort the other.
// tokio::select! {
// _ = (&mut send_task) => recv_task.abort(),
// _ = (&mut recv_task) => send_task.abort(),
// };
// spawn new task listening to any message sent to the room channel,
// if there is new message sent to the room channel,
// i use the current sender to send this message to current connected client.
let mut recv_task = {
// This task will receive messages from client and send them to broadcast subscribers.
let state = state.clone();
tokio::spawn(async move {
while let Ok(msg) = room_receiver.recv().await {
state.inc_views();
// Add username before message.
let _ = sender.send(Message::Text(format!("{}", msg))).await;
}
})
};

// // Send "user left" message (similar to "joined" above).
// let msg = format!("{username} left.");
// tracing::debug!("{msg}");
// let _ = room.tx.send(msg);
// select on the recv and send task
// If any one of the tasks exit, abort the other.
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};

// // Remove username from map so new clients can take it again.
// room.users.lock().unwrap().remove(&username);
// TODO: if we reach here, means that the user disconnected, so we need to remove the user from the room.
// TODO: if there is no one left in the room we remove the entire room.
}

#[cfg(test)]
Expand Down

0 comments on commit b479a9b

Please sign in to comment.