Skip to content

Commit 7b1c90b

Browse files
committed
feat: real-time reactions
1 parent e4a4701 commit 7b1c90b

File tree

14 files changed

+369
-230
lines changed

14 files changed

+369
-230
lines changed

server/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ async fn main() {
6969
.route("/ws", get(server::subscribe_chat_handshake))
7070
.route("/api/users/@me", get(server::rest::user::get_self))
7171
.route("/api/contacts/@me", get(server::rest::contacts::get_contacts))
72-
.route("/api/messages/:contact_id", post(server::rest::messages::create_message))
73-
.route("/api/messages/:contact_id", get(server::rest::messages::get_messages))
74-
.route("/api/messages/:contact_id", put(server::rest::messages::edit_message))
75-
.route("/api/messages/:contact_id", delete(server::rest::messages::delete_message))
76-
.route("/api/messages/:contact_id/reactions", post(server::rest::reactions::add_reaction))
77-
.route("/api/messages/:contact_id/reactions/:reaction_id", delete(server::rest::reactions::remove_reaction))
72+
.route("/api/channels/:context_id/messages", post(server::rest::messages::create_message))
73+
.route("/api/channels/:context_id/messages", get(server::rest::messages::get_messages))
74+
.route("/api/channels/:context_id/messages/:message_id", put(server::rest::messages::edit_message))
75+
.route("/api/channels/:context_id/messages/:message_id", delete(server::rest::messages::delete_message))
76+
.route("/api/channels/:context_id/messages/:message_id/reactions", post(server::rest::reactions::add_reaction))
77+
.route("/api/channels/:context_id/messages/:message_id/reactions/:reaction_id", delete(server::rest::reactions::remove_reaction))
7878
.route_layer(
7979
middleware::from_fn(authorize)
8080
)

server/src/server/gateway/context.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use dashmap::DashMap;
2+
use futures_util::FutureExt;
3+
use tokio::sync::mpsc::Sender;
4+
use crate::server::messages::Packet;
5+
6+
pub async fn send_packet_to_context(packet_queue: &mut DashMap<i64, Sender<Box<dyn Packet + Send>>>, context: i64, packet: Box<dyn Packet + Send>) {
7+
println!("Sending packet to context: {}", context);
8+
if let Some(tx) = packet_queue.get(&context) {
9+
println!("Context was found, now sending packet");
10+
tx.send(packet).then(|result| {
11+
if let Err(e) = result {
12+
eprintln!("Failed to send message: {:?}", e);
13+
}
14+
futures_util::future::ready(())
15+
}).await;
16+
}
17+
}

server/src/server/gateway/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::server::messages::{Packet, PacketMessage, PacketStaticId};
77

88
pub mod receipts;
99
pub mod typing;
10+
pub mod context;
1011

1112
#[async_trait]
1213
pub trait GatewayHandler {

server/src/server/messages.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,38 @@ pub struct MessageDeleted {
9696
pub message_id: i64,
9797
#[prost(int64, tag = "2")]
9898
pub context_id: i64
99+
}
100+
101+
#[derive(Clone, PartialEq, Message)]
102+
#[packet(id = 7)]
103+
pub struct ReactionAdded {
104+
#[prost(int64, tag = "1")]
105+
pub message_id: i64,
106+
#[prost(int64, tag = "2")]
107+
pub user_id: i64,
108+
#[prost(string, tag = "3")]
109+
pub emoji: String,
110+
#[prost(int32, tag = "4")]
111+
pub reaction_count: i32,
112+
#[prost(int32, tag = "5")]
113+
pub reaction_id: i32,
114+
#[prost(int64, tag = "6")]
115+
pub context_id: i64
116+
}
117+
118+
#[derive(Clone, PartialEq, Message)]
119+
#[packet(id = 8)]
120+
pub struct ReactionRemoved {
121+
#[prost(int64, tag = "1")]
122+
pub message_id: i64,
123+
#[prost(int64, tag = "2")]
124+
pub user_id: i64,
125+
#[prost(string, tag = "3")]
126+
pub emoji: String,
127+
#[prost(int32, tag = "4")]
128+
pub reaction_count: i32,
129+
#[prost(int32, tag = "5")]
130+
pub reaction_id: i32,
131+
#[prost(int64, tag = "6")]
132+
pub context_id: i64
99133
}

server/src/server/rest/messages.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use axum::body::Body;
33
use axum::extract::Path;
44
use axum::http::{Request, StatusCode};
55
use diesel::{debug_query, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, Table};
6+
use diesel::pg::Pg;
67
use diesel::sql_types::BigInt;
78
use diesel::prelude::*;
89
use futures_util::FutureExt;
910
use http_body_util::BodyExt;
1011
use serde::Deserialize;
1112

1213
use crate::entity::message::{CompleteMessage, Message};
13-
use crate::entity::message::messages::{content, edited, id as messageId, user_id};
14+
use crate::entity::message::messages::{content, context, edited, id as messageId, user_id};
1415
use crate::entity::message::messages::dsl::messages as messagesTable;
1516
use crate::entity::message::messages::dsl::messages;
1617
use crate::entity::user::User;
@@ -158,7 +159,6 @@ ORDER BY
158159
let bilateral_messages = query.load::<CompleteMessage>(connection).expect("Error loading messages");
159160

160161
ok(bilateral_messages.iter().map(|m| {
161-
println!("{}", &m.reactions);
162162
IterablePrivateMessage {
163163
id: m.id,
164164
user_id: m.user_id,
@@ -173,7 +173,7 @@ ORDER BY
173173
}
174174

175175
pub async fn edit_message(
176-
Path(message_id): Path<i64>,
176+
Path((channel_id, message_id)): Path<(i64, i64)>,
177177
Extension(state): Extension<SharedState>,
178178
request: Request<Body>
179179
) -> IrisResponse<PrivateMessage> {
@@ -185,6 +185,7 @@ pub async fn edit_message(
185185
let message: Json<MessageCreationRequest> = message.unwrap();
186186

187187
let query = messages
188+
.filter(context.eq(channel_id))
188189
.filter(messageId.eq(message_id))
189190
.filter(user_id.eq(user.id));
190191

@@ -220,14 +221,15 @@ pub async fn edit_message(
220221
}
221222

222223
pub async fn delete_message(
223-
Path(message_id): Path<i64>,
224+
Path((channel_id, message_id)): Path<(i64, i64)>,
224225
Extension(state): Extension<SharedState>,
225226
request: Request<Body>
226227
) -> IrisResponse<()> {
227228
let user = request.extensions().get::<User>().cloned().expect("User not found");
228229

229230
let state = &mut state.write().await;
230231
let query = messages
232+
.filter(context.eq(channel_id))
231233
.filter(messageId.eq(message_id))
232234
.filter(user_id.eq(user.id));
233235
let deleted = diesel::delete(query).returning(messagesTable::all_columns()).get_result::<Message>(&mut state.database);

server/src/server/rest/reactions.rs

Lines changed: 92 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@ use crate::SharedState;
1616
use http_body_util::BodyExt;
1717
use diesel::QueryDsl;
1818
use diesel::ExpressionMethods;
19+
use futures_util::FutureExt;
1920
use crate::entity::reactions::reaction_users::user_id;
21+
use crate::server::gateway::context::send_packet_to_context;
22+
use crate::server::messages::{ReactionAdded, ReactionRemoved};
2023

2124
pub async fn add_reaction(
22-
Path(message_reaction_id): Path<i64>,
25+
Path((channel_id, message_identifier)): Path<(i64, i64)>,
2326
Extension(state): Extension<SharedState>,
2427
request: Request<Body>
2528
) -> IrisResponse<ReactionAddResponse> {
@@ -29,97 +32,121 @@ pub async fn add_reaction(
2932
return error(StatusCode::BAD_REQUEST, "Invalid reaction");
3033
}
3134
let request: ReactionAddRequest = request.unwrap().0;
35+
let emoticon = request.reaction_type.clone();
3236

33-
let database = &mut state.write().await.database;
34-
let transaction_result = database.transaction::<_, diesel::result::Error, _>(|connection| {
35-
let reaction_details: Option<(i32, i32)> = match request.reaction_id {
36-
Some(message_reaction_id) => {
37-
let count = diesel::update(reactionsTable)
38-
.filter(reaction_id.eq(message_reaction_id))
39-
.set(reaction_count.eq(reaction_count + 1))
40-
.returning(reaction_count)
41-
.get_result::<i32>(connection)?;
42-
Some((message_reaction_id, count))
43-
},
44-
None => {
45-
let reaction = diesel::update(reactionsTable)
46-
.filter(message_id.eq(message_reaction_id))
47-
.filter(emoji.eq(request.reaction_type.clone()))
48-
.set(reaction_count.eq(reaction_count + 1))
49-
.returning((reaction_id, reaction_count))
50-
.get_result::<(i32, i32)>(connection)
51-
.optional()?;
52-
53-
if let Some(tuple) = reaction {
54-
println!("Already found a reaction, no need for new one");
55-
Some(tuple)
56-
} else {
57-
println!("Inserted");
58-
let new_reaction = ReactionInsert {
59-
message_id: message_reaction_id,
60-
emoji: request.reaction_type
61-
};
62-
let query = diesel::insert_into(reactionsTable)
63-
.values(&new_reaction)
64-
.returning(reaction_id)
37+
let state = &mut state.write().await;
38+
let transaction_result = {
39+
state.database.transaction::<_, diesel::result::Error, _>(|connection| {
40+
let reaction_details: Option<(i32, i32)> = match request.reaction_id {
41+
Some(message_reaction_id) => {
42+
let count = diesel::update(reactionsTable)
43+
.filter(reaction_id.eq(message_reaction_id))
44+
.set(reaction_count.eq(reaction_count + 1))
45+
.returning(reaction_count)
6546
.get_result::<i32>(connection)?;
47+
Some((message_reaction_id, count))
48+
}
49+
None => {
50+
let reaction = diesel::update(reactionsTable)
51+
.filter(message_id.eq(message_identifier))
52+
.filter(emoji.eq(request.reaction_type.clone()))
53+
.set(reaction_count.eq(reaction_count + 1))
54+
.returning((reaction_id, reaction_count))
55+
.get_result::<(i32, i32)>(connection)
56+
.optional()?;
57+
58+
if let Some(tuple) = reaction {
59+
println!("Already found a reaction, no need for new one");
60+
Some(tuple)
61+
} else {
62+
println!("Inserted");
63+
let new_reaction = ReactionInsert {
64+
message_id: message_identifier,
65+
emoji: request.reaction_type,
66+
};
67+
let query = diesel::insert_into(reactionsTable)
68+
.values(&new_reaction)
69+
.returning(reaction_id)
70+
.get_result::<i32>(connection)?;
6671

67-
Some((query, 1))
72+
Some((query, 1))
73+
}
6874
}
75+
};
76+
if reaction_details.is_none() {
77+
return Err(diesel::result::Error::NotFound);
6978
}
70-
};
71-
if reaction_details.is_none() {
72-
return Err(diesel::result::Error::NotFound);
73-
}
74-
75-
let (message_reaction_id, count) = reaction_details.unwrap();
76-
let reaction_user = ReactionUserInsert {
77-
reaction_id: message_reaction_id,
78-
user_id: user.id,
79-
};
80-
81-
let user_query = diesel::insert_into(reactionUsersTable)
82-
.values(&reaction_user)
83-
.execute(connection)?;
84-
Ok((message_reaction_id, count))
85-
});
79+
80+
let (message_reaction_id, count) = reaction_details.unwrap();
81+
let reaction_user = ReactionUserInsert {
82+
reaction_id: message_reaction_id,
83+
user_id: user.id,
84+
};
85+
86+
let user_query = diesel::insert_into(reactionUsersTable)
87+
.values(&reaction_user)
88+
.execute(connection)?;
89+
Ok((message_reaction_id, count))
90+
})
91+
};
8692

8793
if transaction_result.is_err() {
8894
return error(StatusCode::INTERNAL_SERVER_ERROR, "Failed to add reaction");
8995
}
9096
let (message_reaction_id, count) = transaction_result.unwrap();
9197

98+
send_packet_to_context(&mut state.packet_queue, channel_id.clone(), Box::new(ReactionAdded {
99+
message_id: message_identifier,
100+
user_id: user.id,
101+
emoji: emoticon,
102+
reaction_count: count,
103+
reaction_id: message_reaction_id,
104+
context_id: channel_id
105+
})).await;
106+
92107
ok(ReactionAddResponse {
93108
reaction_id: message_reaction_id,
94109
reaction_count: count
95110
})
96111
}
97112

98113
pub async fn remove_reaction(
99-
Path((message_identifier, reaction_identifier)): Path<(i64, i32)>,
114+
Path((channel_id, message_identifier, reaction_identifier)): Path<(i64, i64, i32)>,
100115
Extension(state): Extension<SharedState>,
101116
request: Request<Body>
102117
) -> IrisResponse<()> {
103118
let user = request.extensions().get::<User>().cloned().expect("User not found");
104119

105-
let database = &mut state.write().await.database;
106-
let transaction_result = database.transaction::<_, diesel::result::Error, _>(|connection| {
107-
// reduce one from reaction count
108-
let reaction = diesel::update(reactionsTable)
109-
.filter(reaction_id.eq(reaction_identifier))
110-
.set(reaction_count.eq(reaction_count - 1))
111-
.execute(connection)?;
112-
113-
let user_query = diesel::delete(reactionUsersTable)
114-
.filter(reactionUsersTableReactionId.eq(reaction_identifier))
115-
.filter(user_id.eq(user.id))
116-
.execute(connection)?;
117-
Ok(())
118-
});
120+
let state = &mut state.write().await;
121+
let transaction_result = {
122+
state.database.transaction::<_, diesel::result::Error, _>(|connection| {
123+
// reduce one from reaction count
124+
let count = diesel::update(reactionsTable)
125+
.filter(reaction_id.eq(reaction_identifier))
126+
.set(reaction_count.eq(reaction_count - 1))
127+
.returning(reaction_count)
128+
.get_result::<i32>(connection)?;
129+
130+
diesel::delete(reactionUsersTable)
131+
.filter(reactionUsersTableReactionId.eq(reaction_identifier))
132+
.filter(user_id.eq(user.id))
133+
.execute(connection)?;
134+
Ok((count))
135+
})
136+
};
119137

120138
if transaction_result.is_err() {
121139
return error(StatusCode::INTERNAL_SERVER_ERROR, "Failed to remove reaction");
122140
}
123141

142+
send_packet_to_context(&mut state.packet_queue, channel_id.clone(), Box::new(ReactionRemoved {
143+
message_id: message_identifier,
144+
user_id: user.id,
145+
emoji: String::from(""),
146+
reaction_count: transaction_result.unwrap(),
147+
reaction_id: reaction_identifier,
148+
context_id: channel_id
149+
})).await;
150+
124151
no_content()
125152
}

0 commit comments

Comments
 (0)