Skip to content

Commit c9ac41e

Browse files
committed
feat: improve socket and client error handling and returning
1 parent 8a26235 commit c9ac41e

File tree

14 files changed

+267
-136
lines changed

14 files changed

+267
-136
lines changed

crates/hrpc-build/src/client.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,21 @@ pub fn generate<T: Service>(service: &T, proto_path: &str) -> TokenStream {
3737

3838
#[cfg(feature = "client_default_transport_hyper_http")]
3939
def_transport_impl.extend(quote! {
40-
use hrpc::{client::transport::http::Hyper, exports::http::Uri};
40+
use hrpc::{client::transport::http::{Hyper, HyperError}, exports::http::Uri};
4141

4242
impl #service_ident<Hyper> {
4343
/// Create a new client using HTTP transport.
4444
///
4545
/// Panics if the passed URI is an invalid URI.
46-
pub fn new<U>(server: U) -> ClientResult<Self, <Hyper as Service<TransportRequest>>::Error>
46+
pub fn new<U>(server: U) -> ClientResult<Self, HyperError>
4747
where
4848
U: TryInto<Uri>,
4949
U::Error: Debug,
5050
{
5151
let transport =
5252
Hyper::new(server.try_into().expect("invalid URL"))
53-
.map_err(ClientError::Transport)
54-
.map_err(ClientError::Transport)?;
53+
.map_err(TransportError::from)
54+
.map_err(ClientError::from)?;
5555
Ok(Self {
5656
inner: Client::new(transport),
5757
})
@@ -61,21 +61,21 @@ pub fn generate<T: Service>(service: &T, proto_path: &str) -> TokenStream {
6161

6262
#[cfg(feature = "client_default_transport_wasm_http")]
6363
def_transport_impl.extend(quote! {
64-
use hrpc::{client::transport::http::Wasm, exports::http::Uri};
64+
use hrpc::{client::transport::http::{Wasm, WasmError}, exports::http::Uri};
6565

6666
impl #service_ident<Hyper> {
6767
/// Create a new client using HTTP transport.
6868
///
6969
/// Panics if the passed URI is an invalid URI.
70-
pub fn new<U>(server: U) -> ClientResult<Self, <Wasm as Service<TransportRequest>>::Error>
70+
pub fn new<U>(server: U) -> ClientResult<Self, WasmError>
7171
where
7272
U: TryInto<Uri>,
7373
U::Error: Debug,
7474
{
7575
let transport =
7676
Wasm::new(server.try_into().expect("invalid URL"))
77-
.map_err(ClientError::Transport)
78-
.map_err(ClientError::Transport)?;
77+
.map_err(TransportError::from)
78+
.map_err(ClientError::from)?;
7979
Ok(Self {
8080
inner: Client::new(transport),
8181
})
@@ -95,10 +95,10 @@ pub fn generate<T: Service>(service: &T, proto_path: &str) -> TokenStream {
9595
inner: Client<Inner>,
9696
}
9797

98-
impl<Inner> #service_ident<Inner>
98+
impl<Inner, InnerErr> #service_ident<Inner>
9999
where
100-
Inner: Service<TransportRequest, Response = TransportResponse> + 'static,
101-
Inner::Error: std::error::Error + 'static,
100+
Inner: Service<TransportRequest, Response = TransportResponse, Error = TransportError<InnerErr>> + 'static,
101+
InnerErr: 'static,
102102
{
103103
#methods
104104
}
@@ -144,7 +144,7 @@ fn generate_unary<T: Method>(method: &T, proto_path: &str, path: String) -> Toke
144144
let (request, response) = method.request_response_name(proto_path);
145145

146146
quote! {
147-
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Response<#response>, Inner::Error>> + 'static
147+
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Response<#response>, InnerErr>> + 'static
148148
where
149149
Req: IntoRequest<#request>,
150150
{
@@ -160,7 +160,7 @@ fn generate_streaming<T: Method>(method: &T, proto_path: &str, path: String) ->
160160
let (request, response) = method.request_response_name(proto_path);
161161

162162
quote! {
163-
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Socket<#request, #response>, Inner::Error>> + 'static
163+
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Socket<#request, #response>, InnerErr>> + 'static
164164
where
165165
Req: IntoRequest<()>,
166166
{
@@ -176,7 +176,7 @@ fn generate_server_streaming<T: Method>(method: &T, proto_path: &str, path: Stri
176176
let (request, response) = method.request_response_name(proto_path);
177177

178178
quote! {
179-
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Socket<#request, #response>, Inner::Error>> + 'static
179+
pub fn #ident<Req>(&mut self, req: Req) -> impl Future<Output = ClientResult<Socket<#request, #response>, InnerErr>> + 'static
180180
where
181181
Req: IntoRequest<#request>,
182182
{

crates/hrpc/src/client/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub type ClientResult<T, TransportError> = Result<T, ClientError<TransportError>
1212

1313
/// Errors that can occur within `Client` operation.
1414
#[derive(Debug)]
15-
pub enum ClientError<TransportError: StdError> {
15+
pub enum ClientError<TransportError> {
1616
/// Occurs if an endpoint returns an error.
1717
EndpointError {
1818
/// The hRPC error.
@@ -60,13 +60,13 @@ impl<TransportError: StdError> Display for ClientError<TransportError> {
6060
}
6161
}
6262

63-
impl<TransportError: StdError> From<DecodeBodyError> for ClientError<TransportError> {
63+
impl<TransportError> From<DecodeBodyError> for ClientError<TransportError> {
6464
fn from(err: DecodeBodyError) -> Self {
6565
ClientError::MessageDecode(err)
6666
}
6767
}
6868

69-
impl<TransportError: StdError> From<IoError> for ClientError<TransportError> {
69+
impl<TransportError> From<IoError> for ClientError<TransportError> {
7070
fn from(err: IoError) -> Self {
7171
ClientError::Io(err)
7272
}

crates/hrpc/src/client/mod.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
Response,
1111
};
1212

13-
use self::transport::{TransportRequest, TransportResponse};
13+
use self::transport::{TransportError, TransportRequest, TransportResponse};
1414

1515
use super::Request;
1616
use error::*;
@@ -30,7 +30,7 @@ pub mod prelude {
3030
pub use super::{
3131
error::{ClientError, ClientResult},
3232
socket::Socket,
33-
transport::{TransportRequest, TransportResponse},
33+
transport::{TransportError, TransportRequest, TransportResponse},
3434
Client,
3535
};
3636
pub use crate::{
@@ -69,10 +69,11 @@ impl<Inner> Client<Inner> {
6969
}
7070
}
7171

72-
impl<Inner> Client<Inner>
72+
impl<Inner, InnerErr> Client<Inner>
7373
where
74-
Inner: Service<TransportRequest, Response = TransportResponse> + 'static,
75-
Inner::Error: std::error::Error + 'static,
74+
Inner: Service<TransportRequest, Response = TransportResponse, Error = TransportError<InnerErr>>
75+
+ 'static,
76+
InnerErr: 'static,
7677
{
7778
/// Layer this client with a new [`Layer`].
7879
pub fn layer<S, L>(self, l: L) -> Client<S>
@@ -90,17 +91,17 @@ where
9091
pub fn execute_request<Req: prost::Message, Resp: prost::Message + Default>(
9192
&mut self,
9293
req: Request<Req>,
93-
) -> impl Future<Output = ClientResult<Response<Resp>, Inner::Error>> + 'static {
94+
) -> impl Future<Output = ClientResult<Response<Resp>, InnerErr>> + 'static {
9495
Service::call(&mut self.transport, TransportRequest::Unary(req.map()))
9596
.map_ok(|resp| resp.extract_unary().map::<Resp>())
96-
.map_err(ClientError::Transport)
97+
.map_err(ClientError::from)
9798
}
9899

99100
/// Connect a socket with the server and return it.
100101
pub fn connect_socket<Req, Resp>(
101102
&mut self,
102103
req: Request<()>,
103-
) -> impl Future<Output = ClientResult<Socket<Req, Resp>, Inner::Error>> + 'static
104+
) -> impl Future<Output = ClientResult<Socket<Req, Resp>, InnerErr>> + 'static
104105
where
105106
Req: prost::Message,
106107
Resp: prost::Message + Default,
@@ -111,7 +112,7 @@ where
111112

112113
Socket::new(rx, tx, socket::encode_message, socket::decode_message)
113114
})
114-
.map_err(ClientError::Transport)
115+
.map_err(ClientError::from)
115116
}
116117

117118
/// Connect a socket with the server, send a message and return it.
@@ -120,7 +121,7 @@ where
120121
pub fn connect_socket_req<Req, Resp>(
121122
&mut self,
122123
request: Request<Req>,
123-
) -> impl Future<Output = ClientResult<Socket<Req, Resp>, Inner::Error>> + 'static
124+
) -> impl Future<Output = ClientResult<Socket<Req, Resp>, InnerErr>> + 'static
124125
where
125126
Req: prost::Message + Default + 'static,
126127
Resp: prost::Message + Default + 'static,
@@ -147,9 +148,18 @@ where
147148
socket
148149
.send_message(message)
149150
.await
150-
.map_err(|err| ClientError::EndpointError {
151-
hrpc_error: err,
152-
endpoint,
151+
.map_err(|err| match err {
152+
SocketError::MessageDecode(err) => ClientError::MessageDecode(err),
153+
SocketError::Protocol(err) => ClientError::EndpointError {
154+
hrpc_error: err,
155+
endpoint,
156+
},
157+
// TODO: this is not good... we need a proper way to expose this error to the user
158+
// maybe by returning double result?
159+
SocketError::Transport(err) => ClientError::EndpointError {
160+
hrpc_error: HrpcError::from(err).with_identifier("hrpcrs.socket-error"),
161+
endpoint,
162+
},
153163
})?;
154164

155165
Ok(socket)

crates/hrpc/src/client/socket.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,42 @@
11
use bytes::BytesMut;
22
use prost::Message as PbMsg;
33

4-
use crate::{decode::DecodeBodyError, proto::Error as HrpcError};
4+
use crate::{common::socket::DecodeResult, decode::DecodeBodyError, proto::Error as HrpcError};
55

6-
pub use crate::common::socket::{ReadSocket, Socket, WriteSocket};
6+
pub use crate::common::socket::{ReadSocket, Socket, SocketError, WriteSocket};
77

88
pub(super) fn encode_message<Msg: PbMsg>(buf: &mut BytesMut, msg: &Msg) -> Vec<u8> {
99
crate::encode::encode_protobuf_message_to(buf, msg);
1010
// TODO: don't allocate here?
1111
buf.to_vec()
1212
}
1313

14-
pub(super) fn decode_message<Msg: PbMsg + Default>(raw: Vec<u8>) -> Result<Msg, HrpcError> {
14+
pub(super) fn decode_message<Msg: PbMsg + Default>(
15+
raw: Vec<u8>,
16+
) -> Result<DecodeResult<Msg>, DecodeBodyError> {
1517
if raw.is_empty() {
16-
return Err(
17-
DecodeBodyError::InvalidProtoMessage(prost::DecodeError::new("empty protobuf message"))
18-
.into(),
19-
);
18+
return Err(DecodeBodyError::InvalidProtoMessage(
19+
prost::DecodeError::new("empty protobuf message"),
20+
));
2021
}
2122

2223
let opcode = raw[0];
2324

2425
if opcode == 0 {
2526
Msg::decode(&raw[1..])
26-
.map_err(|err| HrpcError::from(DecodeBodyError::InvalidProtoMessage(err)))
27+
.map(DecodeResult::Msg)
28+
.map_err(DecodeBodyError::InvalidProtoMessage)
2729
} else if opcode == 1 {
28-
Err(HrpcError::decode(&raw[1..])
29-
.unwrap_or_else(|err| HrpcError::from(DecodeBodyError::InvalidProtoMessage(err))))
30+
HrpcError::decode(&raw[1..])
31+
.map(DecodeResult::Error)
32+
.map_err(DecodeBodyError::InvalidProtoMessage)
3033
} else {
31-
Err(HrpcError::from((
32-
"hrpcrs.http.invalid-socket-message-opcode",
33-
"invalid socket binary message opcode",
34-
))
35-
.with_details(raw))
34+
Err(DecodeBodyError::InvalidBody(Box::new(
35+
HrpcError::from((
36+
"hrpcrs.http.invalid-socket-message-opcode",
37+
"invalid socket binary message opcode",
38+
))
39+
.with_details(raw),
40+
)))
3641
}
3742
}

crates/hrpc/src/client/transport/http/hyper.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use tower::Service;
2222
use crate::{
2323
client::{
2424
error::{ClientError, HrpcError},
25-
transport::{box_socket_stream_sink, CallResult, TransportRequest, TransportResponse},
25+
transport::{
26+
box_socket_stream_sink, CallResult, TransportError, TransportRequest, TransportResponse,
27+
},
2628
},
2729
common::transport::{
2830
http::{
@@ -107,7 +109,7 @@ impl Hyper {
107109
impl Service<TransportRequest> for Hyper {
108110
type Response = TransportResponse;
109111

110-
type Error = ClientError<HyperError>;
112+
type Error = TransportError<HyperError>;
111113

112114
type Future = CallResult<'static, TransportResponse, HyperError>;
113115

@@ -156,10 +158,11 @@ impl Service<TransportRequest> for Hyper {
156158
.map_err(HyperError::Http)?;
157159
let hrpc_error = HrpcError::decode(raw_error.as_ref())
158160
.unwrap_or_else(|_| HrpcError::invalid_hrpc_error(raw_error));
159-
return Err(ClientError::EndpointError {
161+
return Err((ClientError::EndpointError {
160162
hrpc_error,
161163
endpoint,
162-
});
164+
})
165+
.into());
163166
}
164167

165168
// Handle non-protobuf successful responses
@@ -171,7 +174,7 @@ impl Service<TransportRequest> for Hyper {
171174
.and_then(|t| t.as_bytes().split(|c| b';'.eq(c)).next())
172175
.map_or(false, is_hrpc)
173176
{
174-
return Err(ClientError::ContentNotSupported);
177+
return Err(ClientError::ContentNotSupported.into());
175178
}
176179

177180
// check if the spec version matches
@@ -184,7 +187,7 @@ impl Service<TransportRequest> for Hyper {
184187
})
185188
{
186189
// TODO: parse the header properly and extract the version instead of just doing a contains
187-
return Err(ClientError::IncompatibleSpecVersion);
190+
return Err(ClientError::IncompatibleSpecVersion.into());
188191
}
189192

190193
let (parts, body) = resp.into_parts();
@@ -236,9 +239,9 @@ impl Service<TransportRequest> for Hyper {
236239
.and_then(|h| h.to_str().ok())
237240
.map_or(false, |v| v.contains(&ws_version()))
238241
{
239-
return Err(ClientError::Transport(HyperError::SocketInitError(
240-
SocketInitError::InvalidProtocol,
241-
)));
242+
return Err(
243+
HyperError::SocketInitError(SocketInitError::InvalidProtocol).into(),
244+
);
242245
}
243246

244247
let (ws_tx, ws_rx) = WebSocket::new(ws_stream).split();
@@ -292,9 +295,9 @@ impl From<hyper::Error> for HyperError {
292295
}
293296
}
294297

295-
impl From<HyperError> for ClientError<HyperError> {
298+
impl From<HyperError> for TransportError<HyperError> {
296299
fn from(err: HyperError) -> Self {
297-
ClientError::Transport(err)
300+
TransportError::Transport(err)
298301
}
299302
}
300303

0 commit comments

Comments
 (0)