Skip to content

Commit

Permalink
[#3727]use separate filter chain for edge and consumer (#4008)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored Nov 2, 2023
1 parent 58bef31 commit 4344043
Show file tree
Hide file tree
Showing 55 changed files with 319 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@
import org.apache.servicecomb.core.CoreConst;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.filter.AbstractFilter;
import org.apache.servicecomb.core.filter.EdgeFilter;
import org.apache.servicecomb.core.filter.Filter;
import org.apache.servicecomb.core.filter.FilterNode;
import org.apache.servicecomb.core.filter.ProviderFilter;
import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
import org.apache.servicecomb.swagger.invocation.InvocationType;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.context.TransportContext;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,7 +54,7 @@
import jakarta.servlet.http.Part;
import jakarta.ws.rs.core.HttpHeaders;

public class RestServerCodecFilter implements ProviderFilter {
public class RestServerCodecFilter extends AbstractFilter implements ProviderFilter, EdgeFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(RestServerCodecFilter.class);

public static final String NAME = "rest-server-codec";
Expand All @@ -63,7 +65,7 @@ public String getName() {
}

@Override
public int getOrder(InvocationType invocationType, String application, String serviceName) {
public int getOrder() {
// almost time, should be the first filter.
return Filter.PROVIDER_SCHEDULE_FILTER_ORDER - 2000;
}
Expand All @@ -83,6 +85,10 @@ public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode ne
}

protected CompletableFuture<Response> invokeNext(Invocation invocation, FilterNode nextNode) {
if (invocation.isEdge()) {
TransportContext transportContext = invocation.getTransportContext();
return nextNode.onFilter(invocation).whenComplete((r, e) -> invocation.setTransportContext(transportContext));
}
return nextNode.onFilter(invocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private void createProducerMicroserviceMeta() {
String microserviceName = BootStrapProperties.readServiceName(environment);
producerMicroserviceMeta = new MicroserviceMeta(this,
BootStrapProperties.readApplication(environment), microserviceName, false);
producerMicroserviceMeta.setFilterChain(filterChainsManager.findProducerChain(
producerMicroserviceMeta.setProviderFilterChain(filterChainsManager.findProducerChain(
BootStrapProperties.readApplication(environment), microserviceName));
producerMicroserviceMeta.setMicroserviceVersionsMeta(new MicroserviceVersionsMeta(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

package org.apache.servicecomb.core.bootstrap;

import java.util.Arrays;
import java.util.List;

import org.apache.servicecomb.config.priority.ConfigObjectFactory;
import org.apache.servicecomb.config.priority.PriorityPropertyFactory;
import org.apache.servicecomb.config.priority.PriorityPropertyManager;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.filter.Filter;
import org.apache.servicecomb.core.filter.FilterChainsManager;
import org.apache.servicecomb.core.filter.impl.EmptyFilter;
import org.apache.servicecomb.foundation.common.event.EventManager;
Expand All @@ -37,11 +35,12 @@
*/
public class SCBEngineForTest extends SCBEngine {
public SCBEngineForTest(Environment environment) {
List<Filter> filters = Arrays.asList(
new EmptyFilter()
);
EmptyFilter emptyFilter = new EmptyFilter();
emptyFilter.setEnvironment(environment);
setFilterChainsManager(new FilterChainsManager()
.addFilters(filters));
.setProviderFilters(List.of(emptyFilter))
.setConsumerFilters(List.of(emptyFilter))
.setEdgeFilters(List.of(emptyFilter)));

PriorityPropertyFactory propertyFactory = new PriorityPropertyFactory(environment);
ConfigObjectFactory configObjectFactory = new ConfigObjectFactory(propertyFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@

import io.swagger.v3.oas.models.OpenAPI;

/**
* should named MicroserviceVersionMeta<br>
* but for compatible reason, keep the old name
*/
public class MicroserviceMeta {
private final SCBEngine scbEngine;

Expand All @@ -45,9 +41,14 @@ public class MicroserviceMeta {
// key is OperationMeta.getMicroserviceQualifiedName()
private final Map<String, OperationMeta> operationMetas = new HashMap<>();

// Used to indicate configuration items type. EDGE & CONSUMER chain are all consumer.
private final boolean consumer;

private FilterNode filterChain = FilterNode.EMPTY;
private FilterNode consumerFilterChain = FilterNode.EMPTY;

private FilterNode providerFilterChain = FilterNode.EMPTY;

private FilterNode edgeFilterChain = FilterNode.EMPTY;

private final VendorExtensions vendorExtensions = new VendorExtensions();

Expand Down Expand Up @@ -132,11 +133,27 @@ public <T> T getExtData(String key) {
return vendorExtensions.get(key);
}

public FilterNode getFilterChain() {
return filterChain;
public FilterNode getConsumerFilterChain() {
return consumerFilterChain;
}

public void setConsumerFilterChain(FilterNode consumerFilterChain) {
this.consumerFilterChain = consumerFilterChain;
}

public FilterNode getProviderFilterChain() {
return providerFilterChain;
}

public void setProviderFilterChain(FilterNode providerFilterChain) {
this.providerFilterChain = providerFilterChain;
}

public FilterNode getEdgeFilterChain() {
return edgeFilterChain;
}

public void setFilterChain(FilterNode filterChain) {
this.filterChain = filterChain;
public void setEdgeFilterChain(FilterNode edgeFilterChain) {
this.edgeFilterChain = edgeFilterChain;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.core.filter;

import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;

public abstract class AbstractFilter implements Filter, EnvironmentAware {
private static final String ORDER_KEY = "servicecomb.filter.%s.%s.%s.order";

private static final String ENABLE_KEY = "servicecomb.filter.%s.%s.%s.enabled";

protected Environment environment;

@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}

@Override
public int getOrder(String application, String serviceName) {
Integer custom = environment.getProperty(String.format(ORDER_KEY, getName(), application, serviceName),
Integer.class);
if (custom != null) {
return custom;
}
return getOrder();
}

@Override
public boolean enabledForMicroservice(String application, String serviceName) {
Boolean custom = environment.getProperty(String.format(ENABLE_KEY, getName(), application, serviceName),
Boolean.class);
if (custom != null) {
return custom;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.apache.servicecomb.core.filter;

import org.apache.servicecomb.swagger.invocation.InvocationType;

public interface ConsumerFilter extends Filter {
@Override
default boolean enabledForInvocationType(InvocationType invocationType) {
return invocationType == InvocationType.CONSUMER;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
@Configuration
public class CoreFilterConfiguration {
@Bean
public ProviderFilter producerOperationFilter() {
public ProviderOperationFilter producerOperationFilter() {
return new ProviderOperationFilter();
}

@Bean
public ProviderFilter scheduleFilter() {
public ScheduleFilter scheduleFilter() {
return new ScheduleFilter();
}

Expand All @@ -40,7 +40,7 @@ public FilterChainsManager filterChainsManager() {
}

@Bean
public ProviderFilter parameterValidatorFilter() {
public ParameterValidatorFilter parameterValidatorFilter() {
return new ParameterValidatorFilter();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.core.filter;

public interface EdgeFilter extends Filter {

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import java.util.concurrent.CompletableFuture;

import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.swagger.invocation.InvocationType;
import org.apache.servicecomb.swagger.invocation.Response;
import org.springframework.core.Ordered;

/**
* <pre>
* Filters are the basics of how an invocation is executed.
*
* thread rule:
* assume a producer filter chains is: f1, f2, schedule, f3, f4
* assume a provider filter chains is: f1, f2, schedule, f3, f4
*
* schedule is a builtIn filter, which will dispatch invocations to operation related threadPool
*
Expand All @@ -47,15 +47,11 @@
* (<a href="https://vertx.io/docs/vertx-core/java/#golden_rule">reactive golden rule</a>)
* </pre>
*/
public interface Filter {
public interface Filter extends Ordered {
int PROVIDER_SCHEDULE_FILTER_ORDER = 0;

int CONSUMER_LOAD_BALANCE_ORDER = 0;

default boolean enabledForInvocationType(InvocationType invocationType) {
return true;
}

default boolean enabledForTransport(String transport) {
return true;
}
Expand All @@ -64,7 +60,11 @@ default boolean enabledForMicroservice(String application, String serviceName) {
return true;
}

default int getOrder(InvocationType invocationType, String application, String serviceName) {
default int getOrder(String application, String serviceName) {
return 0;
}

default int getOrder() {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,30 @@
import java.util.List;
import java.util.stream.Collectors;

import org.apache.servicecomb.swagger.invocation.InvocationType;
import org.springframework.beans.factory.annotation.Autowired;

public class FilterChainsManager {
private final InvocationFilterChains consumerChains = new InvocationFilterChains(InvocationType.CONSUMER);
private InvocationFilterChains consumerChains;

private final InvocationFilterChains producerChains = new InvocationFilterChains(InvocationType.PROVIDER);
private InvocationFilterChains providerChains;

private InvocationFilterChains edgeChains;

@Autowired
public FilterChainsManager addFilters(List<Filter> filters) {
for (Filter filter : filters) {
if (filter.enabledForInvocationType(InvocationType.CONSUMER)) {
consumerChains.addFilter(filter);
}
public FilterChainsManager setEdgeFilters(List<EdgeFilter> filters) {
edgeChains = new InvocationFilterChains(filters);
return this;
}

if (filter.enabledForInvocationType(InvocationType.PROVIDER)) {
producerChains.addFilter(filter);
}
}
@Autowired
public FilterChainsManager setConsumerFilters(List<ConsumerFilter> filters) {
consumerChains = new InvocationFilterChains(filters);
return this;
}

@Autowired
public FilterChainsManager setProviderFilters(List<ProviderFilter> filters) {
providerChains = new InvocationFilterChains(filters);
return this;
}

Expand All @@ -54,24 +58,31 @@ public FilterNode findConsumerChain(String application, String serviceName) {
}

public FilterNode findProducerChain(String application, String serviceName) {
return producerChains.findChain(application, serviceName);
return providerChains.findChain(application, serviceName);
}

public FilterNode findEdgeChain(String application, String serviceName) {
return edgeChains.findChain(application, serviceName);
}

public String collectResolvedChains() {
StringBuilder sb = new StringBuilder();

appendLine(sb, "consumer: ");
appendLine(sb, " filters: %s", collectFilterNames(consumerChains, InvocationType.CONSUMER));
appendLine(sb, " filters: %s", collectFilterNames(consumerChains));

appendLine(sb, "producer: ");
appendLine(sb, " filters: %s", collectFilterNames(producerChains, InvocationType.PROVIDER));
appendLine(sb, " filters: %s", collectFilterNames(providerChains));

appendLine(sb, "edge: ");
appendLine(sb, " filters: %s", collectFilterNames(edgeChains));

return deleteLast(sb, 1).toString();
}

private List<String> collectFilterNames(InvocationFilterChains chains, InvocationType invocationType) {
private List<String> collectFilterNames(InvocationFilterChains chains) {
return chains.getFilters().stream()
.map(filter -> filter.getName() + "(" + filter.getOrder(invocationType, null, null) + ")")
.map(filter -> filter.getName() + "(" + filter.getOrder() + ")")
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private void setNextNode(FilterNode nextNode) {
}

public CompletableFuture<Response> onFilter(Invocation invocation) {
if (!filter.enabledForTransport(invocation.getTransportName())) {
// When transport name is empty, maybe edge and transport filters need to be executed.
// And we can't set Endpoint before load balance in edge.
if (invocation.getTransportName() != null && !filter.enabledForTransport(invocation.getTransportName())) {
return nextNode.onFilter(invocation);
}

Expand Down
Loading

0 comments on commit 4344043

Please sign in to comment.