Skip to content

Commit

Permalink
Unify endpoints for clustering nodes (#109)
Browse files Browse the repository at this point in the history
Previously, a client needed to call different API endpoints to
generate control plane or worker token and joining those different node
types to the k8s cluster.
This commit adds a unified endpoint for creating a token
(`POST /k8sd/tokens`) and joining a node (`POST /k8sd/cluster/<node>`).
  • Loading branch information
bschimke95 authored Feb 6, 2024
1 parent 4c2b06f commit 469aefc
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 105 deletions.
11 changes: 11 additions & 0 deletions src/k8s/api/v1/cluster_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package v1

// JoinNodeRequest is used to request to add a node to the cluster.
type JoinNodeRequest struct {
Name string `json:"name"`
Address string `json:"address"`
Token string `json:"token"`
}

// JoinNodeResponse is the response from "POST 1.0/k8sd/cluster/{node}"
type JoinNodeResponse struct{}
18 changes: 18 additions & 0 deletions src/k8s/api/v1/tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package v1

// TokenRequest is used to request a token for joining a node to the cluster.
type TokenRequest struct {
// If true, a token for joining a worker node is created.
// If false, a token for joining a control plane node is created.
Worker bool `json:"worker"`
// Name of the node that should join.
// Only required for control plane nodes as all workers share the same token.
Name string `json:"name"`
}

// TokensResponse is used to return a token for joining nodes in the cluster.
type TokensResponse struct {
// We want to be able to quickly find the tokens in the code, but have the same
// JSON response for control-plane and worker nodes, thus the discrepancy in naming.
EncodedToken string `json:"token"`
}
10 changes: 0 additions & 10 deletions src/k8s/api/v1/worker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package v1

// WorkerNodeTokenRequest is used to request a token for joining the cluster as a worker node.
type WorkerNodeTokenRequest struct{}

// WorkerNodeTokenResponse is used to return a token for joining worker nodes in the cluster.
type WorkerNodeTokenResponse struct {
// We want to be able to quickly find the worker tokens in the code, but have the same
// JSON response for control-plane and worker nodes, thus the discrepancy in naming.
EncodedToken string `json:"token"`
}

// WorkerNodeInfoRequest is used by a worker node to retrieve the required credentials
// to join a cluster.
type WorkerNodeInfoRequest struct {
Expand Down
45 changes: 14 additions & 31 deletions src/k8s/pkg/k8s/client/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,30 @@ package client
import (
"context"
"fmt"
"time"
"os"

"github.com/canonical/k8s/pkg/k8sd/types"
apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/lxd/shared/api"
)

func (c *Client) joinWorkerNode(ctx context.Context, name, address, token string) error {
return c.m.NewCluster(name, address, map[string]string{"workerToken": token}, time.Second*180)
}

func (c *Client) joinControlPlaneNode(ctx context.Context, name, address, token string) error {
return c.m.JoinCluster(name, address, token, nil, time.Second*180)
}

func (c *Client) JoinNode(ctx context.Context, name string, address string, token string) error {
if err := c.m.Ready(30); err != nil {
return fmt.Errorf("cluster did not come up in time: %w", err)
}

// differentiate between control plane and worker node tokens
info := &types.InternalWorkerNodeToken{}
if info.Decode(token) == nil {
// valid worker node token
if err := c.joinWorkerNode(ctx, name, address, token); err != nil {

// TODO: Handle worker node specific cleanup.
// If the node setup unrecoverably fails after the worker has
// registered itself to the cluster, the worker needs to remove itself again.
// For that:
// - we need an endpoint on the control-plane with which workers can remove themselves.
// - we need unique worker tokens (right now, all workers share the same one) so that
// each worker can only remove itself and not other workers.
return fmt.Errorf("failed to join k8sd cluster as worker: %w", err)
}
} else {
if err := c.joinControlPlaneNode(ctx, name, address, token); err != nil {
// TODO(neoaggelos): print message that join failed, and that we are cleaning up
c.CleanupNode(ctx, name)
return fmt.Errorf("failed to join k8sd cluster as control plane: %w", err)
}
request := apiv1.JoinNodeRequest{
Name: name,
Address: address,
Token: token,
}
var response apiv1.JoinNodeResponse
err := c.mc.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "join"), request, &response)
if err != nil {
fmt.Fprintln(os.Stderr, "failed to join node - cleaning up now")
c.CleanupNode(ctx, name)
return fmt.Errorf("failed to query endpoint POST /k8sd/cluster/join: %w", err)
}

c.WaitForDqliteNodeToBeReady(ctx, name)
Expand Down
28 changes: 0 additions & 28 deletions src/k8s/pkg/k8s/client/token.go

This file was deleted.

23 changes: 23 additions & 0 deletions src/k8s/pkg/k8s/client/tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/lxd/shared/api"
)

func (c *Client) CreateJoinToken(ctx context.Context, name string, worker bool) (string, error) {
request := apiv1.TokenRequest{
Name: name,
Worker: worker,
}
response := apiv1.TokensResponse{}

err := c.mc.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "tokens"), request, &response)
if err != nil {
return "", fmt.Errorf("failed to query endpoint POST /k8sd/cluster/tokens: %w", err)
}
return response.EncodedToken, nil
}
56 changes: 56 additions & 0 deletions src/k8s/pkg/k8sd/api/cluster_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package api

import (
"encoding/json"
"fmt"
"net/http"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)

func postClusterNode(s *state.State, r *http.Request) response.Response {
req := apiv1.JoinNodeRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

// differentiate between control plane and worker node tokens
info := &types.InternalWorkerNodeToken{}
if info.Decode(req.Token) == nil {
// valid worker node token
if err := joinWorkerNode(s, r, req.Name, req.Address, req.Token); err != nil {
return response.SmartError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err))
}
} else {
if err := joinControlPlaneNode(s, r, req.Name, req.Address, req.Token); err != nil {
return response.SmartError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err))
}
}

return response.SyncResponse(true, &apiv1.JoinNodeResponse{})
}

func joinWorkerNode(s *state.State, r *http.Request, name, address, token string) error {
m, err := microcluster.App(r.Context(), microcluster.Args{
StateDir: s.OS.StateDir,
})
if err != nil {
return fmt.Errorf("failed to get microcluster app: %w", err)
}
return m.NewCluster(name, address, map[string]string{"workerToken": token}, time.Second*180)
}

func joinControlPlaneNode(s *state.State, r *http.Request, name, address, token string) error {
m, err := microcluster.App(r.Context(), microcluster.Args{
StateDir: s.OS.StateDir,
})
if err != nil {
return fmt.Errorf("failed to get microcluster app: %w", err)
}
return m.JoinCluster(name, address, token, nil, time.Second*180)
}
17 changes: 13 additions & 4 deletions src/k8s/pkg/k8sd/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ var Endpoints = []rest.Endpoint{
Path: "k8sd/cluster",
Get: rest.EndpointAction{Handler: getClusterStatus},
},
// Worker nodes
// Clustering
// Unified token endpoint for both, control-plane and worker-node.
{
Name: "Tokens",
Path: "k8sd/cluster/tokens",
Post: rest.EndpointAction{Handler: postTokens},
},
{
Name: "WorkerToken",
Path: "k8sd/worker/tokens",
Post: rest.EndpointAction{Handler: postWorkerToken},
Name: "JoinNode",
Path: "k8sd/cluster/join",
Post: rest.EndpointAction{Handler: postClusterNode},
// Joining a node is a bootstrapping action which needs to be available before k8sd is initialized.
AllowedBeforeInit: true,
},
// Worker nodes
{
Name: "WorkerInfo",
Path: "k8sd/worker/info",
Expand Down
85 changes: 85 additions & 0 deletions src/k8s/pkg/k8sd/api/tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package api

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/k8sd/database"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)

func postTokens(s *state.State, r *http.Request) response.Response {
req := apiv1.TokenRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

var token string
var err error
if req.Worker {
token, err = createWorkerToken(s, r)
} else {
token, err = createControlPlaneToken(s, r, req.Name)
}

if err != nil {
return response.SmartError(fmt.Errorf("failed to create token: %w", err))
}

return response.SyncResponse(true, &apiv1.TokensResponse{EncodedToken: token})

}

func createWorkerToken(s *state.State, r *http.Request) (string, error) {
var token string
if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
var err error
token, err = database.GetOrCreateWorkerNodeToken(ctx, tx)
if err != nil {
return fmt.Errorf("failed to create worker node token: %w", err)
}
return err
}); err != nil {
return "", fmt.Errorf("database transaction failed: %w", err)
}

remoteAddresses := s.Remotes().Addresses()
addresses := make([]string, 0, len(remoteAddresses))
for _, addrPort := range remoteAddresses {
addresses = append(addresses, addrPort.String())
}

info := &types.InternalWorkerNodeToken{
Token: token,
JoinAddresses: addresses,
}
token, err := info.Encode()
if err != nil {
return "", fmt.Errorf("failed to encode join token: %w", err)
}

return token, nil
}

func createControlPlaneToken(s *state.State, r *http.Request, name string) (string, error) {
m, err := microcluster.App(r.Context(), microcluster.Args{
StateDir: s.OS.StateDir,
})
if err != nil {
return "", fmt.Errorf("failed to get microcluster app: %w", err)
}

c, err := m.LocalClient()
if err != nil {
return "", fmt.Errorf("failed to get local microcluster client: %w", err)
}

return c.RequestToken(r.Context(), name)
}
32 changes: 0 additions & 32 deletions src/k8s/pkg/k8sd/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,12 @@ import (

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/k8sd/database"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/state"
)

func postWorkerToken(s *state.State, r *http.Request) response.Response {
var token string
if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
var err error
token, err = database.GetOrCreateWorkerNodeToken(ctx, tx)
if err != nil {
return fmt.Errorf("failed to create worker node token: %w", err)
}
return nil
}); err != nil {
return response.InternalError(fmt.Errorf("database transaction failed: %w", err))
}

remoteAddresses := s.Remotes().Addresses()
addresses := make([]string, 0, len(remoteAddresses))
for _, addrPort := range remoteAddresses {
addresses = append(addresses, addrPort.String())
}

info := &types.InternalWorkerNodeToken{
Token: token,
JoinAddresses: addresses,
}
token, err := info.Encode()
if err != nil {
return response.InternalError(fmt.Errorf("failed to encode join token: %w", err))
}

return response.SyncResponse(true, &apiv1.WorkerNodeTokenResponse{EncodedToken: token})
}

func postWorkerInfo(s *state.State, r *http.Request) response.Response {
// TODO: move authentication through the HTTP token to an AccessHandler for the endpoint.
token := r.Header.Get("k8sd-token")
Expand Down

0 comments on commit 469aefc

Please sign in to comment.