Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interceptor support for GRPC APIs #2556

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions adapter/internal/oasparser/model/adapter_internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,45 +1274,63 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoGRPCRouteCR(grpcRoute *gwap
var policies = OperationPolicies{}
var endPoints []Endpoint
resourceAuthScheme := authScheme
resourceAPIPolicy := apiPolicy
resourceRatelimitPolicy := ratelimitPolicy
var scopes []string
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindAuthentication {
if ref, found := resourceParams.ResourceAuthSchemes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceAuthScheme = concatAuthSchemes(authScheme, &ref)
} else {
return fmt.Errorf(`auth scheme: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level Authentications`, filter.ExtensionRef.Name)
switch filter.Type {
case gwapiv1a2.GRPCRouteFilterExtensionRef:
if filter.ExtensionRef.Kind == constants.KindAuthentication {
if ref, found := resourceParams.ResourceAuthSchemes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceAuthScheme = concatAuthSchemes(authScheme, &ref)
} else {
return fmt.Errorf(`auth scheme: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level Authentications`, filter.ExtensionRef.Name)
}
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindScope {
if ref, found := resourceParams.ResourceScopes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
scopes = ref.Spec.Names
disableScopes = false
} else {
return fmt.Errorf("scope: %s has not been resolved in namespace %s", filter.ExtensionRef.Name, grpcRoute.Namespace)
if filter.ExtensionRef.Kind == constants.KindAPIPolicy {
if ref, found := resourceParams.ResourceAPIPolicies[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceAPIPolicy = concatAPIPolicies(apiPolicy, &ref)
} else {
return fmt.Errorf(`apipolicy: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level APIPolicies`, filter.ExtensionRef.Name)
}
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindRateLimitPolicy {
if ref, found := resourceParams.ResourceRateLimitPolicies[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceRatelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, &ref)
} else {
return fmt.Errorf(`ratelimitpolicy: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level RateLimitPolicies`, filter.ExtensionRef.Name)
if filter.ExtensionRef.Kind == constants.KindScope {
if ref, found := resourceParams.ResourceScopes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
scopes = ref.Spec.Names
disableScopes = false
} else {
return fmt.Errorf("scope: %s has not been resolved in namespace %s", filter.ExtensionRef.Name, grpcRoute.Namespace)
}
}
if filter.ExtensionRef.Kind == constants.KindRateLimitPolicy {
if ref, found := resourceParams.ResourceRateLimitPolicies[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceRatelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, &ref)
} else {
return fmt.Errorf(`ratelimitpolicy: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level RateLimitPolicies`, filter.ExtensionRef.Name)
}
}
}
}

resourceAPIPolicy = concatAPIPolicies(resourceAPIPolicy, nil)
resourceAuthScheme = concatAuthSchemes(resourceAuthScheme, nil)
resourceRatelimitPolicy = concatRateLimitPolicies(resourceRatelimitPolicy, nil)
addOperationLevelInterceptors(&policies, resourceAPIPolicy, resourceParams.InterceptorServiceMapping, resourceParams.BackendMapping, grpcRoute.Namespace)

loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID)
apiAuth := getSecurity(resourceAuthScheme)
Expand All @@ -1321,7 +1339,7 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoGRPCRouteCR(grpcRoute *gwap
resourcePath := adapterInternalAPI.GetXWso2Basepath() + "." + *match.Method.Service + "/" + *match.Method.Method
endPoints = append(endPoints, GetEndpoints(backendName, resourceParams.BackendMapping)...)
resource := &Resource{path: resourcePath, pathMatchType: "Exact",
methods: []*Operation{{iD: uuid.New().String(), method: "GRPC", policies: policies,
methods: []*Operation{{iD: uuid.New().String(), method: "POST", policies: policies,
auth: apiAuth, rateLimitPolicy: parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes: scopes}},
iD: uuid.New().String(),
}
Expand Down
45 changes: 41 additions & 4 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
// handle gRPC APIs
if len(prodRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == constants.GRPC {
if apiState.ProdGRPCRoute, err = apiReconciler.resolveGRPCRouteRefs(ctx, prodRouteRefs,
namespace, api); err != nil {
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving production grpcRouteref %s in namespace :%s was not found. %s",
prodRouteRefs, namespace, err.Error())
}
Expand All @@ -483,7 +483,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1

if len(sandRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == constants.GRPC {
if apiState.SandGRPCRoute, err = apiReconciler.resolveGRPCRouteRefs(ctx, sandRouteRefs,
namespace, api); err != nil {
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving sandbox grpcRouteref %s in namespace :%s was not found. %s",
sandRouteRefs, namespace, err.Error())
}
Expand Down Expand Up @@ -603,11 +603,15 @@ func (apiReconciler *APIReconciler) resolveHTTPRouteRefs(ctx context.Context, ht
}

func (apiReconciler *APIReconciler) resolveGRPCRouteRefs(ctx context.Context, grpcRouteRefs []string,
namespace string, api dpv1alpha3.API) (*synchronizer.GRPCRouteState, error) {
namespace string, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService, api dpv1alpha3.API) (*synchronizer.GRPCRouteState, error) {
grpcRouteState, err := apiReconciler.concatGRPCRoutes(ctx, grpcRouteRefs, namespace, api)
if err != nil {
return nil, err
}
grpcRouteState.BackendMapping, err = apiReconciler.getResolvedBackendsMappingForGRPC(ctx, &grpcRouteState, interceptorServiceMapping, api)
if err != nil {
return nil, err
}
grpcRouteState.Scopes, err = apiReconciler.getScopesForGRPCRoute(ctx, grpcRouteState.GRPCRouteCombined, api)
return &grpcRouteState, err
}
Expand All @@ -633,7 +637,6 @@ func (apiReconciler *APIReconciler) concatGRPCRoutes(ctx context.Context, grpcRo
}
grpcRouteState.GRPCRoutePartitions = grpcRoutePartitions
backendNamespacedName := types.NamespacedName{
//TODO check if this is correct
Name: string(grpcRouteState.GRPCRouteCombined.Spec.Rules[0].BackendRefs[0].BackendRef.Name),
Namespace: namespace,
}
Expand Down Expand Up @@ -1065,6 +1068,40 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
return backendMapping, airl, nil
}

func (apiReconciler *APIReconciler) getResolvedBackendsMappingForGRPC(ctx context.Context,
grpcRouteState *synchronizer.GRPCRouteState, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, error) {
backendMapping := make(map[string]*dpv1alpha2.ResolvedBackend)
grpcRoute := grpcRouteState.GRPCRouteCombined

for _, rule := range grpcRoute.Spec.Rules {
for _, backend := range rule.BackendRefs {
backendNamespacedName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, grpcRoute.Namespace),
}
if _, exists := backendMapping[backendNamespacedName.String()]; !exists {
resolvedBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, backendNamespacedName, &api)
if resolvedBackend != nil {
backendMapping[backendNamespacedName.String()] = resolvedBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
}
}
}
}

// Resolve backends in InterceptorServices
interceptorServices := maps.Values(interceptorServiceMapping)
for _, interceptorService := range interceptorServices {
utils.ResolveAndAddBackendToMapping(ctx, apiReconciler.client, backendMapping,
interceptorService.Spec.BackendRef, interceptorService.Namespace, &api)
}

loggers.LoggerAPKOperator.Debugf("Generated backendMapping: %v", backendMapping)
return backendMapping, nil
}

// These proxy methods are designed as intermediaries for the getAPIsFor<CR objects> methods.
// Their purpose is to encapsulate the process of updating owner references within the reconciliation watch methods.
// By employing these proxies, we prevent redundant owner reference updates for the same object due to the hierarchical structure of these functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import io.cucumber.java.en.And;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -248,6 +249,18 @@ public void GetStudent(String arg0, int arg1) throws StatusRuntimeException {
}
}

@And("the GRPC response should contain header {string}")
public void GetGRPCMetadata(String arg0) throws StatusRuntimeException {
try {
String header = SimpleGRPCStudentClient.getResponseHeader(arg0);
Assert.assertNotNull(header);
Assert.assertEquals(header, "Interceptor-Response-header-value");
} catch (StatusRuntimeException e) {
sharedContext.setGrpcStatusCode(e.getStatus().getCode().value());
logger.error(e.getMessage() + " Status code: " + e.getStatus().getCode().value());
}
}

@Then("I make grpc request GetStudent default version to {string} with port {int}")
public void GetStudentDefaultVersion(String arg0, int arg1) throws StatusRuntimeException {
try {
Expand Down Expand Up @@ -329,8 +342,9 @@ public void checkEnforcerLogs(DataTable dataTable) throws IOException, Interrupt
}
try {
String logs = api.readNamespacedPodLog(podName, namespace).container("enforcer").sinceSeconds(60).execute();
Assert.assertNotNull(logs, String.format("Could not find any logs in the last 60 seconds. PodName: %s, namespace: %s", podName, namespace));
for(String word : stringsToCheck) {
Assert.assertNotNull(logs, String.format(
"Could not find any logs in the last 60 seconds. PodName: %s, namespace: %s", podName, namespace));
for (String word : stringsToCheck) {
Assert.assertTrue(logs.contains(word), "Expected word '" + word + "' not found in logs");
}
} catch (ApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,32 @@

import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Channel;
import java.util.Map;

import java.util.Map;

public class GenericClientInterceptor implements ClientInterceptor {

private Map<String, String> headers;
private Metadata responseHeaders;

public GenericClientInterceptor(Map<String, String> headers) {
this.headers = headers;
}

public void setResponseHeaders(Metadata responseHeaders) {
this.responseHeaders = responseHeaders;
}

public Metadata getResponseHeaders() {
return this.responseHeaders;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
Expand All @@ -31,7 +40,18 @@ public void start(Listener<RespT> responseListener, Metadata headersMetadata) {
headers.forEach((key, value) -> headersMetadata.put(
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value));

super.start(responseListener, headersMetadata);
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
/**
* if you don't need receive header from server,
* you can use {@link io.grpc.stub.MetadataUtils#attachHeaders}
* directly to send header
*/
setResponseHeaders(headers);
super.onHeaders(headers);
}
}, headersMetadata);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.commons.logging.LogFactory;

import io.grpc.ManagedChannel;
import io.grpc.Metadata;

import org.wso2.apk.integration.utils.GenericClientInterceptor;
import org.wso2.apk.integration.utils.clients.student_service.StudentRequest;
import org.wso2.apk.integration.utils.clients.student_service.StudentResponse;
Expand All @@ -24,6 +26,7 @@ public class SimpleGRPCStudentClient {
private static final int EVENTUAL_SUCCESS_RESPONSE_TIMEOUT_IN_SECONDS = 10;
private final String host;
private final int port;
private static Metadata responseHeaders;

public SimpleGRPCStudentClient(String host, int port) {
this.host = host;
Expand Down Expand Up @@ -53,6 +56,7 @@ public StudentResponse GetStudent(Map<String, String> headers) throws StatusRunt
log.error("Failed to get student");
throw new RuntimeException("Failed to get student");
}
setResponseHeaders(interceptor.getResponseHeaders());
return response;
} catch (SSLException e) {
throw new RuntimeException("Failed to create SSL context", e);
Expand All @@ -72,6 +76,18 @@ public StudentResponse GetStudent(Map<String, String> headers) throws StatusRunt
}
}

public static String getResponseHeader(String headerName) {
Metadata.Key<String> headerValue = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
if (responseHeaders == null) {
return "";
}
return responseHeaders.get(headerValue);
}

public void setResponseHeaders(Metadata metadata) {
SimpleGRPCStudentClient.responseHeaders = metadata;
}

public StudentResponse GetStudentDefaultVersion(Map<String, String> headers) throws StatusRuntimeException {
ManagedChannel managedChannel = null;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: "6a254687f3229c35dd0189aac7f7fc4b6228e97a"
basePath: "/org.apk"
version: "v1"
type: "GRPC"
id: "grpc-interceptor-api"
endpointConfigurations:
production:
endpoint: "http://grpc-backend:6565"
defaultVersion: false
subscriptionValidation: false
operations:
- target: "student_service.StudentService"
verb: "GetStudent"
secured: true
scopes: []
- target: "student_service.StudentService"
verb: "GetStudentStream"
secured: true
scopes: []
- target: "student_service.StudentService"
verb: "SendStudentStream"
secured: true
scopes: []
- target: "student_service.StudentService"
verb: "SendAndGetStudentStream"
secured: true
scopes: []
apiPolicies:
request:
- policyName: "Interceptor"
policyVersion: v1
parameters:
backendUrl: "http://interceptor-service.apk-integration-test.svc.cluster.local:8443"
contextEnabled: true
headersEnabled: true
bodyEnabled: true
response:
- policyName: "Interceptor"
policyVersion: v1
parameters:
backendUrl: "http://interceptor-service.apk-integration-test.svc.cluster.local:8443"
contextEnabled: true
headersEnabled: true
bodyEnabled: true
21 changes: 21 additions & 0 deletions test/cucumber-tests/src/test/resources/tests/api/GRPC.feature
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,25 @@ Feature: Generating APK conf for gRPC API
Given The system is ready
And I have a valid subscription
When I undeploy the API whose ID is "grpc-default-version-api"
Then the response status code should be 202

Scenario: Deploying gRPC API with interceptor policy
Given The system is ready
And I have a valid subscription
When I use the APK Conf file "artifacts/apk-confs/grpc/grpc-interceptor.apk-conf"
And the definition file "artifacts/definitions/student.proto"
And make the API deployment request
Then the response status code should be 200
Then I set headers
| Authorization | bearer ${accessToken} |
And I make grpc request GetStudent to "default.gw.wso2.com" with port 9095
And the gRPC response status code should be 0
And the student response body should contain name: "Student" age: 10
And the GRPC response should contain header "interceptor-response-header"


Scenario: Undeploy API
Given The system is ready
And I have a valid subscription
When I undeploy the API whose ID is "grpc-interceptor-api"
Then the response status code should be 202
Loading