Skip to content

Commit

Permalink
Merge pull request #1731 from hanbingleixue/develop
Browse files Browse the repository at this point in the history
Fix some bugs in xds flow control
  • Loading branch information
Sherlockhan authored Jan 8, 2025
2 parents 5185ebd + 91e61f3 commit ffbdcd6
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
if (!StringUtils.isEmpty(retryPolicy.getRetryOn())) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getHostSelectionRetryMaxAttempts());
xdsRetryPolicy.setMaxAttempts(retryPolicy.getNumRetries().getValue());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
return xdsRetryPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public class CommonConst {
*/
public static final String REQUEST_INFO = "REQUEST_INFO";

/**
* the default contentType
*/
public static final String DEFAULT_CONTENT_TYPE = "text/plain";

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(null, ex, null, null)) {
if (retryConditionOptional.get().needRetry(this, ex, null, null)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public class XdsCircuitBreakerManager {

/**
* The map that stores the count of active requests, where the Key of the first level is the service name,
* the Key of the second level is the cluster name, the Key of the three level is the server address
* the Key of the second level is the cluster name
*/
private static final Map<String, Map<String, Map<String, AtomicInteger>>> REQUEST_CIRCUIT_BREAKER_MAP =
private static final Map<String, Map<String, AtomicInteger>> REQUEST_CIRCUIT_BREAKER_MAP =
new ConcurrentHashMap<>();

private static final String GATE_WAY_FAILURE = "502,503,504";
Expand All @@ -57,22 +57,20 @@ private XdsCircuitBreakerManager() {
*
* @param serviceName service name
* @param clusterName route name
* @param address request address
* @return active request num
*/
public static int incrementActiveRequests(String serviceName, String clusterName, String address) {
return getActiveRequestCount(serviceName, clusterName, address).incrementAndGet();
public static int incrementActiveRequests(String serviceName, String clusterName) {
return getActiveRequestCount(serviceName, clusterName).incrementAndGet();
}

/**
* decrease Active Request
*
* @param serviceName service name
* @param clusterName route name
* @param address request address
*/
public static void decreaseActiveRequests(String serviceName, String clusterName, String address) {
getActiveRequestCount(serviceName, clusterName, address).decrementAndGet();
public static void decreaseActiveRequests(String serviceName, String clusterName) {
getActiveRequestCount(serviceName, clusterName).decrementAndGet();
}

/**
Expand Down Expand Up @@ -193,11 +191,9 @@ private static XdsCircuitBreakerInfo getCircuitBreakerInfo(String serviceName, S
return instanceCircuitBreakerMap.computeIfAbsent(address, key -> new XdsCircuitBreakerInfo());
}

private static AtomicInteger getActiveRequestCount(String serviceName, String clusterName, String address) {
Map<String, Map<String, AtomicInteger>> clusterCircuitBreakerMap = REQUEST_CIRCUIT_BREAKER_MAP.
private static AtomicInteger getActiveRequestCount(String serviceName, String clusterName) {
Map<String, AtomicInteger> clusterCircuitBreakerMap = REQUEST_CIRCUIT_BREAKER_MAP.
computeIfAbsent(serviceName, key -> new ConcurrentHashMap<>());
Map<String, AtomicInteger> requestCircuitBreakerMap = clusterCircuitBreakerMap.
computeIfAbsent(clusterName, key -> new ConcurrentHashMap<>());
return requestCircuitBreakerMap.computeIfAbsent(address, key -> new AtomicInteger());
return clusterCircuitBreakerMap.computeIfAbsent(clusterName, key -> new AtomicInteger());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public class XdsCircuitBreakerManagerTest {

@Test
public void testActiveRequests() {
assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
XdsCircuitBreakerManager.decreaseActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS);
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME));
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME));
XdsCircuitBreakerManager.decreaseActiveRequests(SERVICE_NAME, CLUSTER_NAME);
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public AbstractXdsHttpClientInterceptor(io.sermant.flowcontrol.common.handler.re
public boolean isNeedCircuitBreak() {
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName())
|| StringUtils.isEmpty(scenarioInfo.getClusterName())
|| StringUtils.isEmpty(scenarioInfo.getAddress())) {
|| StringUtils.isEmpty(scenarioInfo.getClusterName())) {
return false;
}
Optional<XdsRequestCircuitBreakers> circuitBreakersOptional = XdsHandler.INSTANCE.
Expand All @@ -98,7 +97,7 @@ public boolean isNeedCircuitBreak() {
return false;
}
int activeRequestNum = XdsCircuitBreakerManager.incrementActiveRequests(scenarioInfo.getServiceName(),
scenarioInfo.getClusterName(), scenarioInfo.getAddress());
scenarioInfo.getClusterName());
int maxRequest = circuitBreakersOptional.get().getMaxRequests();
return maxRequest > 0 && activeRequestNum > maxRequest;
}
Expand Down Expand Up @@ -134,11 +133,11 @@ public void executeWithRetryPolicy(ExecuteContext context) {
context.skip(result);
return;
}
if (ex != null) {
context.setThrowableOut(getRealCause(ex));
if (ex == null) {
context.skip(result);
return;
}
context.skip(result);
context.setThrowableOut(getRealCause(ex));
} catch (Throwable throwable) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Failed to invoke method:%s for few times, reason:%s",
Expand All @@ -147,6 +146,9 @@ public void executeWithRetryPolicy(ExecuteContext context) {
} finally {
RetryContext.INSTANCE.remove();
}
if (context.getThrowableOut() != null) {
onThrow(context);
}
}

@Override
Expand Down Expand Up @@ -183,12 +185,10 @@ public ExecuteContext doThrow(ExecuteContext context) {
private void decreaseActiveRequestsAndCountFailureRequests(ExecuteContext context,
FlowControlScenario scenarioInfo) {
if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName())
|| StringUtils.isEmpty(scenarioInfo.getClusterName())
|| StringUtils.isEmpty(scenarioInfo.getAddress())) {
|| StringUtils.isEmpty(scenarioInfo.getClusterName())) {
return;
}
XdsCircuitBreakerManager.decreaseActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getClusterName(),
scenarioInfo.getAddress());
XdsCircuitBreakerManager.decreaseActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
int statusCode = getStatusCode(context);
if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) {
return;
Expand Down Expand Up @@ -233,12 +233,16 @@ protected Optional<ServiceInstance> chooseServiceInstanceForXds() {
}
if (StringUtils.isEmpty(scenarioInfo.getClusterName())) {
scenarioInfo.setClusterName(StringUtils.EMPTY);
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(
XdsHandler.INSTANCE.getServiceInstanceByServiceName(scenarioInfo.getServiceName()), scenarioInfo));
Set<ServiceInstance> serviceInstanceSet = XdsHandler.INSTANCE.
getServiceInstanceByServiceName(scenarioInfo.getServiceName());
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
return Optional.empty();
}
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo));
}
Set<ServiceInstance> serviceInstanceSet = XdsHandler.INSTANCE.
getMatchedServiceInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
if (serviceInstanceSet.isEmpty()) {
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
return Optional.empty();
}
if (RetryContext.INSTANCE.isPolicyNeedRetry()) {
Expand Down Expand Up @@ -285,7 +289,7 @@ private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set<
float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED;
int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent);
for (ServiceInstance serviceInstance : instanceSet) {
if (maxCircuitBreakerInstances > 0
if (maxCircuitBreakerPercent > 0
&& hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.MapUtils;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.handler.retry.AbstractRetry;
Expand Down Expand Up @@ -213,17 +214,14 @@ public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) {
return false;
}
Optional<String> statusCodeOptional = this.getCode(null);
if (!statusCodeOptional.isPresent()) {
return false;
}
String statusCode = statusCodeOptional.get();
String statusCode = statusCodeOptional.orElse(StringUtils.EMPTY);
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.
getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(null, throwable, statusCode, null)) {
if (retryConditionOptional.get().needRetry(this, throwable, statusCode, null)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -85,9 +87,11 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception {

// When triggering some flow control rules, it is necessary to skip execution and return the result directly
if (flowControlResult.isSkip()) {
String msg = flowControlResult.buildResponseMsg();
Response.Builder builder = new Response.Builder();
context.skip(builder.code(flowControlResult.getResponse().getCode()).protocol(Protocol.HTTP_1_1)
.message(flowControlResult.buildResponseMsg()).request(request).build());
.message(msg).body(ResponseBody.create(MediaType.parse(CommonConst.DEFAULT_CONTENT_TYPE), msg))
.request(request).build());
return context;
}

Expand All @@ -96,7 +100,8 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception {
if (isNeedCircuitBreak()) {
Response.Builder builder = new Response.Builder();
context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR)
.message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build());
.message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1)
.body(ResponseBody.create(MediaType.parse(CommonConst.DEFAULT_CONTENT_TYPE), MESSAGE)).build());
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import com.squareup.okhttp.Headers;
import com.squareup.okhttp.HttpUrl;
import com.squareup.okhttp.MediaType;
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.ResponseBody;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.service.xds.entity.ServiceInstance;
Expand Down Expand Up @@ -79,8 +81,10 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception {
// When triggering some flow control rules, it is necessary to skip execution and return the result directly
if (flowControlResult.isSkip()) {
Response.Builder builder = new Response.Builder();
String msg = flowControlResult.buildResponseMsg();
context.skip(builder.code(flowControlResult.getResponse().getCode())
.message(flowControlResult.buildResponseMsg()).request(request)
.message(msg).request(request)
.body(ResponseBody.create(MediaType.parse(CommonConst.DEFAULT_CONTENT_TYPE), msg))
.protocol(Protocol.HTTP_1_1).build());
return context;
}
Expand All @@ -90,7 +94,8 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception {
if (isNeedCircuitBreak()) {
Response.Builder builder = new Response.Builder();
context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR)
.message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build());
.message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1)
.body(ResponseBody.create(MediaType.parse(CommonConst.DEFAULT_CONTENT_TYPE), MESSAGE)).build());
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public ExecuteContext after(ExecuteContext context) throws Exception {
}

@Override
public ExecuteContext onThrow(ExecuteContext context) throws Exception {
public ExecuteContext onThrow(ExecuteContext context) {
if (canInvoke(context)) {
return doThrow(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.RequestEntity;
import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler;
import io.sermant.flowcontrol.res4j.chain.HandlerConstants;

Expand All @@ -34,6 +35,7 @@ public class XdsBusinessClientRequestHandler extends AbstractXdsChainHandler {
public void onBefore(RequestEntity requestEntity, FlowControlScenario scenario) {
FlowControlScenario matchedScenario = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo(
requestEntity, requestEntity.getServiceName());
XdsThreadLocalUtil.setScenarioInfo(matchedScenario);
super.onBefore(requestEntity, matchedScenario);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* @since 2024-12-05
*/
public class XdsFaultRequestHandler extends AbstractXdsChainHandler {
private static final String MESSAGE = "Request has been aborted by fault-ThrowException";
private static final String MESSAGE = "The request has been aborted due to triggering fault injection";

private static final Logger LOGGER = LoggerFactory.getLogger();

Expand Down

0 comments on commit ffbdcd6

Please sign in to comment.