From 2f85ebab93a367dcaa25299455274030440eba9a Mon Sep 17 00:00:00 2001 From: Lucas Jacques Date: Sun, 12 Jan 2025 00:43:55 +0100 Subject: [PATCH] feat: add machine gateways --- agent/client/machines.go | 18 ++++ agent/machine.go | 27 +++++- agent/machinerunner/machine_gateway.go | 33 +++++++ agent/machinerunner/state/events.go | 24 +++++ agent/server/machines.go | 32 +++++++ agent/server/routes.go | 12 +++ agent/structs/machine_instance.go | 36 ++++---- api/machines.go | 16 +++- core/cluster/agent.go | 12 ++- core/cluster/cluster_state.go | 14 +-- core/cluster/corrosion/instances.go | 22 ++++- core/cluster/corrosion/machines.go | 8 +- core/cluster/corrosion/schema.sql | 3 +- core/cluster/models.go | 21 ++--- initd/initd.go | 1 + proxy/edge/backends.go | 21 +++-- proxy/edge/certificates.go | 22 ++--- proxy/edge/edge.go | 38 ++++---- proxy/edge/initd_authz.go | 9 +- proxy/edge/initd_proxy.go | 83 ----------------- proxy/edge/machine_proxy.go | 118 +++++++++++++++++++++++++ proxy/proxy.go | 18 ++-- ravel/machines.go | 20 ++++- ravel/orchestrator/create_machine.go | 11 +-- ravel/orchestrator/machines.go | 18 ++++ ravel/server/endpoints/endpoints.go | 16 ++++ ravel/server/endpoints/machines.go | 34 +++++++ 27 files changed, 495 insertions(+), 192 deletions(-) create mode 100644 agent/machinerunner/machine_gateway.go delete mode 100644 proxy/edge/initd_proxy.go create mode 100644 proxy/edge/machine_proxy.go diff --git a/agent/client/machines.go b/agent/client/machines.go index bb4c55e..4aae9f7 100644 --- a/agent/client/machines.go +++ b/agent/client/machines.go @@ -105,3 +105,21 @@ func (a *AgentClient) GetMachineLogsRaw(ctx context.Context, id string, follow b } return a.client.RawGet(ctx, path) } + +func (a *AgentClient) DisableMachineGateway(ctx context.Context, id string) error { + path := "/machines/" + id + "/gateway/disable" + err := a.client.Post(ctx, path, nil) + if err != nil { + return err + } + return nil +} + +func (a *AgentClient) EnableMachineGateway(ctx context.Context, id string) error { + path := "/machines/" + id + "/gateway/enable" + err := a.client.Post(ctx, path, nil) + if err != nil { + return err + } + return nil +} diff --git a/agent/machine.go b/agent/machine.go index d16be00..d09b5f6 100644 --- a/agent/machine.go +++ b/agent/machine.go @@ -65,10 +65,11 @@ func (a *Agent) PutMachine(ctx context.Context, opt cluster.PutMachineOptions) ( Machine: opt.Machine, Version: opt.Version, State: structs.MachineInstanceState{ - DesiredStatus: desiredStatus, - Status: api.MachineStatusCreated, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), + DesiredStatus: desiredStatus, + Status: api.MachineStatusCreated, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + MachineGatewayEnabled: opt.EnableGateway, }, } @@ -139,3 +140,21 @@ func (d *Agent) MachineExec(ctx context.Context, id string, cmd []string, timeou return machine.Exec(ctx, cmd, timeout) } + +func (d *Agent) EnableMachineGateway(ctx context.Context, id string) error { + machine, err := d.machines.GetMachine(id) + if err != nil { + return err + } + + return machine.EnableGateway(ctx) +} + +func (d *Agent) DisableMachineGateway(ctx context.Context, id string) error { + machine, err := d.machines.GetMachine(id) + if err != nil { + return err + } + + return machine.DisableGateway(ctx) +} diff --git a/agent/machinerunner/machine_gateway.go b/agent/machinerunner/machine_gateway.go new file mode 100644 index 0000000..a042e6a --- /dev/null +++ b/agent/machinerunner/machine_gateway.go @@ -0,0 +1,33 @@ +package machinerunner + +import "context" + +func (m *MachineRunner) EnableGateway(ctx context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.state.MachineInstance().State.MachineGatewayEnabled { + return nil + } + + if err := m.state.PushGatewayEvent(true); err != nil { + return err + } + + return nil +} + +func (m *MachineRunner) DisableGateway(ctx context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + if !m.state.MachineInstance().State.MachineGatewayEnabled { + return nil + } + + if err := m.state.PushGatewayEvent(false); err != nil { + return err + } + + return nil +} diff --git a/agent/machinerunner/state/events.go b/agent/machinerunner/state/events.go index 639b1c8..b95d4da 100644 --- a/agent/machinerunner/state/events.go +++ b/agent/machinerunner/state/events.go @@ -323,3 +323,27 @@ func (s *MachineInstanceState) PushDestroyedEvent(ctx context.Context) (err erro return nil } + +func (s *MachineInstanceState) PushGatewayEvent(enabled bool) (err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + event := s.newEvent( + api.MachineGateway, + api.OriginUser, + s.mi.State.Status, + api.MachineEventPayload{ + Gateway: &api.MachineGatewayEventPayload{ + Enabled: enabled, + }, + }, + ) + + s.mi.State.MachineGatewayEnabled = enabled + + if err = s.persistStateAndEvent(event); err != nil { + return + } + + return nil +} diff --git a/agent/server/machines.go b/agent/server/machines.go index 14662a6..127330a 100644 --- a/agent/server/machines.go +++ b/agent/server/machines.go @@ -130,3 +130,35 @@ func (s *AgentServer) getMachineLogs(ctx context.Context, req *GetMachineLogsReq } return &GetMachineLogsResponse{Body: logs}, nil } + +type EnableMachineGatewayRequest struct { + Id string `path:"id"` +} + +type EnableMachineGatewayResponse struct { +} + +func (s *AgentServer) enableMachineGateway(ctx context.Context, req *EnableMachineGatewayRequest) (*EnableMachineGatewayResponse, error) { + err := s.agent.EnableMachineGateway(ctx, req.Id) + if err != nil { + s.log("Failed to enable machine gateway", err) + return nil, err + } + return &EnableMachineGatewayResponse{}, nil +} + +type DisableMachineGatewayRequest struct { + Id string `path:"id"` +} + +type DisableMachineGatewayResponse struct { +} + +func (s *AgentServer) disableMachineGateway(ctx context.Context, req *DisableMachineGatewayRequest) (*DisableMachineGatewayResponse, error) { + err := s.agent.DisableMachineGateway(ctx, req.Id) + if err != nil { + s.log("Failed to disable machine gateway", err) + return nil, err + } + return &DisableMachineGatewayResponse{}, nil +} diff --git a/agent/server/routes.go b/agent/server/routes.go index acbeed3..191a1a1 100644 --- a/agent/server/routes.go +++ b/agent/server/routes.go @@ -73,4 +73,16 @@ func (s AgentServer) registerEndpoints(mux humago.Mux) { Method: http.MethodDelete, }, s.destroyMachine) + huma.Register(api, huma.Operation{ + OperationID: "enableMachineGateway", + Path: "/machines/{id}/gateway/enable", + Method: http.MethodPost, + }, s.enableMachineGateway) + + huma.Register(api, huma.Operation{ + OperationID: "disableMachineGateway", + Path: "/machines/{id}/gateway/disable", + Method: http.MethodPost, + }, s.disableMachineGateway) + } diff --git a/agent/structs/machine_instance.go b/agent/structs/machine_instance.go index 30c7c91..b6af7b8 100644 --- a/agent/structs/machine_instance.go +++ b/agent/structs/machine_instance.go @@ -9,13 +9,14 @@ import ( ) type MachineInstanceState struct { - DesiredStatus api.MachineStatus `json:"desired_status"` - Status api.MachineStatus `json:"status"` - Restarts int `json:"restarts"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - LocalIPV4 string `json:"local_ipv4"` - LastEvents []api.MachineEvent `json:"last_events"` + DesiredStatus api.MachineStatus `json:"desired_status"` + Status api.MachineStatus `json:"status"` + Restarts int `json:"restarts"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + LocalIPV4 string `json:"local_ipv4"` + LastEvents []api.MachineEvent `json:"last_events"` + MachineGatewayEnabled bool `json:"machine_gateway_enabled"` } type MachineInstance struct { @@ -47,15 +48,16 @@ func (mi *MachineInstance) InstanceOptions() instance.InstanceOptions { func (mi *MachineInstance) ClusterInstance() cluster.MachineInstance { return cluster.MachineInstance{ - Id: mi.Machine.InstanceId, - Node: mi.Machine.Node, - Namespace: mi.Machine.Namespace, - MachineId: mi.Machine.Id, - MachineVersion: mi.Version.Id, - Events: mi.State.LastEvents, - Status: mi.State.Status, - LocalIPV4: mi.State.LocalIPV4, - CreatedAt: mi.State.CreatedAt, - UpdatedAt: mi.State.UpdatedAt, + Id: mi.Machine.InstanceId, + Node: mi.Machine.Node, + Namespace: mi.Machine.Namespace, + MachineId: mi.Machine.Id, + MachineVersion: mi.Version.Id, + Events: mi.State.LastEvents, + Status: mi.State.Status, + LocalIPV4: mi.State.LocalIPV4, + CreatedAt: mi.State.CreatedAt, + UpdatedAt: mi.State.UpdatedAt, + EnableMachineGateway: mi.State.MachineGatewayEnabled, } } diff --git a/api/machines.go b/api/machines.go index 43df5b2..04eb3dd 100644 --- a/api/machines.go +++ b/api/machines.go @@ -29,6 +29,7 @@ type Machine struct { UpdatedAt time.Time `json:"updated_at"` Events []MachineEvent `json:"events"` Status MachineStatus `json:"state"` + GatewayEnabled bool `json:"gateway_enabled"` } type MachineStatus string @@ -69,7 +70,7 @@ type ( MachineConfig struct { Image string `json:"image"` Guest GuestConfig `json:"guest"` - Workload Workload `json:"workload"` + Workload Workload `json:"workload,omitempty"` StopConfig *StopConfig `json:"stop_config,omitempty"` AutoDestroy bool `json:"auto_destroy,omitempty"` } @@ -120,12 +121,14 @@ const ( MachineExited MachineEventType = "machine.exited" MachineDestroy MachineEventType = "machine.destroy" MachineDestroyed MachineEventType = "machine.destroyed" + MachineGateway MachineEventType = "machine.gateway" ) type CreateMachinePayload struct { - Region string `json:"region"` - Config MachineConfig `json:"config"` - SkipStart bool `json:"skip_start"` + Region string `json:"region"` + Config MachineConfig `json:"config"` + SkipStart bool `json:"skip_start,omitempty"` + EnableMachineGateway bool `json:"enable_machine_gateway,omitempty"` } type MachineStartEventPayload struct { @@ -159,6 +162,10 @@ type MachineDestroyEventPayload struct { Force bool `json:"force"` } +type MachineGatewayEventPayload struct { + Enabled bool `json:"enabled"` +} + type MachineEventPayload struct { PrepareFailed *MachinePrepareFailedEventPayload `json:"prepare_failed,omitempty"` Stop *MachineStopEventPayload `json:"stop,omitempty"` @@ -167,6 +174,7 @@ type MachineEventPayload struct { Started *MachineStartedEventPayload `json:"started,omitempty"` Exited *MachineExitedEventPayload `json:"stopped,omitempty"` Destroy *MachineDestroyEventPayload `json:"destroy,omitempty"` + Gateway *MachineGatewayEventPayload `json:"gateway,omitempty"` } type Origin string diff --git a/core/cluster/agent.go b/core/cluster/agent.go index 3d46593..1f5a085 100644 --- a/core/cluster/agent.go +++ b/core/cluster/agent.go @@ -8,10 +8,11 @@ import ( ) type PutMachineOptions struct { - Machine Machine `json:"machine"` - Version api.MachineVersion `json:"version"` - AllocationId string `json:"allocation_id"` - Start bool `json:"start"` + Machine Machine `json:"machine"` + Version api.MachineVersion `json:"version"` + AllocationId string `json:"allocation_id"` + Start bool `json:"start"` + EnableGateway bool `json:"enable_gateway"` } type Agent interface { @@ -26,4 +27,7 @@ type Agent interface { DestroyMachine(ctx context.Context, machineId string, force bool) error SubscribeToMachineLogs(ctx context.Context, id string) ([]*api.LogEntry, <-chan *api.LogEntry, error) GetMachineLogs(ctx context.Context, id string) ([]*api.LogEntry, error) + + EnableMachineGateway(ctx context.Context, id string) error + DisableMachineGateway(ctx context.Context, id string) error } diff --git a/core/cluster/cluster_state.go b/core/cluster/cluster_state.go index 5f78dec..11df3b3 100644 --- a/core/cluster/cluster_state.go +++ b/core/cluster/cluster_state.go @@ -8,6 +8,8 @@ import ( ) type Queries interface { + + /* Used by the API server */ CreateGateway(ctx context.Context, gateway api.Gateway) error DeleteGateway(ctx context.Context, id string) error DeleteFleetGateways(ctx context.Context, fleetId string) error @@ -17,17 +19,17 @@ type Queries interface { UpdateMachine(ctx context.Context, m Machine) error DestroyMachine(ctx context.Context, id string) error - UpsertInstance(ctx context.Context, i MachineInstance) error + ListAPIMachines(ctx context.Context, namespace string, fleetId string, includeDestroyed bool) ([]api.Machine, error) + GetAPIMachine(ctx context.Context, namespace string, fleetId string, machineId string) (*api.Machine, error) + DestroyNamespaceData(ctx context.Context, namespace string) error GetNode(ctx context.Context, id string) (api.Node, error) - UpsertNode(ctx context.Context, node api.Node) error ListNodesInRegion(ctx context.Context, region string) ([]api.Node, error) ListNodes(ctx context.Context) ([]api.Node, error) - ListAPIMachines(ctx context.Context, namespace string, fleetId string, includeDestroyed bool) ([]api.Machine, error) - GetAPIMachine(ctx context.Context, namespace string, fleetId string, machineId string) (*api.Machine, error) - - DestroyNamespaceData(ctx context.Context, namespace string) error + /* Used by raveld */ + UpsertNode(ctx context.Context, node api.Node) error + UpsertInstance(ctx context.Context, i MachineInstance) error } type ClusterState interface { diff --git a/core/cluster/corrosion/instances.go b/core/cluster/corrosion/instances.go index 58672ed..42994cb 100644 --- a/core/cluster/corrosion/instances.go +++ b/core/cluster/corrosion/instances.go @@ -9,22 +9,36 @@ import ( "github.com/valyentdev/ravel/core/instance" ) +type corroBool uint8 + +func (b corroBool) Bool() bool { + return b == 1 +} + +func fromBool(b bool) corroBool { + if b { + return 1 + } + return 0 +} + func (c *Queries) UpsertInstance(ctx context.Context, i cluster.MachineInstance) error { eventsBytes, err := json.Marshal(i.Events) if err != nil { return err } _, err = c.dbtx.Exec(ctx, - `INSERT INTO instances (id, node, machine_id, machine_version, status, created_at, updated_at, local_ipv4, events, namespace) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + `INSERT INTO instances (id, node, machine_id, machine_version, status, created_at, updated_at, local_ipv4, events, namespace, enable_machine_gateway) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id, machine_id) DO UPDATE SET status = $5, updated_at = $7, local_ipv4 = $8, - events = $9 + events = $9, + enable_machine_gateway = $11 `, - i.Id, i.Node, i.MachineId, i.MachineVersion, i.Status, i.CreatedAt.Unix(), i.UpdatedAt.Unix(), i.LocalIPV4, string(eventsBytes), i.Namespace, + i.Id, i.Node, i.MachineId, i.MachineVersion, i.Status, i.CreatedAt.Unix(), i.UpdatedAt.Unix(), i.LocalIPV4, string(eventsBytes), i.Namespace, fromBool(i.EnableMachineGateway), ) if err != nil { return err diff --git a/core/cluster/corrosion/machines.go b/core/cluster/corrosion/machines.go index a8f8dfa..e91f1f7 100644 --- a/core/cluster/corrosion/machines.go +++ b/core/cluster/corrosion/machines.go @@ -100,7 +100,7 @@ func (c *Queries) DestroyMachine(ctx context.Context, id string) error { return nil } -const baseSelectAPIMachine = `SELECT m.id, m.namespace, m.fleet_id, m.instance_id, m.machine_version, m.region, m.created_at, m.updated_at, i.status, mv.config, i.events +const baseSelectAPIMachine = `SELECT m.id, m.namespace, m.fleet_id, m.instance_id, m.machine_version, m.region, m.created_at, m.updated_at, i.status, mv.config, i.events, i.enable_machine_gateway FROM machines m JOIN instances i ON m.instance_id = i.id JOIN machine_versions mv ON m.machine_version = mv.id` @@ -112,8 +112,8 @@ func scanAPIMachine(row dbutil.Scannable) (*api.Machine, error) { var createdAt int64 var updatedAt int64 var state string - - err := row.Scan(&m.Id, &m.Namespace, &m.FleetId, &m.InstanceId, &m.MachineVersion, &m.Region, &createdAt, &updatedAt, &state, &config, &events) + var gatewayEnabled corroBool + err := row.Scan(&m.Id, &m.Namespace, &m.FleetId, &m.InstanceId, &m.MachineVersion, &m.Region, &createdAt, &updatedAt, &state, &config, &events, &gatewayEnabled) if err != nil { if err == pgx.ErrNoRows { return nil, errdefs.NewNotFound("machine not found") @@ -121,6 +121,8 @@ func scanAPIMachine(row dbutil.Scannable) (*api.Machine, error) { return nil, err } + m.GatewayEnabled = gatewayEnabled.Bool() + m.Status = api.MachineStatus(state) err = json.Unmarshal(config, &m.Config) diff --git a/core/cluster/corrosion/schema.sql b/core/cluster/corrosion/schema.sql index d49b360..5cb32cf 100644 --- a/core/cluster/corrosion/schema.sql +++ b/core/cluster/corrosion/schema.sql @@ -9,7 +9,7 @@ CREATE table gateways ( target_port int not null default 80 ); --- Instances +-- Machine Instances CREATE TABLE instances ( id text not null default '', @@ -22,6 +22,7 @@ CREATE TABLE instances ( updated_at integer not null default 0, local_ipv4 text not null default '', events text not null default '[]', + enable_machine_gateway integer not null default 0, primary key (id, machine_id) ); diff --git a/core/cluster/models.go b/core/cluster/models.go index 496cf56..87ae9f5 100644 --- a/core/cluster/models.go +++ b/core/cluster/models.go @@ -20,14 +20,15 @@ type Machine struct { } type MachineInstance struct { - Id string `json:"id"` - Node string `json:"node"` - Namespace string `json:"namespace"` - MachineId string `json:"machine_id"` - MachineVersion string `json:"machine_version"` - Status api.MachineStatus `json:"status"` - Events []api.MachineEvent `json:"events"` - LocalIPV4 string `json:"local_ipv4"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + Id string `json:"id"` + Node string `json:"node"` + Namespace string `json:"namespace"` + MachineId string `json:"machine_id"` + MachineVersion string `json:"machine_version"` + Status api.MachineStatus `json:"status"` + Events []api.MachineEvent `json:"events"` + LocalIPV4 string `json:"local_ipv4"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + EnableMachineGateway bool `json:"enable_machine_gateway"` } diff --git a/initd/initd.go b/initd/initd.go index 65ea04e..04c7147 100644 --- a/initd/initd.go +++ b/initd/initd.go @@ -1,6 +1,7 @@ package initd const InitdPort = 64242 +const InitdPortStr = "64242" type WaitResult struct { ExitCode int `json:"exit_code"` diff --git a/proxy/edge/backends.go b/proxy/edge/backends.go index dc45e5b..26c446f 100644 --- a/proxy/edge/backends.go +++ b/proxy/edge/backends.go @@ -13,11 +13,12 @@ import ( ) type instance struct { - Id string `json:"id"` - Namespace string `json:"namespace"` - MachineId string `json:"machine_id"` - Address string `json:"address"` - Port int `json:"port"` + Id string `json:"id"` + Namespace string `json:"namespace"` + MachineId string `json:"machine_id"` + Address string `json:"address"` + Port int `json:"port"` + EnableMachineGateway bool `json:"enable_machine_gateway"` } func (i *instance) Url() *url.URL { @@ -58,16 +59,20 @@ func (b *instanceBackends) Start() { func scanInstance(row *corroclient.Row) (instance, error) { var i instance - err := row.Scan(&i.Id, &i.MachineId, &i.Namespace, &i.Address, &i.Port) + var machineGatewayEnabled uint8 + + err := row.Scan(&i.Id, &i.MachineId, &i.Namespace, &i.Address, &i.Port, &machineGatewayEnabled) if err != nil { return instance{}, err } + i.EnableMachineGateway = machineGatewayEnabled == 1 + return i, nil } const getBackendsQuery = ` - select i.id, m.id, m.namespace, n.address, n.http_proxy_port + select i.id, m.id, m.namespace, n.address, n.http_proxy_port, i.enable_machine_gateway from instances i join machines m on m.instance_id = i.id join nodes n on n.id = i.node @@ -103,6 +108,8 @@ func (b *instanceBackends) sync() error { } switch change.ChangeType { + case corroclient.ChangeTypeUpdate: + b.setBackend(i) case corroclient.ChangeTypeInsert: b.setBackend(i) case corroclient.ChangeTypeDelete: diff --git a/proxy/edge/certificates.go b/proxy/edge/certificates.go index 149e87a..76ae465 100644 --- a/proxy/edge/certificates.go +++ b/proxy/edge/certificates.go @@ -5,21 +5,21 @@ import ( "strings" ) -func newCertStore(gwDomain, initdDomain string, gw, initd *tls.Certificate) *certStore { +func newCertStore(gwDomain, machinesGwDomain string, gw, machineGw *tls.Certificate) *certStore { return &certStore{ - gwDomain: gwDomain, - gw: gw, - initdDomain: initdDomain, - initd: initd, + gwDomain: gwDomain, + gw: gw, + machinesGwDomain: machinesGwDomain, + machinesGw: machineGw, } } // dumb certStore, to be replaced when support for custom domains is added type certStore struct { - gwDomain string - gw *tls.Certificate - initdDomain string - initd *tls.Certificate + gwDomain string + gw *tls.Certificate + machinesGwDomain string + machinesGw *tls.Certificate } func (cs *certStore) GetCertificate(chi *tls.ClientHelloInfo) (*tls.Certificate, error) { @@ -33,8 +33,8 @@ func (cs *certStore) GetCertificate(chi *tls.ClientHelloInfo) (*tls.Certificate, switch after { case cs.gwDomain: return cs.gw, nil - case cs.initdDomain: - return cs.initd, nil + case cs.machinesGwDomain: + return cs.machinesGw, nil } return nil, nil // the go tls package take care to send errNoCert diff --git a/proxy/edge/edge.go b/proxy/edge/edge.go index dd570d7..ea09219 100644 --- a/proxy/edge/edge.go +++ b/proxy/edge/edge.go @@ -8,7 +8,6 @@ import ( "time" "github.com/valyentdev/corroclient" - "github.com/valyentdev/ravel/initd" "github.com/valyentdev/ravel/proxy" "github.com/valyentdev/ravel/proxy/httpproxy" "github.com/valyentdev/ravel/proxy/server" @@ -16,7 +15,7 @@ import ( type edgeProxy struct { gws httpproxy.ProxyHandler - initd httpproxy.ProxyHandler + machines httpproxy.ProxyHandler gatewayDomain string initdDomain string } @@ -27,31 +26,34 @@ func loadCertificates(config *proxy.Config) (*certStore, error) { return nil, err } - initdCert, err := tls.LoadX509KeyPair(config.Edge.Initd.TLS.CertFile, config.Edge.Initd.TLS.KeyFile) + machinesCert, err := tls.LoadX509KeyPair(config.Edge.MachineGateways.TLS.CertFile, config.Edge.MachineGateways.TLS.KeyFile) if err != nil { return nil, err } - return newCertStore(config.Edge.DefaultDomain, config.Edge.Initd.Domain, &gwCert, &initdCert), nil + return newCertStore(config.Edge.DefaultDomain, config.Edge.MachineGateways.Domain, &gwCert, &machinesCert), nil } func (p *edgeProxy) serveHTTPS(w http.ResponseWriter, r *http.Request) { host := httpproxy.StripHostPort(r.Host) - sni := r.TLS.ServerName - if sni != host { - httpproxy.AnswerErrorStatus(w, r, http.StatusBadGateway) - return - } + + // We'll need to check the SNI later when we'll support custom domains + // for now, we'll just check the host as all domains are subdomains of the + // defaults domains + // sni := r.TLS.ServerNacme + // if sni != host { + // slog.Warn("SNI does not match host", "sni", sni, "host", host) + // httpproxy.AnswerErrorStatus(w, r, http.StatusBadGateway) + // return + // } if isSubDomain(p.gatewayDomain, host) { - slog.Debug("proxying to a gateway") p.gws.ServeHTTP(w, r) return } if isSubDomain(p.initdDomain, host) { - slog.Debug("proxying to initd") - p.initd.ServeHTTP(w, r) + p.machines.ServeHTTP(w, r) return } @@ -76,28 +78,28 @@ func NewEdgeProxyServer(config *proxy.Config) (*server.Server, error) { backends.Start() gatewayDomainSuffix := "." + config.Edge.DefaultDomain - initdDomainSuffix := "." + config.Edge.Initd.Domain + machinesGwDomainSuffix := "." + config.Edge.MachineGateways.Domain gwService := newGatewayProxyService(gatewayDomainSuffix, backends, corro) gwProxy := httpproxy.NewProxy(gwService, nil) var authorizer Authorizer - if config.Edge.Initd.Authz != nil { - authorizer = newValyentAuthorizer(config.Edge.Initd.Authz.Endpoint) + if config.Edge.MachineGateways.InitdAuthz != nil { + authorizer = newValyentAuthorizer(config.Edge.MachineGateways.InitdAuthz.Endpoint) } else { slog.Warn("[DANGER] No authorization is configured for initd proxy") authorizer = &noAuthAuthorizer{} } - initdProxyService := newInitdProxyService(backends, initdDomainSuffix, initd.InitdPort, authorizer) + initdProxyService := newInitdProxyService(backends, machinesGwDomainSuffix, authorizer) initdProxy := httpproxy.NewProxy(initdProxyService, nil) proxy := &edgeProxy{ gws: gwProxy, - initd: initdProxy, + machines: initdProxy, gatewayDomain: config.Edge.DefaultDomain, - initdDomain: config.Edge.Initd.Domain, + initdDomain: config.Edge.MachineGateways.Domain, } certStore, err := loadCertificates(config) diff --git a/proxy/edge/initd_authz.go b/proxy/edge/initd_authz.go index 237c31c..16f62eb 100644 --- a/proxy/edge/initd_authz.go +++ b/proxy/edge/initd_authz.go @@ -1,14 +1,13 @@ package edge import ( - "context" "net/http" "github.com/valyentdev/ravel/internal/httpclient" ) type Authorizer interface { - Authorize(ctx context.Context, r *http.Request, namespace string) bool + Authorize(r *http.Request, namespace string) bool } type valyentAuthorizer struct { @@ -29,10 +28,10 @@ func newValyentAuthorizer(endpoint string) *valyentAuthorizer { } } -func (v *valyentAuthorizer) Authorize(ctx context.Context, r *http.Request, namespace string) bool { +func (v *valyentAuthorizer) Authorize(r *http.Request, namespace string) bool { var result authResult err := v.client.Get( - context.Background(), + r.Context(), v.endpoint, &result, httpclient.WithHeader("Authorization", r.Header.Get("Authorization")), @@ -50,6 +49,6 @@ func (v *valyentAuthorizer) Authorize(ctx context.Context, r *http.Request, name type noAuthAuthorizer struct{} -func (n *noAuthAuthorizer) Authorize(ctx context.Context, r *http.Request, namespace string) bool { +func (n *noAuthAuthorizer) Authorize(r *http.Request, namespace string) bool { return true } diff --git a/proxy/edge/initd_proxy.go b/proxy/edge/initd_proxy.go deleted file mode 100644 index b1a8396..0000000 --- a/proxy/edge/initd_proxy.go +++ /dev/null @@ -1,83 +0,0 @@ -package edge - -import ( - "log/slog" - "net/http" - "net/url" - "strconv" - "strings" - - "github.com/valyentdev/ravel/proxy/httpproxy" -) - -type initdRCTX struct { - instanceId string - namespace string -} -type initdProxyService struct { - authorizer Authorizer - backends *instanceBackends - domainSuffix string - port string -} - -func newInitdProxyService(backends *instanceBackends, domainSuffix string, port int, authorizer Authorizer) *initdProxyService { - return &initdProxyService{ - authorizer: authorizer, - backends: backends, - domainSuffix: domainSuffix, - port: strconv.Itoa(port), - } -} - -var _ interface { - httpproxy.HTTPProxyService[initdRCTX] - httpproxy.WithRewrite[initdRCTX] - httpproxy.WithFilter[initdRCTX] -} = (*initdProxyService)(nil) - -func (p *initdProxyService) NewRCTX() *initdRCTX { - return &initdRCTX{} -} -func (p *initdProxyService) GetUpstream(r *http.Request, rctx *initdRCTX) *url.URL { - slog.Debug("getting upstream initd") - host := r.Host - if !strings.HasSuffix(host, p.domainSuffix) { - slog.Debug("host does not have suffix", "host", host, "suffix", p.domainSuffix) - return nil - } - - host = strings.TrimSuffix(host, p.domainSuffix) - - parts := strings.SplitN(host, "-", 2) - if len(parts) != 2 { - slog.Debug("host does not have two parts", "host", host) - return nil - } - - machineId := parts[0] - - instance, ok := p.backends.getBackend(machineId) - if !ok { - slog.Debug("instance not found", "machineId", machineId) - return nil - } - - if instance.Namespace != parts[1] { - slog.Debug("namespace does not match", "instance", instance, "namespace", parts[1]) - return nil - } - - rctx.instanceId = instance.Id - rctx.namespace = instance.Namespace - return instance.Url() -} - -func (p *initdProxyService) Rewrite(r *httpproxy.ProxyRequest, rctx *initdRCTX) { - r.Out.Header.Set("Ravel-Instance-Id", rctx.instanceId) - r.Out.Header.Set("Ravel-Instance-Port", p.port) -} - -func (p *initdProxyService) Filter(w http.ResponseWriter, r *http.Request, rctx *initdRCTX) bool { - return true -} diff --git a/proxy/edge/machine_proxy.go b/proxy/edge/machine_proxy.go new file mode 100644 index 0000000..8fe496d --- /dev/null +++ b/proxy/edge/machine_proxy.go @@ -0,0 +1,118 @@ +package edge + +import ( + "log/slog" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/valyentdev/ravel/initd" + "github.com/valyentdev/ravel/proxy/httpproxy" +) + +type machineRCTX struct { + instanceId string + namespace string + needAuth bool + port string +} +type machineProxyService struct { + authorizer Authorizer + backends *instanceBackends + domainSuffix string +} + +func newInitdProxyService(backends *instanceBackends, domainSuffix string, authorizer Authorizer) *machineProxyService { + return &machineProxyService{ + authorizer: authorizer, + backends: backends, + domainSuffix: domainSuffix, + } +} + +var _ interface { + httpproxy.HTTPProxyService[machineRCTX] + httpproxy.WithRewrite[machineRCTX] + httpproxy.WithFilter[machineRCTX] +} = (*machineProxyService)(nil) + +func (p *machineProxyService) NewRCTX() *machineRCTX { + return &machineRCTX{} +} + +func getPort(part string) (portStr string, isInitd bool, ok bool) { + if part == "initd" { + return initd.InitdPortStr, true, true + } + + if part == initd.InitdPortStr { + return initd.InitdPortStr, true, true + } + + _, err := strconv.Atoi(part) + if err != nil { + slog.Debug("part is not a number", "part", part) + return "", false, false + } + + return part, false, true + +} +func (p *machineProxyService) GetUpstream(r *http.Request, rctx *machineRCTX) *url.URL { + host := r.Host + host, _, found := strings.Cut(host, ".") + if !found { + slog.Debug("host does not have a suffix", "host", host) + return nil + } + + parts := strings.SplitN(host, "-", 2) + if len(parts) != 2 { + slog.Debug("host does not have two parts", "host", host) + return nil + } + + machineId := parts[0] + + instance, ok := p.backends.getBackend(machineId) + if !ok { + slog.Debug("instance not found", "machineId", machineId) + return nil + } + + port, isInitd, ok := getPort(parts[1]) + if !ok { + slog.Debug("port not found", "host", host) + return nil + } + + if !isInitd && !instance.EnableMachineGateway { + slog.Debug("machine gateway not enabled", "instance", instance) + return nil + } + + rctx.port = port + rctx.needAuth = isInitd + rctx.instanceId = instance.Id + rctx.namespace = instance.Namespace + + return instance.Url() +} + +func (p *machineProxyService) Rewrite(r *httpproxy.ProxyRequest, rctx *machineRCTX) { + r.Out.Header.Set("Ravel-Instance-Id", rctx.instanceId) + r.Out.Header.Set("Ravel-Instance-Port", rctx.port) +} + +func (p *machineProxyService) Filter(w http.ResponseWriter, r *http.Request, rctx *machineRCTX) bool { + if rctx.needAuth { + if !p.authorizer.Authorize(r, rctx.namespace) { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + slog.Debug("unauthorized request", "rctx", rctx) + return false + } + } + + return true +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 4640e98..915cbe0 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -27,20 +27,20 @@ type Config struct { Local LocalConfig `toml:"local"` } -type InitdAPIConfig struct { - Domain string `toml:"domain"` - Authz *struct { +type MachineGatewaysConfig struct { + Domain string `toml:"domain"` + InitdAuthz *struct { Endpoint string `toml:"endpoint_url"` - } + } `toml:"initd_authz"` TLS TLS `toml:"tls"` } type EdgeConfig struct { - DefaultDomain string `toml:"default_domain"` - HttpAddr string `toml:"http_addr"` - HttpsAddr string `toml:"https_addr"` - Initd *InitdAPIConfig `toml:"initd"` - TLS TLS `toml:"tls"` + DefaultDomain string `toml:"default_domain"` + HttpAddr string `toml:"http_addr"` + HttpsAddr string `toml:"https_addr"` + MachineGateways *MachineGatewaysConfig `toml:"machine_gateways"` + TLS TLS `toml:"tls"` } type LocalConfig struct { diff --git a/ravel/machines.go b/ravel/machines.go index 100d9e8..59a909d 100644 --- a/ravel/machines.go +++ b/ravel/machines.go @@ -114,7 +114,7 @@ func (r *Ravel) CreateMachine(ctx context.Context, namespace string, fleet strin return nil, err } - err = r.o.PutMachine(ctx, nodeId, &machine, mv, !createOptions.SkipStart) + err = r.o.PutMachine(ctx, nodeId, &machine, mv, !createOptions.SkipStart, createOptions.EnableMachineGateway) if err != nil { return nil, err } @@ -282,3 +282,21 @@ func (r *Ravel) WaitMachineStatus(ctx context.Context, ns, fleet, machineId stri Timeout: opt.timeout, }) } + +func (r *Ravel) EnableMachineGateway(ctx context.Context, ns, fleet, machineId string) error { + m, err := r.getMachine(ctx, ns, fleet, machineId, false) + if err != nil { + return err + } + + return r.o.EnableMachineGateway(ctx, m) +} + +func (r *Ravel) DisableMachineGateway(ctx context.Context, ns, fleet, machineId string) error { + m, err := r.getMachine(ctx, ns, fleet, machineId, false) + if err != nil { + return err + } + + return r.o.DisableMachineGateway(ctx, m) +} diff --git a/ravel/orchestrator/create_machine.go b/ravel/orchestrator/create_machine.go index 02b3b84..3c111e6 100644 --- a/ravel/orchestrator/create_machine.go +++ b/ravel/orchestrator/create_machine.go @@ -30,7 +30,7 @@ func (o *Orchestrator) PrepareAllocation(ctx context.Context, region string, all return } -func (o *Orchestrator) PutMachine(ctx context.Context, nodeId string, machine *cluster.Machine, mv api.MachineVersion, start bool) error { +func (o *Orchestrator) PutMachine(ctx context.Context, nodeId string, machine *cluster.Machine, mv api.MachineVersion, start bool, enableGateway bool) error { member, err := o.clusterState.GetNode(ctx, nodeId) if err != nil { return fmt.Errorf("failed to get node: %w", err) @@ -42,10 +42,11 @@ func (o *Orchestrator) PutMachine(ctx context.Context, nodeId string, machine *c } _, err = ac.PutMachine(ctx, cluster.PutMachineOptions{ - AllocationId: machine.Id, - Machine: *machine, - Version: mv, - Start: start, + AllocationId: machine.Id, + Machine: *machine, + Version: mv, + Start: start, + EnableGateway: enableGateway, }) if err != nil { return err diff --git a/ravel/orchestrator/machines.go b/ravel/orchestrator/machines.go index 04e177a..832250a 100644 --- a/ravel/orchestrator/machines.go +++ b/ravel/orchestrator/machines.go @@ -115,3 +115,21 @@ func (o *Orchestrator) GetMachineLogsRaw(ctx context.Context, machine cluster.Ma return agentClient.GetMachineLogsRaw(ctx, machine.Id, follow) } + +func (o *Orchestrator) EnableMachineGateway(ctx context.Context, machine cluster.Machine) error { + agentClient, err := o.getAgentClient(machine.Node) + if err != nil { + return err + } + + return agentClient.EnableMachineGateway(ctx, machine.Id) +} + +func (o *Orchestrator) DisableMachineGateway(ctx context.Context, machine cluster.Machine) error { + agentClient, err := o.getAgentClient(machine.Node) + if err != nil { + return err + } + + return agentClient.DisableMachineGateway(ctx, machine.Id) +} diff --git a/ravel/server/endpoints/endpoints.go b/ravel/server/endpoints/endpoints.go index e089eb1..4e05337 100644 --- a/ravel/server/endpoints/endpoints.go +++ b/ravel/server/endpoints/endpoints.go @@ -190,6 +190,22 @@ func (e *Endpoints) Register(api huma.API) { Tags: []string{"machines"}, }, e.waitMachineStatus) + huma.Register(api, huma.Operation{ + OperationID: "enableMachineGateway", + Summary: "Enable the Machine Gateway", + Method: http.MethodPost, + Path: "/fleets/{fleet}/machines/{machine_id}/gateway/enable", + Tags: []string{"machines"}, + }, e.enableMachineGateway) + + huma.Register(api, huma.Operation{ + OperationID: "disableMachineGateway", + Summary: "Disable the Machine Gateway", + Method: http.MethodPost, + Path: "/fleets/{fleet}/machines/{machine_id}/gateway/disable", + Tags: []string{"machines"}, + }, e.disableMachineGateway) + huma.Register(api, huma.Operation{ OperationID: "createGateway", Summary: "Create a gateway", diff --git a/ravel/server/endpoints/machines.go b/ravel/server/endpoints/machines.go index 9994055..e380d80 100644 --- a/ravel/server/endpoints/machines.go +++ b/ravel/server/endpoints/machines.go @@ -252,3 +252,37 @@ func (e *Endpoints) waitMachineStatus(ctx context.Context, req *WaitMachineStatu return nil, nil } + +type EnableMachineGatewayRequest struct { + MachineResolver +} + +type EnableMachineGatewayResponse struct { +} + +func (e *Endpoints) enableMachineGateway(ctx context.Context, req *EnableMachineGatewayRequest) (*EnableMachineGatewayResponse, error) { + err := e.ravel.EnableMachineGateway(ctx, req.Namespace, req.Fleet, req.MachineId) + if err != nil { + e.log("Failed to enable machine gateway", err) + return nil, err + } + + return nil, nil +} + +type DisableMachineGatewayRequest struct { + MachineResolver +} + +type DisableMachineGatewayResponse struct { +} + +func (e *Endpoints) disableMachineGateway(ctx context.Context, req *DisableMachineGatewayRequest) (*DisableMachineGatewayResponse, error) { + err := e.ravel.DisableMachineGateway(ctx, req.Namespace, req.Fleet, req.MachineId) + if err != nil { + e.log("Failed to disable machine gateway", err) + return nil, err + } + + return nil, nil +}