Skip to content

Commit

Permalink
Replace synchronized block with Lock
Browse files Browse the repository at this point in the history
  • Loading branch information
TharmiganK committed Jan 7, 2025
1 parent 7b064e0 commit fff337f
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 16 deletions.
4 changes: 2 additions & 2 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ path = "../native/build/libs/http-native-2.13.0-SNAPSHOT.jar"
groupId = "io.ballerina.stdlib"
artifactId = "mime-native"
version = "2.11.0"
path = "./lib/mime-native-2.11.0-20241209-180600-aa73132.jar"
path = "./lib/mime-native-2.11.0-20241218-125100-e28a03b.jar"

[[platform.java21.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "constraint-native"
version = "1.6.0"
path = "./lib/constraint-native-1.6.0-20241209-172100-2facdca.jar"
path = "./lib/constraint-native-1.6.0-20241218-112400-cd313f2.jar"

[[platform.java21.dependency]]
groupId = "io.netty"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.ballerina.stdlib.http.transport.contract.exceptions.SslException;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.ConnectionManager;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.ballerina.stdlib.http.transport.message.Http2PushPromise;
import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage;
import io.ballerina.stdlib.http.transport.message.HttpMessageDataStreamer;
Expand Down Expand Up @@ -206,6 +207,7 @@ public class HttpUtil {
private static final String JAVA_CONFIG_TLS_NAMED_GROUPS = "jdk.tls.namedGroups";
private static final String[] DEFAULT_NAMED_GROUPS = { "X25519Kyber768Draft00", "x25519", "secp256r1",
"secp384r1", "secp521r1" };
private static final ResourceLock resourceLock = new ResourceLock();

/**
* Set new entity to in/out request/response struct.
Expand Down Expand Up @@ -1324,7 +1326,7 @@ public static void populateSenderConfigurations(SenderConfiguration senderConfig
public static ConnectionManager getConnectionManager(BMap poolStruct) {
ConnectionManager poolManager = (ConnectionManager) poolStruct.getNativeData(HttpConstants.CONNECTION_MANAGER);
if (poolManager == null) {
synchronized (poolStruct) {
try (ResourceLock ignored = resourceLock.obtain()) {
if (poolStruct.getNativeData(HttpConstants.CONNECTION_MANAGER) == null) {
PoolConfiguration userDefinedPool = new PoolConfiguration();
populatePoolingConfig(poolStruct, userDefinedPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contract.HttpResponseFuture;
import io.ballerina.stdlib.http.transport.contract.exceptions.ServerConnectorException;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage;
import io.ballerina.stdlib.http.transport.message.HttpPipeliningFuture;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -41,6 +42,7 @@
public class PipeliningHandler {

private static final Logger log = LoggerFactory.getLogger(PipeliningHandler.class);
private static final ResourceLock resourceLock = new ResourceLock();

/**
* This method should be used whenever a response should be sent out via other places (eg:- error responses,
Expand Down Expand Up @@ -79,7 +81,7 @@ public static HttpResponseFuture executePipeliningLogic(ChannelHandlerContext so
PipelinedResponse pipelinedResponse) {
HttpResponseFuture responseFuture = null;

synchronized (sourceContext.channel().attr(Constants.RESPONSE_QUEUE).get()) {
try (ResourceLock ignored = resourceLock.obtain()) {
Queue<PipelinedResponse> responseQueue = sourceContext.channel().attr(Constants.RESPONSE_QUEUE).get();
if (thresholdReached(sourceContext, responseQueue)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.ballerina.stdlib.http.transport.contractimpl.listener.SourceHandler;
import io.ballerina.stdlib.http.transport.internal.HandlerExecutor;
import io.ballerina.stdlib.http.transport.internal.HttpTransportContextHolder;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class SendingEntityBody implements ListenerState {
private HttpCarbonMessage outboundResponseMsg;
private ChannelHandlerContext sourceContext;
private SourceHandler sourceHandler;
private final ResourceLock resourceLock = new ResourceLock();

SendingEntityBody(HttpOutboundRespListener outboundRespListener,
ListenerReqRespStateManager listenerReqRespStateManager,
Expand Down Expand Up @@ -245,7 +247,7 @@ private void triggerPipeliningLogic(HttpCarbonMessage outboundResponseMsg) {
if (outboundResponseMsg.isPipeliningEnabled() && Constants.HTTP_1_1_VERSION.equalsIgnoreCase
(httpVersion)) {
Queue responseQueue;
synchronized (sourceContext.channel().attr(Constants.RESPONSE_QUEUE).get()) {
try (ResourceLock ignored = resourceLock.obtain()) {
responseQueue = sourceContext.channel().attr(Constants.RESPONSE_QUEUE).get();
Long nextSequenceNumber = sourceContext.channel().attr(Constants.NEXT_SEQUENCE_NUMBER).get();
//IMPORTANT:Next sequence number should never be incremented for interim 100 continue response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.BootstrapConfiguration;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.TargetChannel;
import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2ConnectionManager;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
Expand All @@ -46,6 +47,7 @@ public class ConnectionManager {
private final Map<String, GenericObjectPool> globalConnPool;
private final Map<String, PoolableTargetChannelFactory> globalFactoryObjects;
private final Http2ConnectionManager http2ConnectionManager;
private final ResourceLock resourceLock = new ResourceLock();

public ConnectionManager(PoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
Expand Down Expand Up @@ -103,7 +105,7 @@ private GenericObjectPool getTrgHlrPoolFromGlobalPool(HttpRoute httpRoute, Sende
EventLoopGroup clientEventGroup) {
GenericObjectPool trgHlrConnPool;
Class eventLoopClass = NioSocketChannel.class;
synchronized (this) {
try (ResourceLock ignored = resourceLock.obtain()) {
if (!globalConnPool.containsKey(httpRoute.toString())) {
createTrgHlrPoolInGlobalPool(httpRoute, senderConfig, bootstrapConfig, clientEventGroup,
eventLoopClass);
Expand All @@ -122,7 +124,7 @@ private GenericObjectPool getTrgHlrPoolFromGlobalPoolWithSrcPool(HttpRoute httpR
Map<String, GenericObjectPool> srcHlrConnPool) {
GenericObjectPool trgHlrConnPool = srcHlrConnPool.get(trgHlrConnPoolId);
if (trgHlrConnPool == null) {
synchronized (this) {
try (ResourceLock ignored = resourceLock.obtain()) {
if (!globalConnPool.containsKey(httpRoute.toString())) {
createTrgHlrPoolInGlobalPool(httpRoute, senderConfig, bootstrapConfig,
clientEventGroup, eventLoopClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.ballerina.stdlib.http.transport.contractimpl.sender.http2;

import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -55,7 +56,7 @@ static class PerRouteConnectionPool {
// Maximum number of allowed active streams
private final int maxActiveStreams;
private CountDownLatch newChannelInitializerLatch = new CountDownLatch(1);
private final Object lock = new Object();
private final ResourceLock lock = new ResourceLock(); // Use ResourceLock here
private boolean newChannelInitializer = true;

PerRouteConnectionPool(int maxActiveStreams) {
Expand All @@ -70,7 +71,7 @@ static class PerRouteConnectionPool {
*
* @return active TargetChannel
*/
synchronized Http2ClientChannel fetchTargetChannel() {
Http2ClientChannel fetchTargetChannel() {
waitTillNewChannelInitialized();
if (!http2ClientChannels.isEmpty()) {
Http2ClientChannel http2ClientChannel = http2ClientChannels.peek();
Expand All @@ -96,7 +97,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
// ballerina thread will not take http1.1 thread as the channels queue is not empty. In such cases,
// threads wait on the countdown latch cannot be released until another thread is returned. Hence
// synchronized on a lock
synchronized (lock) {
try (ResourceLock ignored = lock.obtain()) {
if (http2ClientChannels.isEmpty()) {
newChannelInitializer = true;
newChannelInitializerLatch = new CountDownLatch(1);
Expand All @@ -117,7 +118,7 @@ void addChannel(Http2ClientChannel http2ClientChannel) {
}

void releaseCountdown() {
synchronized (lock) {
try (ResourceLock ignored = lock.obtain()) {
newChannelInitializerLatch.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute;
import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class Http2ClientChannel {
private Http2Connection connection;
private ChannelFuture channelFuture;
private HttpRoute httpRoute;
private Http2ConnectionManager http2ConnectionManager;
private final Http2ConnectionManager http2ConnectionManager;
// Whether channel is operates with maximum number of allowed streams
private AtomicBoolean isExhausted = new AtomicBoolean(false);
// Number of active streams. Need to start from 1 to prevent someone stealing the connection from the creator
Expand All @@ -63,6 +64,7 @@ public class Http2ClientChannel {
private StreamCloseListener streamCloseListener;
private long timeSinceMarkedAsStale = 0;
private AtomicBoolean isStale = new AtomicBoolean(false);
private final ResourceLock resourceLock = new ResourceLock();

public Http2ClientChannel(Http2ConnectionManager http2ConnectionManager, Http2Connection connection,
HttpRoute httpRoute, Channel channel) {
Expand Down Expand Up @@ -328,7 +330,7 @@ void removeFromConnectionPool() {
}

void markAsStale() {
synchronized (http2ConnectionManager) {
try (ResourceLock ignored = resourceLock.obtain()) {
isStale.set(true);
http2ConnectionManager.markClientChannelAsStale(httpRoute, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute;
import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext;
import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration;
import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,6 +40,7 @@ public class Http2ConnectionManager {
private final Http2ChannelPool http2ChannelPool = new Http2ChannelPool();
private final BlockingQueue<Http2ClientChannel> http2StaleClientChannels = new LinkedBlockingQueue<>();
private final PoolConfiguration poolConfiguration;
private final ResourceLock resourceLock = new ResourceLock();

public Http2ConnectionManager(PoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
Expand Down Expand Up @@ -124,7 +126,7 @@ private synchronized Http2ChannelPool.PerRouteConnectionPool createPerRouteConne
*/
public Http2ClientChannel fetchChannel(HttpRoute httpRoute) {
Http2ChannelPool.PerRouteConnectionPool perRouteConnectionPool;
synchronized (this) {
try (ResourceLock ignored = resourceLock.obtain()) {
perRouteConnectionPool = getOrCreatePerRoutePool(this.http2ChannelPool, generateKey(httpRoute));
return perRouteConnectionPool != null ? perRouteConnectionPool.fetchTargetChannel() : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package io.ballerina.stdlib.http.transport.message;

import io.ballerina.stdlib.http.transport.internal.ResourceLock;

/**
* Default implementation of the {@link FullHttpMessageFuture}.
*/
Expand All @@ -26,14 +28,15 @@ public class DefaultFullHttpMessageFuture implements FullHttpMessageFuture {
private final HttpCarbonMessage httpCarbonMessage;
private FullHttpMessageListener messageListener;
private Exception error;
private final ResourceLock resourceLock = new ResourceLock();

DefaultFullHttpMessageFuture(HttpCarbonMessage httpCarbonMessage) {
this.httpCarbonMessage = httpCarbonMessage;
}

@Override
public void addListener(FullHttpMessageListener messageListener) {
synchronized (httpCarbonMessage) {
try (ResourceLock ignored = resourceLock.obtain()) {
this.messageListener = messageListener;
if (httpCarbonMessage.isLastHttpContentArrived()) {
notifySuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.ballerina.stdlib.http.transport.message;

import io.ballerina.stdlib.http.transport.internal.ResourceLock;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.slf4j.Logger;
Expand All @@ -31,13 +32,14 @@ public class MessageFuture {
private static final Logger LOG = LoggerFactory.getLogger(MessageFuture.class);
private MessageListener messageListener;
private final HttpCarbonMessage httpCarbonMessage;
private final ResourceLock resourceLock = new ResourceLock();

public MessageFuture(HttpCarbonMessage httpCarbonMessage) {
this.httpCarbonMessage = httpCarbonMessage;
}

public void setMessageListener(MessageListener messageListener) {
synchronized (httpCarbonMessage) {
try (ResourceLock ignored = resourceLock.obtain()) {
this.messageListener = messageListener;
while (!httpCarbonMessage.isEmpty()) {
HttpContent httpContent = httpCarbonMessage.getHttpContent();
Expand Down

0 comments on commit fff337f

Please sign in to comment.