Skip to content

Commit

Permalink
fix test and clean up ws_rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
haerdib committed Nov 6, 2023
1 parent e3c5206 commit b9f1c38
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 26 deletions.
67 changes: 42 additions & 25 deletions src/rpc/ws_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,41 +112,58 @@ impl HandleMessage for SubscriptionHandler {
fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> {
let result = &context.result;
let out = &context.out;
let msg = &context.msg;
let msg = &context.msg.as_text()?;

info!("got on_subscription_msg {}", msg);
let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?;

let mut send_result = Ok(());
match self.subscription_id.clone() {
Some(id) => {
if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() {
if id == msg_subscription_id {
send_result = result.send(
serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?,
);
}
let value: serde_json::Value = serde_json::from_str(msg).map_err(Box::new)?;

let send_result = match self.subscription_id.as_ref() {
Some(id) => handle_subscription_message(result, &value, id),
None => {
self.subscription_id = get_subscription_id(&value);
if self.subscription_id.is_none() {
send_error_response(result, &value, msg)
} else {
Ok(())
}
},
None => match value["result"].as_str() {
Some(id) => self.subscription_id = Some(id.to_string()),
None => {
let message = match value["error"]["message"].is_string() {
true => serde_json::to_string(&value["error"]).map_err(Box::new)?,
false => format!("Received unexpected response: {}", msg),
};
let _ = result.send(message);
out.close(CloseCode::Normal)?;
return Ok(())
},
},
};

if let Err(e) = send_result {
// This may happen if the receiver has unsubscribed.
trace!("SendError: {}. will close ws", e);
trace!("SendError: {:?}. will close ws", e);
out.close(CloseCode::Normal)?;
};
Ok(())
}
}

fn handle_subscription_message(
result: &ThreadOut<String>,
value: &serde_json::Value,
subscription_id: &str,
) -> Result<(), RpcClientError> {
if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() {
if subscription_id == msg_subscription_id {
result.send(serde_json::to_string(&value["params"]["result"])?)?;
}
}
Ok(())
}

fn get_subscription_id(value: &serde_json::Value) -> Option<String> {
value["result"].as_str().map(|id| id.to_string())
}

fn send_error_response(
result: &ThreadOut<String>,
value: &serde_json::Value,
original_message: &str,
) -> Result<(), RpcClientError> {
let message = match value["error"]["message"].is_string() {
true => serde_json::to_string(&value["error"])?,
false => format!("Received unexpected response: {}", original_message),
};
result.send(message)?;
Ok(())
}
5 changes: 4 additions & 1 deletion testing/examples/tungstenite_client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ fn main() {
// Check for failed extrinsic failed onchain
let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1);
let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock);
assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable)));
match result {
Ok(_) => panic!("Expected an error"),
Err(e) => assert_eq!(e, DispatchError::Token(TokenError::FundsUnavailable)),
};

// Check directly failed extrinsic (before actually submitted to a block)
let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock);
Expand Down

0 comments on commit b9f1c38

Please sign in to comment.