Skip to content

Commit

Permalink
aeon: connect implementation
Browse files Browse the repository at this point in the history
Implement aeon console connection.

The ‘Console’ module has been separate from the ‘Connect’ abstraction,
to allow it being used independently of the transport layer.

Closes #1050

@TarantoolBot document
Title: Implement aeon console connection.

Command allow connect to Enterprise Aeon database with specified URL.
Available command options:
- `sslkeyfile <private_key>` - path to private part of certificate.
- `sslcertfile <pub_cert>` - path to public part of certificate.
- `sslcafile <ca_file>` - path to root CA for self-signed certificate.
- `transport [ssl|plain]` - connection mode.
  • Loading branch information
dmyger committed Jan 15, 2025
1 parent e844f27 commit ff7dff7
Show file tree
Hide file tree
Showing 20 changed files with 1,257 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* `sslcertfile` - path to an SSL certificate file,
* `sslcafile` - path to a trusted certificate authorities (CA) file,
* `sslciphers` - colon-separated list of SSL cipher suites the connection.
- `tt aeon connect`: add support to connect Aeon database.

### Changed

Expand Down
199 changes: 199 additions & 0 deletions cli/aeon/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package aeon

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/apex/log"

"github.com/tarantool/go-prompt"
"github.com/tarantool/tt/cli/aeon/cmd"
"github.com/tarantool/tt/cli/aeon/pb"
"github.com/tarantool/tt/cli/connector"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

// Client structure with parameters for gRPC connection to Aeon.
type Client struct {
title string
conn *grpc.ClientConn
client pb.SQLServiceClient
}

func makeAddress(ctx cmd.ConnectCtx) string {
if ctx.Network == connector.UnixNetwork {
if strings.HasPrefix(ctx.Address, "@") {
return "unix-abstract:" + (ctx.Address)[1:]
}
return "unix:" + ctx.Address
}
return ctx.Address
}

func getCertificate(args cmd.Ssl) (tls.Certificate, error) {
if args.CertFile == "" && args.KeyFile == "" {
return tls.Certificate{}, nil
}
tls_cert, err := tls.LoadX509KeyPair(args.CertFile, args.KeyFile)
if err != nil {
return tls_cert, fmt.Errorf("could not load client key pair: %w", err)
}
return tls_cert, nil
}

func getTlsConfig(args cmd.Ssl) (*tls.Config, error) {
if args.CaFile == "" {
return &tls.Config{
ClientAuth: tls.NoClientCert,
}, nil
}

ca, err := os.ReadFile(args.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file: %w", err)
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append CA data")
}
cert, err := getCertificate(args)
if err != nil {
return nil, fmt.Errorf("failed get certificate: %w", err)
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
RootCAs: certPool,
}, nil
}

func getDialOpts(ctx cmd.ConnectCtx) (grpc.DialOption, error) {
var creds credentials.TransportCredentials
if ctx.Transport == cmd.TransportSsl {
config, err := getTlsConfig(ctx.Ssl)
if err != nil {
return nil, fmt.Errorf("not tls config: %w", err)
}
creds = credentials.NewTLS(config)
} else {
creds = insecure.NewCredentials()
}
return grpc.WithTransportCredentials(creds), nil
}

// NewAeonHandler create new grpc connection to Aeon server.
func NewAeonHandler(ctx cmd.ConnectCtx) (*Client, error) {
c := Client{title: ctx.Address}
target := makeAddress(ctx)
// var err error
opt, err := getDialOpts(ctx)
if err != nil {
return nil, fmt.Errorf("%w", err)
}
c.conn, err = grpc.NewClient(target, opt)
if err != nil {
return nil, fmt.Errorf("fail to dial: %w", err)
}
if err := c.ping(); err == nil {
log.Infof("Aeon responses at %q", target)
} else {
return nil, fmt.Errorf("can't ping to Aeon at %q: %w", target, err)
}

c.client = pb.NewSQLServiceClient(c.conn)
return &c, nil
}

func (c *Client) ping() error {
log.Infof("Start ping aeon server")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

diag := pb.NewDiagServiceClient(c.conn)
_, err := diag.Ping(ctx, &pb.PingRequest{})
if err != nil {
log.Warnf("Aeon ping %s", err)
}
return err
}

// Title implements console.Handler interface.
func (c *Client) Title() string {
return c.title
}

// Validate implements console.Handler interface.
func (c *Client) Validate(input string) bool {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

check, err := c.client.SQLCheck(ctx, &pb.SQLRequest{Query: input})
if err != nil {
log.Warnf("Aeon validate %s\nFor request: %q", err, input)
return false
}

return check.Status == pb.SQLCheckStatus_SQL_QUERY_VALID
}

// Execute implements console.Handler interface.
func (c *Client) Execute(input string) any {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := c.client.SQL(ctx, &pb.SQLRequest{Query: input})
if err != nil {
return err
}
return parseSQLResponse(resp)
}

// Stop implements console.Handler interface.
func (c *Client) Close() {
c.conn.Close()
}

// Complete implements console.Handler interface.
func (c *Client) Complete(input prompt.Document) []prompt.Suggest {
// TODO: waiting until there is support from Aeon side.
return nil
}

// parseSQLResponse returns result as table in map.
// Where keys is name of columns. And body is array of values.
// On any issue return an error.
func parseSQLResponse(resp *pb.SQLResponse) any {
if resp.Error != nil {
return ResultError{resp.Error}
}
if resp.TupleFormat == nil {
return ResultType{}
}
res := ResultType{
names: make([]string, len(resp.TupleFormat.Names)),
rows: make([]ResultRow, len(resp.Tuples)),
}
for i, n := range resp.TupleFormat.Names {
res.names[i] = n
res.rows[i] = make([]any, 0, len(resp.TupleFormat.Names))
}

for r, row := range resp.Tuples {
for _, v := range row.Fields {
val, err := decodeValue(v)
if err != nil {
return fmt.Errorf("tuple %d can't decode %s: %w", r, v.String(), err)
}
res.rows[r] = append(res.rows[r], val)
}
}
return res
}
4 changes: 4 additions & 0 deletions cli/aeon/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ type ConnectCtx struct {
Ssl Ssl
// Transport is a connection mode.
Transport Transport
// Network is kind of transport layer.
Network string
// Address is a connection URL, unix socket address and etc.
Address string
}
100 changes: 100 additions & 0 deletions cli/aeon/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package aeon

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2/datetime"
"github.com/tarantool/go-tarantool/v2/decimal"
"github.com/tarantool/tt/cli/aeon/pb"
)

// decodeValue convert a value obtained from protobuf into a value that can be used as an
// argument to Tarantool functions.
//
// Copy from https://github.com/tarantool/aeon/blob/master/aeon/grpc/server/pb/decode.go
func decodeValue(val *pb.Value) (any, error) {
switch casted := val.Kind.(type) {
case *pb.Value_UnsignedValue:
return val.GetUnsignedValue(), nil
case *pb.Value_StringValue:
return val.GetStringValue(), nil
case *pb.Value_NumberValue:
return val.GetNumberValue(), nil
case *pb.Value_IntegerValue:
return val.GetIntegerValue(), nil
case *pb.Value_BooleanValue:
return val.GetBooleanValue(), nil
case *pb.Value_VarbinaryValue:
return val.GetVarbinaryValue(), nil
case *pb.Value_DecimalValue:
decStr := val.GetDecimalValue()
res, err := decimal.MakeDecimalFromString(decStr)
if err != nil {
return nil, err
}
return res, nil
case *pb.Value_UuidValue:
uuidStr := val.GetUuidValue()
res, err := uuid.Parse(uuidStr)
if err != nil {
return nil, err
}
return res, nil
case *pb.Value_DatetimeValue:
sec := casted.DatetimeValue.Seconds
nsec := casted.DatetimeValue.Nsec
t := time.Unix(sec, nsec)
if len(casted.DatetimeValue.Location) > 0 {
locStr := casted.DatetimeValue.Location
loc, err := time.LoadLocation(locStr)
if err != nil {
return nil, err
}
t = t.In(loc)
}
res, err := datetime.MakeDatetime(t)
if err != nil {
return nil, err
}
return res, nil
case *pb.Value_IntervalValue:
res := datetime.Interval{
Year: casted.IntervalValue.Year,
Month: casted.IntervalValue.Month,
Week: casted.IntervalValue.Week,
Day: casted.IntervalValue.Day,
Hour: casted.IntervalValue.Hour,
Min: casted.IntervalValue.Min,
Sec: casted.IntervalValue.Sec,
Nsec: casted.IntervalValue.Nsec,
Adjust: datetime.Adjust(casted.IntervalValue.Adjust)}
return res, nil
case *pb.Value_ArrayValue:
array := val.GetArrayValue()
res := make([]any, len(array.Fields))
for k, v := range array.Fields {
field, err := decodeValue(v)
if err != nil {
return nil, err
}
res[k] = field
}
return res, nil
case *pb.Value_MapValue:
res := make(map[any]any, len(casted.MapValue.Fields))
for k, v := range casted.MapValue.Fields {
item, err := decodeValue(v)
if err != nil {
return nil, err
}
res[k] = item
}
return res, nil
case *pb.Value_NullValue:
return nil, nil
default:
return nil, fmt.Errorf("unsupported type for value")
}
}
54 changes: 54 additions & 0 deletions cli/aeon/results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package aeon

import (
"fmt"

"github.com/tarantool/tt/cli/aeon/pb"
"github.com/tarantool/tt/cli/console"
"github.com/tarantool/tt/cli/formatter"
)

// ResultRow keeps values for one table row.
type ResultRow []any

// ResultType is a custom type to format output with console.Formatter interface.
type ResultType struct {
names []string
rows []ResultRow
}

// ResultError wraps pb.Error to implement console.Formatter interface.
type ResultError struct {
*pb.Error
}

// asYaml prepare results for formatter.MakeOutput.
func (r ResultType) asYaml() string {
yaml := "---\n"
for _, row := range r.rows {
mark := "-"
for i, v := range row {
n := r.names[i]
yaml += fmt.Sprintf("%s %s: %v\n", mark, n, v)
mark = " "
}
}
return yaml
}

// Format produce formatted string according required console.Format settings.
func (r ResultType) Format(f console.Format) (string, error) {
if len(r.names) == 0 {
return "", nil
}
output, err := formatter.MakeOutput(f.Mode, r.asYaml(), f.Opts)
if err != nil {
return "", err
}
return output, nil
}

// Format produce formatted string according required console.Format settings.
func (e *ResultError) Format(_ console.Format) (string, error) {
return fmt.Sprintf("---\nError: %s\n%q", e.Name, e.Msg), nil
}
16 changes: 16 additions & 0 deletions cli/aeon/results_export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package aeon

import "github.com/tarantool/tt/cli/aeon/pb"

func NewResultType(names []string, rows []ResultRow) ResultType {
return ResultType{
names: names,
rows: rows,
}
}

func NewResultError(name string, msg string) ResultError {
return ResultError{&pb.Error{
Name: name,
Msg: msg}}
}
Loading

0 comments on commit ff7dff7

Please sign in to comment.