Skip to content

Commit

Permalink
Syncer supports to enable rbac (#1487)
Browse files Browse the repository at this point in the history
  • Loading branch information
humingcheng authored Aug 13, 2024
1 parent b133e42 commit 983f3f3
Show file tree
Hide file tree
Showing 14 changed files with 404 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/eventbase-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
uses: actions/checkout@v1
- name: UT test
run: |
sudo docker-compose -f ./scripts/docker-compose.yaml up -d
sudo docker compose -f ./scripts/docker-compose.yaml up -d
sleep 20
export TEST_DB_MODE=mongo
export TEST_DB_URI=mongodb://127.0.0.1:27017
Expand All @@ -31,7 +31,7 @@ jobs:
uses: actions/checkout@v1
- name: UT for etcd
run: |
time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
while ! nc -z 127.0.0.1 2379; do
sleep 1
done
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/static_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
uses: actions/checkout@v1
- name: UT-MONGO
run: |
sudo docker-compose -f ./scripts/docker-compose.yaml up -d
sudo docker compose -f ./scripts/docker-compose.yaml up -d
sleep 20
bash -x scripts/ut_test_in_docker.sh mongo
integration-test:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
version: '3'
services:
etcd:
image: 'quay.io/coreos/etcd:latest'
image: 'quay.io/coreos/etcd:v3.5.15'
# restart: always
#ports:
# - "2379:2379"
Expand Down
2 changes: 2 additions & 0 deletions etc/conf/syncer.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
sync:
enableOnStart: false
rbacEnabled: false
peers:
- name: dc
kind: servicecomb
endpoints: ["127.0.0.1:30105"]
# only allow mode implemented in incremental approach like push, watch(such as pub/sub, long polling)
mode: [push]
token:
tombstone:
retire:
# use linux crontab not Quartz cron
Expand Down
2 changes: 1 addition & 1 deletion scripts/integration_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set +e
docker rm -f etcd
kill -9 $(ps aux | grep 'service-center' | awk '{print $2}')
set -e
sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
while ! nc -z 127.0.0.1 2379; do
echo "Waiting Etcd to launch on 2379..."
sleep 1
Expand Down
2 changes: 1 addition & 1 deletion scripts/ut_test_in_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ echo "${green}Starting Unit Testing for Service Center${reset}"

if [ "${db_name}" == "etcd" ];then
echo "${green}Starting etcd in docker${reset}"
docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
while ! nc -z 127.0.0.1 2379; do
echo "Waiting Etcd to launch on 2379..."
sleep 1
Expand Down
13 changes: 10 additions & 3 deletions syncer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"fmt"
"path/filepath"

"github.com/go-chassis/go-archaius"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/go-chassis/go-archaius"
)

var config Config
Expand All @@ -33,15 +34,21 @@ type Config struct {
}

type Sync struct {
EnableOnStart bool `yaml:"enableOnStart"`
Peers []*Peer `yaml:"peers"`
EnableOnStart bool `yaml:"enableOnStart"`
// When RbacEnabled is true, syncer's API requires the rbac token,
// and service-center also provides the rbac token to communicate with peer.
// At the same time, service-center rbac must be enabled.
RbacEnabled bool `yaml:"rbacEnabled"`
Peers []*Peer `yaml:"peers"`
}

type Peer struct {
Name string `yaml:"name"`
Kind string `yaml:"kind"`
Endpoints []string `yaml:"endpoints"`
Mode []string `yaml:"mode"`
// The token to communicate with peer, this takes effect only when RbacEnabled is true
Token string `yaml:"token"`
}

func Init() error {
Expand Down
63 changes: 63 additions & 0 deletions syncer/rpc/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package rpc

import (
"context"
"fmt"
"strings"

"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/go-chassis/v2/security/authr"
"github.com/go-chassis/go-chassis/v2/server/restful"
"google.golang.org/grpc/metadata"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/config"
)

var errWrongAccountNorRole = fmt.Errorf("account should be %s, and roles should contain %s", RbacAllowedAccountName, RbacAllowedRoleName)

func auth(ctx context.Context) error {
if !config.GetConfig().Sync.RbacEnabled {
return nil
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return rbac.NewError(rbac.ErrNoAuthHeader, "")
}

authHeader := md.Get(restful.HeaderAuth)
if len(authHeader) == 0 {
return rbac.NewError(rbac.ErrNoAuthHeader, fmt.Sprintf("header %s not found nor content empty", restful.HeaderAuth))
}

s := strings.Split(authHeader[0], " ")
if len(s) != 2 {
return rbac.ErrInvalidHeader
}
to := s[1]

claims, err := authr.Authenticate(ctx, to)
if err != nil {
return err
}
m, ok := claims.(map[string]interface{})
if !ok {
log.Error("claims convert failed", rbac.ErrConvert)
return rbac.ErrConvert
}
account, err := rbac.GetAccount(m)
if err != nil {
log.Error("get account from token failed", err)
return err
}

if account.Name != RbacAllowedAccountName {
return errWrongAccountNorRole
}
for _, role := range account.Roles {
if role == RbacAllowedRoleName {
return nil
}
}
return errWrongAccountNorRole
}
130 changes: 130 additions & 0 deletions syncer/rpc/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package rpc

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"

"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/go-chassis/v2/security/authr"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"

"github.com/apache/servicecomb-service-center/syncer/config"
)

type testAuth struct{}

func (testAuth) Login(ctx context.Context, user string, password string, opts ...authr.LoginOption) (string, error) {
return "", nil
}

func (testAuth) Authenticate(ctx context.Context, token string) (interface{}, error) {
var claim map[string]interface{}
return claim, json.Unmarshal([]byte(token), &claim)
}

func Test_auth(t *testing.T) {
// use the custom auth plugin
authr.Install("test", func(opts *authr.Options) (authr.Authenticator, error) {
return testAuth{}, nil
})
assert.NoError(t, authr.Init(authr.WithPlugin("test")))

type args struct {
ctx context.Context
}
tests := []struct {
name string
preDo func()
args args
wantErr assert.ErrorAssertionFunc
}{
{
name: "sync rbac disables",
preDo: func() {
config.SetConfig(config.Config{
Sync: &config.Sync{
RbacEnabled: false,
}})
},
args: args{
ctx: context.Background(), // rbac disabled, empty ctx should pass the auth
},
wantErr: assert.NoError,
},
{
name: "no header",
preDo: func() {
config.SetConfig(config.Config{
Sync: &config.Sync{
RbacEnabled: true,
}})
},
args: args{
ctx: context.Background(), // rbac enabled, empty ctx should not pass the auth
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
var errSvcErr *errsvc.Error
ok := errors.As(err, &errSvcErr)
assert.True(t, ok)

return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code)
},
},
{
name: "with header but no auth header",
args: args{
ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"fake": "fake"})),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
var errSvcErr *errsvc.Error
ok := errors.As(err, &errSvcErr)
assert.True(t, ok)

return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code)
},
},
{
name: "auth header format error",
args: args{
ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{restful.HeaderAuth: "fake"})),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.Equal(t, rbac.ErrInvalidHeader, err)
},
},
{
name: "wrong account nor role",
args: args{
ctx: metadata.NewIncomingContext(context.Background(),
metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"x","roles":["x"]}`})),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.Equal(t, errWrongAccountNorRole, err)
},
},
{
name: "valid token",
args: args{
ctx: metadata.NewIncomingContext(context.Background(),
metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"sync-user","roles":["sync-admin"]}`})),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.preDo != nil {
tt.preDo()
}
tt.wantErr(t, auth(tt.args.ctx), fmt.Sprintf("auth(%v)", tt.args.ctx))
})
}
}
38 changes: 34 additions & 4 deletions syncer/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@ import (
"fmt"
"time"

"github.com/apache/servicecomb-service-center/syncer/service/replicator"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"

"github.com/apache/servicecomb-service-center/pkg/log"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/service/replicator"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
)

const (
HealthStatusConnected = "CONNECTED"
HealthStatusAbnormal = "ABNORMAL"
HealthStatusClose = "CLOSE"
HealthStatusAuthFail = "AuthFail"

RbacAllowedAccountName = "sync-user"
RbacAllowedRoleName = "sync-admin"
)

func NewServer() *Server {
Expand All @@ -49,13 +52,33 @@ type Server struct {
}

func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
err := auth(ctx)
if err != nil {
log.Error("auth failed", err)
return generateFailedResults(events, err)
}

log.Info(fmt.Sprintf("start sync: %s", events.Flag()))

res := s.replicator.Persist(ctx, events)

return s.toResults(res), nil
}

func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
if events == nil || len(events.Events) == 0 {
return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
}
rsts := make(map[string]*v1sync.Result, len(events.Events))
for _, evt := range events.Events {
rsts[evt.Id] = &v1sync.Result{
Code: resource.Fail,
Message: err.Error(),
}
}
return &v1sync.Results{Results: rsts}, nil
}

func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
syncResult := make(map[string]*v1sync.Result, len(results))
for _, r := range results {
Expand All @@ -69,11 +92,18 @@ func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
}
}

func (s *Server) Health(_ context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
resp := &v1sync.HealthReply{
Status: HealthStatusConnected,
LocalTimestamp: time.Now().UnixNano(),
}
err := auth(ctx)
if err != nil {
resp.Status = HealthStatusAuthFail
log.Error("auth failed", err)
return resp, nil
}

// TODO enable to close syncer
if !config.GetConfig().Sync.EnableOnStart {
resp.Status = HealthStatusClose
Expand Down
Loading

0 comments on commit 983f3f3

Please sign in to comment.