From 0fc2dfc47e236a6beccf6a30a48108cecd30f4f0 Mon Sep 17 00:00:00 2001 From: Peter McDonnell Date: Wed, 19 Nov 2014 14:51:13 +0000 Subject: [PATCH] [#20,#26] Use Memoized Handler for send, always use timeouts for replies --- .../rxcore/java/eventbus/RxEventBus.java | 54 ++++++++++++++----- .../vertx/rxcore/java/eventbus/RxMessage.java | 22 ++------ .../java/EventBusIntegrationTest.java | 49 ++++++++++++++--- 3 files changed, 90 insertions(+), 35 deletions(-) diff --git a/src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java b/src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java index 0d3b86d..b803b98 100644 --- a/src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java +++ b/src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java @@ -1,6 +1,7 @@ package io.vertx.rxcore.java.eventbus; import io.vertx.rxcore.java.impl.HandlerSubscription; +import io.vertx.rxcore.java.impl.MemoizeHandler; import io.vertx.rxcore.java.impl.SingleSubscriptionHandler; import org.vertx.java.core.AsyncResult; import org.vertx.java.core.Handler; @@ -34,17 +35,17 @@ public class RxEventBus { // Customer handlers /** Standard SendHandler */ - protected static class SendHandler extends SingleSubscriptionHandler,Message> { + protected class SendHandler extends MemoizeHandler,Message> { @Override public void handle(Message m) { - fireResult(new RxMessage(m)); + complete(new RxMessageImpl(m)); } } /** Async SendHandler */ - protected static class AsyncSendHandler extends SingleSubscriptionHandler, AsyncResult>> { + protected class AsyncSendHandler extends SingleSubscriptionHandler, AsyncResult>> { @Override public void handle(AsyncResult> r) { if (r.succeeded()) { - fireResult(new RxMessage(r.result())); + fireResult(new RxMessageImpl(r.result())); } else { fireError(r.cause()); @@ -53,7 +54,7 @@ protected static class AsyncSendHandler extends SingleSubscriptionHandler extends HandlerSubscription>,RxMessage> { + protected class AsyncSendSubscription extends HandlerSubscription>,RxMessage> { /** Create new AsyncSendSubscription */ public AsyncSendSubscription(Subscriber> s) { @@ -63,7 +64,7 @@ public AsyncSendSubscription(Subscriber> s) { /** Handle event */ public void handle(AsyncResult> evt) { if (evt.succeeded()) { - fireComplete(new RxMessage(evt.result())); + fireComplete(new RxMessageImpl(evt.result())); } else { fireError(evt.cause()); @@ -72,19 +73,48 @@ public void handle(AsyncResult> evt) { } /** Receive handler */ - protected static class ReceiveHandler extends SingleSubscriptionHandler,Message> { + protected class ReceiveHandler extends SingleSubscriptionHandler,Message> { @Override public void handle(Message m) { - fireNext(new RxMessage(m)); + fireNext(new RxMessageImpl(m)); } } - + + /** RxMessage implementation with inherited timeouts */ + protected class RxMessageImpl extends RxMessage + { + /** Create new RxMessageImpl */ + public RxMessageImpl(Message coreMessage) { + super(coreMessage); + } + + /** Observe a reply */ + public Observable> observeReply(final R msg) + { + return Observable.create(new AsyncSendHandler() { + @Override public void execute() { + coreMessage.replyWithTimeout(msg,defaultTimeout,this); + } + }); + } + + /** Observe a reply with timeout */ + public Observable> observeReplyWithTimeout(final R msg, final long timeout) { + return Observable.create(new AsyncSendHandler() { + @Override public void execute() { + coreMessage.replyWithTimeout(msg,timeout,this); + } + }); + } + + } + // Instance variables /** Core bus */ private final EventBus eventBus; /** Default timeout */ - private final int defaultTimeout; + protected final int defaultTimeout; // Public @@ -103,7 +133,7 @@ public RxEventBus(EventBus eventBus, int defaultTimeout) { public Observable> send(final String address, final S msg) { SendHandler h=new SendHandler(); this.eventBus.send(address,msg,(Handler)h); - return Observable.create(h); + return Observable.create(h.subscribe); } /** Send a message with timeout */ @@ -131,7 +161,7 @@ public Observable> observeSendWithTimeout(final String addres /** Send message for each subscription */ public void call(Subscriber> subscriber) { AsyncSendSubscription hs=new AsyncSendSubscription(subscriber); - eventBus.sendWithTimeout(address, (Object)msg, timeout, (Handler)hs); + eventBus.sendWithTimeout(address, (Object)msg, timeout, hs); subscriber.add(hs); } }); diff --git a/src/main/java/io/vertx/rxcore/java/eventbus/RxMessage.java b/src/main/java/io/vertx/rxcore/java/eventbus/RxMessage.java index ad4cc8e..bc26df6 100644 --- a/src/main/java/io/vertx/rxcore/java/eventbus/RxMessage.java +++ b/src/main/java/io/vertx/rxcore/java/eventbus/RxMessage.java @@ -1,5 +1,6 @@ package io.vertx.rxcore.java.eventbus; +import org.vertx.java.core.Handler; import org.vertx.java.core.eventbus.Message; import rx.Observable; @@ -20,10 +21,10 @@ * * @author Tim Fox */ -public class RxMessage { +public abstract class RxMessage { /** Core Message */ - private final Message coreMessage; + protected final Message coreMessage; /** Wrap Message with RxMessage */ RxMessage(Message coreMessage) { @@ -49,7 +50,6 @@ public String replyAddress() { return coreMessage.replyAddress(); } - /** * @return The underlying core message */ @@ -68,20 +68,8 @@ public void reply(final R msg) { } /** Observe a reply */ - public Observable> observeReply(final R msg) { - return Observable.create(new RxEventBus.SendHandler() { - @Override public void execute() { - coreMessage.reply(msg,this); - } - }); - } + public abstract Observable> observeReply(final R msg); /** Observe a reply with timeout */ - public Observable> observerReplyWithTimeout(final R msg, final long timeout) { - return Observable.create(new RxEventBus.AsyncSendHandler() { - @Override public void execute() { - coreMessage.replyWithTimeout(msg,timeout,this); - } - }); - } + public abstract Observable> observeReplyWithTimeout(final R msg, final long timeout); } diff --git a/src/test/java/io/vertx/rxcore/test/integration/java/EventBusIntegrationTest.java b/src/test/java/io/vertx/rxcore/test/integration/java/EventBusIntegrationTest.java index f3b3fbd..1977751 100644 --- a/src/test/java/io/vertx/rxcore/test/integration/java/EventBusIntegrationTest.java +++ b/src/test/java/io/vertx/rxcore/test/integration/java/EventBusIntegrationTest.java @@ -18,9 +18,9 @@ * @author Tim Fox */ +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import io.vertx.rxcore.RxSupport; import io.vertx.rxcore.java.eventbus.RxEventBus; import io.vertx.rxcore.java.eventbus.RxMessage; import io.vertx.rxcore.java.eventbus.RxStream; @@ -29,17 +29,15 @@ import org.vertx.java.core.Handler; import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.eventbus.Message; +import org.vertx.java.core.eventbus.ReplyException; import org.vertx.java.core.json.JsonArray; import org.vertx.testtools.TestVerticle; import rx.Observable; -import rx.Subscriber; -import rx.Subscription; import rx.functions.*; -import rx.subscriptions.Subscriptions; import static io.vertx.rxcore.test.integration.java.RxAssert.assertCountThenComplete; import static io.vertx.rxcore.test.integration.java.RxAssert.assertMessageThenComplete; -import static io.vertx.rxcore.test.integration.java.RxAssert.assertSingle; +import static io.vertx.rxcore.test.integration.java.RxAssert.assertError; import static org.vertx.testtools.VertxAssert.assertEquals; import static org.vertx.testtools.VertxAssert.testComplete; @@ -64,6 +62,31 @@ public void call(RxMessage message) { }); } + @Test + public void testDeferredSubscribe() { + RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); + rxEventBus.registerHandler("foo").subscribe(new Action1>() { + @Override + public void call(RxMessage message) { + message.reply("pong!"); + } + }); + // Message is sent on send + final Observable> obs = rxEventBus.send("foo", "ping!"); + // Defer the subscribe + vertx.setTimer(100,new Handler() { + public void handle(Long ev) { + obs.subscribe(new Action1>() { + @Override + public void call(RxMessage message) { + assertEquals("pong!", message.body()); + testComplete(); + } + }); + } + }); + } + @Test // Send some messages in series - i.e. wait for result of previous one before sending next one // PMCD: Added check to enforce 1-at-a-time @@ -232,6 +255,21 @@ public Observable> call(RxMessage stringRxMessage) { assertMessageThenComplete(obsSend2,"goodday2"); } + @Test + public void testTimeout() { + final RxEventBus rx=new RxEventBus(vertx.eventBus()); + + // Register handler that timesout + rx.registerHandler("thewall").subscribe(new Action1>() { + public void call(RxMessage req) { + // No-one listens + assertError(req.observeReplyWithTimeout("pong",200),ReplyException.class); + } + }); + + rx.send("thewall","ping"); + } + @Test public void testRetry() { @@ -359,5 +397,4 @@ public Buffer call(JsonArray data) { assertCountThenComplete(regulator.stream(res,out),401); } - }