From b9f1c38a1cb56a140c91be9f50575e07ffb4d444 Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:20:35 +0100 Subject: [PATCH] fix test and clean up ws_rpc client --- src/rpc/ws_client/mod.rs | 67 +++++++++++++-------- testing/examples/tungstenite_client_test.rs | 5 +- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 487d6017d..2664abad0 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -112,41 +112,58 @@ impl HandleMessage for SubscriptionHandler { fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; - let msg = &context.msg; + let msg = &context.msg.as_text()?; info!("got on_subscription_msg {}", msg); - let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; - - let mut send_result = Ok(()); - match self.subscription_id.clone() { - Some(id) => { - if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { - if id == msg_subscription_id { - send_result = result.send( - serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?, - ); - } + let value: serde_json::Value = serde_json::from_str(msg).map_err(Box::new)?; + + let send_result = match self.subscription_id.as_ref() { + Some(id) => handle_subscription_message(result, &value, id), + None => { + self.subscription_id = get_subscription_id(&value); + if self.subscription_id.is_none() { + send_error_response(result, &value, msg) + } else { + Ok(()) } }, - None => match value["result"].as_str() { - Some(id) => self.subscription_id = Some(id.to_string()), - None => { - let message = match value["error"]["message"].is_string() { - true => serde_json::to_string(&value["error"]).map_err(Box::new)?, - false => format!("Received unexpected response: {}", msg), - }; - let _ = result.send(message); - out.close(CloseCode::Normal)?; - return Ok(()) - }, - }, }; if let Err(e) = send_result { // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); + trace!("SendError: {:?}. will close ws", e); out.close(CloseCode::Normal)?; }; Ok(()) } } + +fn handle_subscription_message( + result: &ThreadOut, + value: &serde_json::Value, + subscription_id: &str, +) -> Result<(), RpcClientError> { + if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { + if subscription_id == msg_subscription_id { + result.send(serde_json::to_string(&value["params"]["result"])?)?; + } + } + Ok(()) +} + +fn get_subscription_id(value: &serde_json::Value) -> Option { + value["result"].as_str().map(|id| id.to_string()) +} + +fn send_error_response( + result: &ThreadOut, + value: &serde_json::Value, + original_message: &str, +) -> Result<(), RpcClientError> { + let message = match value["error"]["message"].is_string() { + true => serde_json::to_string(&value["error"])?, + false => format!("Received unexpected response: {}", original_message), + }; + result.send(message)?; + Ok(()) +} diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs index 715404369..34385d822 100755 --- a/testing/examples/tungstenite_client_test.rs +++ b/testing/examples/tungstenite_client_test.rs @@ -44,7 +44,10 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable))); + match result { + Ok(_) => panic!("Expected an error"), + Err(e) => assert_eq!(e, DispatchError::Token(TokenError::FundsUnavailable)), + }; // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock);