Skip to content

Commit

Permalink
[SCB-2878]operation based load balancer (#4332)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored May 10, 2024
1 parent f3ad204 commit ba06fef
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public NonSwaggerInvocation(String appId, String microserviceName) {

@Override
public String getSchemaId() {
throw new UnsupportedOperationException();
return "third-schema";
}

@Override
public String getOperationName() {
throw new UnsupportedOperationException();
return "third-operation";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ public void setDiscoveryFilters(List<DiscoveryFilter> filters) {
}

private void log() {
StringBuilder sb = new StringBuilder();
sb.append("DiscoveryFilters(name, enabled, order, group):");
for (DiscoveryFilter filter : filters) {
LOGGER.info("DiscoveryFilter {}, enabled {}, order {}.",
filter.getClass().getName(), filter.enabled(), filter.getOrder());
sb.append("(").append(filter.getClass().getName()).append(",")
.append(filter.enabled()).append(",").append(filter.getOrder()).append(",")
.append(filter.isGroupingFilter()).append(")");
}
LOGGER.info(sb.toString());
}

boolean isMatch(VersionedCache existing, VersionedCache inputCache) {
Expand Down Expand Up @@ -181,7 +185,7 @@ protected DiscoveryTreeNode doDiscovery(DiscoveryContext context, DiscoveryTreeN
}

// no rerun support, go on even result is empty
// because maybe some filter use other mechanism to create a instance(eg:domain name)
// because maybe some filter use other mechanism to create an instance(eg:domain name)
}

parent = child;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import org.apache.servicecomb.config.ConfigUtil;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.foundation.common.LegacyPropertyFactory;

/**
Expand All @@ -30,55 +31,62 @@ public final class Configuration {
//// 2.1 configuration items
public static final String ROOT = "servicecomb.loadbalance.";

public static final String SERVER_EXPIRED_IN_SECONDS = "servicecomb.loadbalance.stats.serverExpiredInSeconds";

public static final String TIMER_INTERVAL_IN_MILLIS = "servicecomb.loadbalance.stats.timerIntervalInMillis";
public static final String RULE_STRATEGY_GLOBAL = "servicecomb.loadbalance.strategy.name";

public static final String RULE_STRATEGY_NAME = "strategy.name";

// 2.0 configuration items
public static final String ROOT_20 = "ribbon.";

// retry configurations
public static final String RETRY_HANDLER = "retryHandler";

// SessionStickinessRule configruation
public static final String SESSION_TIMEOUT_IN_SECONDS = "SessionStickinessRule.sessionTimeoutInSeconds";

public static final String SUCCESSIVE_FAILED_TIMES = "SessionStickinessRule.successiveFailedTimes";

private static final double PERCENT = 100;

public static final String FILTER_ISOLATION = "isolation.";

public static final String FILTER_OPEN = "enabled";

public static final String FILTER_ERROR_PERCENTAGE = "errorThresholdPercentage";

public static final String FILTER_ENABLE_REQUEST = "enableRequestThreshold";

public static final String FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS = "recoverImmediatelyWhenSuccess";

public static final String FILTER_SINGLE_TEST = "singleTestTime";

public static final String FILTER_MAX_SINGLE_TEST_WINDOW = "maxSingleTestWindow";

public static final String FILTER_MIN_ISOLATION_TIME = "minIsolationTime";

public static final String FILTER_CONTINUOUS_FAILURE_THRESHOLD = "continuousFailureThreshold";

public static final String TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN =
"servicecomb.loadbalance.%s.transactionControl.options";

public static final Configuration INSTANCE = new Configuration();

public record RuleType(int type, String value) {
public static final int TYPE_SCHEMA = 1;

public static final int TYPE_OPERATION = 2;

public String getValue() {
return value;
}

public int getType() {
return type;
}
}

private Configuration() {
}

public String getRuleStrategyName(String microservice) {
return getStringProperty(null,
ROOT + microservice + "." + RULE_STRATEGY_NAME,
ROOT + RULE_STRATEGY_NAME);
public RuleType getRuleStrategyName(Invocation invocation) {
String value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
invocation.getSchemaId() + "." + invocation.getOperationName() + "." + RULE_STRATEGY_NAME);
if (value != null) {
return new RuleType(RuleType.TYPE_OPERATION, value);
}
value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
invocation.getSchemaId() + "." + RULE_STRATEGY_NAME);
if (value != null) {
return new RuleType(RuleType.TYPE_SCHEMA, value);
}
value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
RULE_STRATEGY_NAME);
if (value != null) {
return new RuleType(RuleType.TYPE_SCHEMA, value);
}
return new RuleType(RuleType.TYPE_SCHEMA,
getStringProperty("RoundRobin", RULE_STRATEGY_GLOBAL));
}

public int getSessionTimeoutInSeconds(String microservice) {
Expand All @@ -105,49 +113,6 @@ public int getSuccessiveFailedTimes(String microservice) {
}
}

public boolean isIsolationFilterOpen(String microservice) {
String p = getStringProperty("true",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_OPEN,
ROOT + FILTER_ISOLATION + FILTER_OPEN);
return Boolean.parseBoolean(p);
}

public int getErrorThresholdPercentage(String microservice) {
final int defaultValue = 0;
String p = getStringProperty("0",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE,
ROOT + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE);
try {
int result = Integer.parseInt(p);
if (result <= PERCENT && result > 0) {
return result;
}
return defaultValue;
} catch (NumberFormatException e) {
return defaultValue;
}
}

public int getEnableRequestThreshold(String microservice) {
return getThreshold(microservice, FILTER_ENABLE_REQUEST);
}

public int getSingleTestTime(String microservice) {
final int defaultValue = 60000;
String p = getStringProperty("60000",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_SINGLE_TEST,
ROOT + FILTER_ISOLATION + FILTER_SINGLE_TEST);
try {
int result = Integer.parseInt(p);
if (result >= 0) {
return result;
}
return defaultValue;
} catch (NumberFormatException e) {
return defaultValue;
}
}

public int getMaxSingleTestWindow() {
final int defaultValue = 60000;
String p = getStringProperty(Integer.toString(defaultValue),
Expand All @@ -163,29 +128,6 @@ public int getMaxSingleTestWindow() {
}
}

public int getMinIsolationTime(String microservice) {
final int defaultValue = 3000; // 3 seconds
String p = getStringProperty("3000",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME,
ROOT + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME);
try {
int result = Integer.parseInt(p);
if (result >= 0) {
return result;
}
return defaultValue;
} catch (NumberFormatException e) {
return defaultValue;
}
}

public boolean isRecoverImmediatelyWhenSuccess(String microservice) {
String p = getStringProperty("true",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS,
ROOT + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS);
return Boolean.parseBoolean(p);
}

public Map<String, String> getFlowsplitFilterOptions(String microservice) {
String keyPrefix = String.format(TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN, microservice);
return ConfigUtil.stringPropertiesWithPrefix(LegacyPropertyFactory.getEnvironment(), keyPrefix);
Expand All @@ -201,20 +143,4 @@ public static String getStringProperty(String defaultValue, String... keys) {
}
return defaultValue;
}

public int getContinuousFailureThreshold(String microservice) {
return getThreshold(microservice, FILTER_CONTINUOUS_FAILURE_THRESHOLD);
}

private int getThreshold(String microservice, String threshold) {
final int defaultValue = 5;
String p = getStringProperty("5",
ROOT + microservice + "." + FILTER_ISOLATION + threshold,
ROOT + FILTER_ISOLATION + threshold);
try {
return Integer.parseInt(p);
} catch (NumberFormatException e) {
return defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public interface ExtensionsFactory {
default RuleExt createLoadBalancerRule(String ruleName) {
return null;
}

default ServerListFilterExt createServerListFilter(String key, String value, Object... args) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,19 @@

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtensionsManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionsManager.class);

private final List<ExtensionsFactory> extensionsFactories;

public ExtensionsManager(List<ExtensionsFactory> extensionsFactories) {
this.extensionsFactories = extensionsFactories;
}

public RuleExt createLoadBalancerRule(String microservice) {
public RuleExt createLoadBalancerRule(String ruleStrategyName) {
RuleExt rule = null;

for (ExtensionsFactory factory : extensionsFactories) {
if (factory.isSupport(Configuration.RULE_STRATEGY_NAME,
Configuration.INSTANCE.getRuleStrategyName(microservice))) {
rule = factory.createLoadBalancerRule(
Configuration.INSTANCE.getRuleStrategyName(microservice));
if (factory.isSupport(Configuration.RULE_STRATEGY_NAME, ruleStrategyName)) {
rule = factory.createLoadBalancerRule(ruleStrategyName);
break;
}
}
Expand All @@ -46,7 +39,6 @@ public RuleExt createLoadBalancerRule(String microservice) {
rule = new RoundRobinRuleExt();
}

LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), microservice);
return rule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.config.ConfigurationChangedEvent;
import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
Expand All @@ -35,6 +36,8 @@
import org.apache.servicecomb.core.governance.RetryContext;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.loadbalance.Configuration.RuleType;
import org.apache.servicecomb.registry.discovery.DiscoveryContext;
import org.apache.servicecomb.registry.discovery.DiscoveryTree;
import org.apache.servicecomb.swagger.invocation.Response;
Expand All @@ -45,6 +48,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;

import io.github.resilience4j.core.metrics.Metrics.Outcome;
import jakarta.ws.rs.core.Response.Status;
Expand All @@ -70,8 +74,6 @@ public class LoadBalanceFilter extends AbstractFilter implements ConsumerFilter,

private final ExtensionsManager extensionsManager;

private String strategy = null;

private final SCBEngine scbEngine;

// set endpoint in invocation.localContext
Expand All @@ -88,6 +90,22 @@ public LoadBalanceFilter(ExtensionsManager extensionsManager,
boolean.class, false);
this.extensionsManager = extensionsManager;
this.discoveryTree = discoveryTree;
EventManager.register(this);
}

@Subscribe
@SuppressWarnings("unused")
public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
Set<String> changedKeys = event.getChanged();
for (String key : changedKeys) {
if (key.startsWith(Configuration.ROOT)) {
synchronized (lock) {
clearLoadBalancer();
}
LOGGER.info("clear load balance rule for configuration changed, {}", key);
break;
}
}
}

private void preCheck(SCBEngine scbEngine) {
Expand Down Expand Up @@ -132,15 +150,6 @@ public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode ne

invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, true);

String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName());
if (!Objects.equals(strategy, this.strategy)) {
//配置变化,需要重新生成所有的lb实例
synchronized (lock) {
clearLoadBalancer();
}
}
this.strategy = strategy;

LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);

return send(invocation, nextNode, loadBalancer);
Expand Down Expand Up @@ -268,12 +277,22 @@ protected LoadBalancer getOrCreateLoadBalancer(Invocation invocation) {
invocation.getMicroserviceName());
invocation.addLocalContext(CONTEXT_KEY_SERVER_LIST, serversVersionedCache.data());

return loadBalancerMap
.computeIfAbsent(serversVersionedCache.name(), name -> createLoadBalancer(invocation.getMicroserviceName()));
RuleType ruleType = Configuration.INSTANCE.getRuleStrategyName(invocation);
String cacheKey;
if (ruleType.getType() == RuleType.TYPE_SCHEMA) {
cacheKey = invocation.getAppId() + "-" +
invocation.getMicroserviceName() + "-" + invocation.getSchemaId();
} else {
cacheKey = invocation.getAppId() + "-" +
invocation.getMicroserviceName() + "-" + invocation.getSchemaId() + "-" + invocation.getOperationName();
}
return loadBalancerMap.computeIfAbsent(cacheKey,
key -> createLoadBalancer(ruleType, key, invocation.getMicroserviceName()));
}

private LoadBalancer createLoadBalancer(String microserviceName) {
RuleExt rule = extensionsManager.createLoadBalancerRule(microserviceName);
private LoadBalancer createLoadBalancer(RuleType ruleType, String cacheKey, String microserviceName) {
RuleExt rule = extensionsManager.createLoadBalancerRule(ruleType.getValue());
LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), cacheKey);
return new LoadBalancer(rule, microserviceName);
}
}
Loading

0 comments on commit ba06fef

Please sign in to comment.