Skip to content

Commit

Permalink
Added skeletal (possibly non-functional) RSocket endpoint (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Aug 27, 2024
1 parent fb27874 commit f8e927a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package net.sf.dz3r.view.webui.v2;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;

public class RSocketEndpoint {

private final Logger logger = LogManager.getLogger();

private final String interfaces;
private final int port;

private final ProtocolHandler protocolHandler = new ProtocolHandler();

public RSocketEndpoint(String interfaces, int port) {
this.interfaces = interfaces;
this.port = port;
}

void run(Instant startedAt) {

var closeableChannel = RSocketServer
.create(SocketAcceptor.with(protocolHandler))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(interfaces, port))
.block();

logger.info("started in {}ms", Duration.between(startedAt, Instant.now()).toMillis());

// VT: FIXME: This behaves differently from HttpEndpoint; need to make sure shutdown is properly handled

closeableChannel.onClose().block();

logger.info("done");
}

private class ProtocolHandler implements RSocket {

@Override
public Mono<Payload> requestResponse(Payload payload) {
ThreadContext.push("requestResponse");

try {

var route = decodeRoute(payload);
logger.info("route: {}", route);

return Mono.just(ByteBufPayload.create("now: " + Instant.now()));

} finally {
payload.release();
ThreadContext.pop();
}
}

private String decodeRoute(Payload payload) {

if (!payload.hasMetadata()) {
logger.info("routing metadata missing");
return null;
}

var routingMeta = new RoutingMetadata(payload.sliceMetadata());

logger.info("routing metadata: {}", routingMeta);

return routingMeta.iterator().next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ private void run(Instant startedAt) {
.publishOn(Schedulers.boundedElastic())
.subscribe(httpEndpoint::run);

var rsocketEndpoint = new RSocketEndpoint(
config.interfaces,
config.duplexPort);

Flux.just(Instant.now())
.publishOn(Schedulers.boundedElastic())
.subscribe(rsocketEndpoint::run);

advertise();

logger.info("done");
Expand Down

0 comments on commit f8e927a

Please sign in to comment.