diff --git a/cmd/gpud/command/command.go b/cmd/gpud/command/command.go index 59aa6988..ad5882e5 100644 --- a/cmd/gpud/command/command.go +++ b/cmd/gpud/command/command.go @@ -19,9 +19,10 @@ sudo gpud up ` var ( - logLevel string - debug bool - uid string + logLevel string + debug bool + statusWatch bool + uid string annotations string listenAddress string @@ -82,7 +83,7 @@ sudo gpud login --token cli.StringFlag{ Name: "endpoint", Usage: "endpoint for control plane", - Value: "mothership-machine-mothership-machine-dev.cloud.lepton.ai", + Value: "mothership-machine.app.lepton.ai", }, }, }, @@ -111,7 +112,38 @@ nohup sudo gpud run &>> & cli.StringFlag{ Name: "endpoint", Usage: "endpoint for checking in", - Value: "mothership-machine-mothership-machine-dev.cloud.lepton.ai", + Value: "mothership-machine.app.lepton.ai", + }, + }, + }, + { + Name: "kubeconfig", + Usage: "Writes the kubeconfig with gpud.", + Action: cmdKubeConfig, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "file", + Usage: "file path to output the kubelet config", + }, + cli.StringFlag{ + Name: "region", + Usage: "region of target cluster", + }, + cli.StringFlag{ + Name: "cluster", + Usage: "name of target cluster", + }, + cli.StringFlag{ + Name: "role", + Usage: "role", + }, + cli.StringFlag{ + Name: "session", + Usage: "cluster session name", + }, + cli.StringFlag{ + Name: "cluster-ca", + Usage: "cluster ca file path", }, }, }, @@ -189,7 +221,7 @@ sudo rm /etc/systemd/system/gpud.service cli.StringFlag{ Name: "endpoint", Usage: "endpoint for control plane", - Value: "mothership-machine-mothership-machine-dev.cloud.lepton.ai", + Value: "mothership-machine.app.lepton.ai", }, &cli.BoolTFlag{ Name: "enable-auto-update", @@ -357,6 +389,13 @@ sudo rm /etc/systemd/system/gpud.service Usage: "checks the status of gpud", Action: cmdStatus, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "watch, w", + Usage: "watch for package install status", + Destination: &statusWatch, + }, + }, }, { Name: "logs", @@ -438,6 +477,43 @@ cat summary.txt }, }, }, + { + Name: "join", + Usage: "join gpud machine into a lepton cluster", + UsageText: `# to join gpud into a lepton cluster +sudo gpud join +`, + Action: cmdJoin, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "endpoint", + Usage: "endpoint for control plane", + Value: "mothership-machine.app.lepton.ai", + }, + cli.StringFlag{ + Name: "cluster-name", + Usage: "cluster name for control plane (e.g.: lepton-prod-0)", + Value: "lepton-prod-0", + }, + cli.StringFlag{ + Name: "provider", + Usage: "provider of the machine", + Value: "personal", + }, + cli.StringFlag{ + Name: "node-group", + Usage: "node group to join", + }, + cli.BoolFlag{ + Name: "skip-interactive", + Usage: "use detected value instead of prompting for user input", + }, + cli.StringFlag{ + Name: "extra-info", + Usage: "base64 encoded extra info to pass to control plane", + }, + }, + }, } return app diff --git a/cmd/gpud/command/join.go b/cmd/gpud/command/join.go new file mode 100644 index 00000000..ba05ce4a --- /dev/null +++ b/cmd/gpud/command/join.go @@ -0,0 +1,302 @@ +package command + +import ( + "archive/tar" + "bufio" + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/urfave/cli" + + "github.com/leptonai/gpud/components/accelerator" + "github.com/leptonai/gpud/components/state" + "github.com/leptonai/gpud/config" + "github.com/leptonai/gpud/internal/login" + "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/pkg/asn" + latency_edge "github.com/leptonai/gpud/pkg/latency/edge" + "github.com/leptonai/gpud/pkg/process" +) + +func cmdJoin(cliContext *cli.Context) (retErr error) { + rootCtx, rootCancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer rootCancel() + endpoint := cliContext.String("endpoint") + clusterName := cliContext.String("cluster-name") + provider := cliContext.String("provider") + nodeGroup := cliContext.String("node-group") + extraInfo := cliContext.String("extra-info") + + stateFile, err := config.DefaultStateFile() + if err != nil { + return fmt.Errorf("failed to get state file: %w", err) + } + db, err := state.Open(stateFile) + if err != nil { + return fmt.Errorf("failed to open state file: %w", err) + } + defer db.Close() + + uid, _, err := state.CreateMachineIDIfNotExist(rootCtx, db, "") + if err != nil { + return fmt.Errorf("failed to get machine uid: %w", err) + } + + cmd := exec.Command("nproc") + var out bytes.Buffer + cmd.Stdout = &out + if err = cmd.Run(); err != nil { + return fmt.Errorf("executing nproc: %w", err) + } + + totalCPU, err := strconv.ParseInt(strings.TrimSpace(out.String()), 10, 64) + if err != nil { + return fmt.Errorf("error parsing cpu: %w", err) + } + + _, productName, err := accelerator.DetectTypeAndProductName(rootCtx) + if err != nil { + return err + } + + // network section + publicIP, _ := login.PublicIP() + region := "unknown" + detectProvider := "unknown" + latencies, _ := latency_edge.Measure(rootCtx) + var closest int64 + for _, latency := range latencies { + if closest == 0 { + closest = latency.LatencyMilliseconds + region = latency.RegionCode + } + if latency.LatencyMilliseconds < closest { + closest = latency.LatencyMilliseconds + region = latency.RegionCode + } + } + asnResult, err := asn.GetASLookup(publicIP) + if err != nil { + log.Logger.Errorf("failed to get asn lookup: %v", err) + } else { + detectProvider = asnResult.AsnName + } + + if !cliContext.Bool("skip-interactive") { + reader := bufio.NewReader(os.Stdin) + var input string + if productName != "unknown" { + fmt.Printf("We detect your gpu type is %v, if this is corrent, press Enter. If not, please enter your gpu shape below\n", productName) + input, err = reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading input:", err) + return + } + if input != "\n" { + productName = strings.TrimSpace(input) + } + } + + fmt.Printf("We detect your public IP is %v, if this is corrent, press Enter. If not, please enter your public IP below\n", publicIP) + input, err = reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading input:", err) + return + } + if input != "\n" { + publicIP = strings.TrimSpace(input) + } + + if provider == "personal" { + fmt.Printf("Provider name not specified, we detected your provider is %v, if correct, press Enter. If not, please enter your provider's name below\n", detectProvider) + input, err = reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading input:", err) + return + } + if input != "\n" { + provider = strings.TrimSpace(input) + } else { + provider = detectProvider + } + } + + fmt.Printf("We detect your region is %v, if this is corrent, press Enter. If not, please enter your region below\n", region) + input, err = reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading input:", err) + return + } + if input != "\n" { + region = strings.TrimSpace(input) + } + } + + type payload struct { + ID string `json:"id"` + ClusterName string `json:"cluster_name"` + PublicIP string `json:"public_ip"` + Provider string `json:"provider"` + ProviderGPUShape string `json:"provider_gpu_shape"` + TotalCPU int64 `json:"total_cpu"` + NodeGroup string `json:"node_group"` + ExtraInfo string `json:"extra_info"` + Region string `json:"region"` + } + type RespErr struct { + Error string `json:"error"` + Status string `json:"status"` + } + content := payload{ + ID: uid, + ClusterName: clusterName, + PublicIP: publicIP, + Provider: provider, + ProviderGPUShape: productName, + TotalCPU: totalCPU, + NodeGroup: nodeGroup, + ExtraInfo: extraInfo, + Region: region, + } + rawPayload, _ := json.Marshal(&content) + fmt.Println("Your machine will be initialized with following configuration, please press Enter if it is ok") + prettyJSON, _ := json.MarshalIndent(content, "", " ") + fmt.Println(string(prettyJSON)) + fmt.Printf("%sWarning: GPUd will upgrade your container runtime to containerd, will affect your current running containers (if any)%s\n", "\033[33m", "\033[0m") + fmt.Printf("%sWarning: GPUd will Reboot your machine to finish necessary setup%s\n", "\033[33m", "\033[0m") + fmt.Printf("Please look carefully about the above warning, if ok, please hit Enter\n") + if !cliContext.Bool("skip-interactive") { + reader := bufio.NewReader(os.Stdin) + input, _ := reader.ReadString('\n') + if input != "\n" { + fmt.Println("Non empty input received, GPUd join aborted.") + return nil + } + } + fmt.Println("Please wait while control plane is initializing basic setup for your machine, this may take up to one minute...") + response, err := http.Post(fmt.Sprintf("https://%s/api/v1/join", endpoint), "application/json", bytes.NewBuffer(rawPayload)) + if err != nil { + return err + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + body, err := io.ReadAll(response.Body) + if err != nil { + return fmt.Errorf("error reading response body: %w", err) + } + var errorResponse RespErr + err = json.Unmarshal(body, &errorResponse) + if err != nil { + return fmt.Errorf("Error parsing error response: %v\nResponse body: %s", err, body) + } + return fmt.Errorf("failed to join: %v", errorResponse) + } + if err := handleJoinResponse(rootCtx, response.Body); err != nil { + return err + } + fmt.Println("Basic setup finished, GPUd is installing necessary components onto your machine, this may take 10 - 15 minutes.\nYou can run `gpud status` or `gpud status -w` to check the progress of each component.") + return nil +} + +func handleJoinResponse(ctx context.Context, body io.Reader) error { + dir, err := untarFiles("/tmp/", body) + if err != nil { + return err + } + scriptPath := filepath.Join(dir, "join.sh") + return runCommand(ctx, scriptPath, nil) +} + +func untarFiles(targetDir string, body io.Reader) (string, error) { + var dir string + gzipReader, err := gzip.NewReader(body) + if err != nil { + return "", fmt.Errorf("failed to create gzip reader: %w", err) + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return "", err + } + + fpath := filepath.Join(targetDir, header.Name) + if dir == "" { + dir = fpath + } + + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(fpath, os.ModePerm); err != nil { + panic(err) + } + case tar.TypeReg: + outFile, err := os.Create(fpath) + if err != nil { + panic(err) + } + defer outFile.Close() + + if _, err := io.Copy(outFile, tarReader); err != nil { + panic(err) + } + } + } + return dir, nil +} + +func runCommand(ctx context.Context, script string, result *string) error { + var ops []process.OpOption + + p, err := process.New(append(ops, process.WithCommand("bash", script))...) + if err != nil { + return err + } + if result != nil { + go func() { + stdoutReader := p.StdoutReader() + if stdoutReader == nil { + log.Logger.Errorf("failed to read stdout: %v", err) + return + } + rawResult, err := io.ReadAll(p.StdoutReader()) + if err != nil { + log.Logger.Errorf("failed to read stout: %v", err) + return + } + *result = strings.TrimSpace(string(rawResult)) + }() + } + if err = p.Start(ctx); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-p.Wait(): + if err != nil { + return err + } + } + if err := p.Abort(ctx); err != nil { + return err + } + return nil +} diff --git a/cmd/gpud/command/kubeconfig.go b/cmd/gpud/command/kubeconfig.go new file mode 100644 index 00000000..18b11bb5 --- /dev/null +++ b/cmd/gpud/command/kubeconfig.go @@ -0,0 +1,46 @@ +package command + +import ( + "context" + "time" + + "github.com/urfave/cli" + + "github.com/leptonai/gpud/go-pkg/aws" + "github.com/leptonai/gpud/go-pkg/aws/eks" + "github.com/leptonai/gpud/log" +) + +func cmdKubeConfig(cliContext *cli.Context) (retErr error) { + cfg, err := aws.New(&aws.Config{ + Region: cliContext.String("region"), + }) + if err != nil { + log.Logger.Warnw("failed to create aws config", + "error", err, + ) + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + cluster, err := eks.GetCluster(ctx, cfg, cliContext.String("cluster")) + cancel() + if err != nil { + log.Logger.Warnw("failed to get EKS cluster", + "error", err, + ) + return err + } + _, err = cluster.WriteKubeconfigWithAWSIAMAuthenticator( + eks.WithKubeconfigFile(cliContext.String("file")), + eks.WithClusterCAFile(cliContext.String("cluster-ca")), + eks.WithRoleARN(cliContext.String("role")), + eks.WithSessionName(cliContext.String("session")), + ) + if err != nil { + log.Logger.Warnw("failed to write kubeconfig/ca", + "error", err, + ) + return err + } + return nil +} diff --git a/cmd/gpud/command/run.go b/cmd/gpud/command/run.go index c06e509f..27215142 100644 --- a/cmd/gpud/command/run.go +++ b/cmd/gpud/command/run.go @@ -10,6 +10,7 @@ import ( "github.com/leptonai/gpud/config" lepServer "github.com/leptonai/gpud/internal/server" "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/manager" pkd_systemd "github.com/leptonai/gpud/pkg/systemd" "github.com/leptonai/gpud/version" @@ -96,8 +97,13 @@ func cmdRun(cliContext *cli.Context) error { // start the signal handler as soon as we can to make sure that // we don't miss any signals during boot signal.Notify(signals, handledSignals...) + m, err := manager.New() + if err != nil { + return err + } + m.Start(rootCtx) - server, err := lepServer.New(rootCtx, cfg, cliContext.String("endpoint"), uid, configOpts...) + server, err := lepServer.New(rootCtx, cfg, cliContext.String("endpoint"), uid, m, configOpts...) if err != nil { return err } diff --git a/cmd/gpud/command/status.go b/cmd/gpud/command/status.go index 358360e5..3065eb1b 100644 --- a/cmd/gpud/command/status.go +++ b/cmd/gpud/command/status.go @@ -2,11 +2,16 @@ package command import ( "context" + "crypto/tls" + "encoding/json" "fmt" + "io" + "net/http" "time" client "github.com/leptonai/gpud/client/v1" "github.com/leptonai/gpud/config" + "github.com/leptonai/gpud/manager/packages" "github.com/leptonai/gpud/pkg/systemd" "github.com/urfave/cli" @@ -37,5 +42,57 @@ func cmdStatus(cliContext *cli.Context) error { } fmt.Printf("%s successfully checked gpud health\n", checkMark) + for { + packageStatus, err := getStatus() + if err != nil { + fmt.Printf("%s failed to get package status: %v\n", warningSign, err) + return err + } + if statusWatch { + fmt.Print("\033[2J\033[H") + } + var totalTime int64 + var progress int64 + for _, status := range packageStatus { + totalTime += status.TotalTime.Milliseconds() + progress += status.TotalTime.Milliseconds() * int64(status.Progress) / 100 + } + + fmt.Printf("Total progress: %v%%, Estimate time left: %v\n", progress*100/totalTime, time.Duration(totalTime-progress)*time.Millisecond) + if !statusWatch { + break + } + time.Sleep(3 * time.Second) + } + return nil } + +func getStatus() ([]packages.PackageStatus, error) { + httpClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + req, err := http.NewRequest("GET", fmt.Sprintf("https://localhost:%d/admin/packages", config.DefaultGPUdPort), nil) + if err != nil { + return nil, err + } + resp, err := httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %v received", resp.StatusCode) + } + rawBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var ret []packages.PackageStatus + if err := json.Unmarshal(rawBody, &ret); err != nil { + return nil, err + } + return ret, nil +} diff --git a/components/info/component.go b/components/info/component.go index 4077ca3d..af456c37 100644 --- a/components/info/component.go +++ b/components/info/component.go @@ -3,12 +3,14 @@ package info import ( "context" + "encoding/json" "fmt" "net" "time" "github.com/leptonai/gpud/components" "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/manager" "github.com/leptonai/gpud/version" ) @@ -33,6 +35,7 @@ const ( StateKeyDaemonVersion = "daemon_version" StateKeyMacAddress = "mac_address" + StateKeyPackages = "packages" StateNameAnnotations = "annotations" ) @@ -52,6 +55,15 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { } } + var managedPackages string + if manager.GlobalController != nil { + packageStatus, err := manager.GlobalController.Status(ctx) + if err != nil { + return nil, err + } + rawPayload, _ := json.Marshal(&packageStatus) + managedPackages = string(rawPayload) + } return []components.State{ { Name: StateNameDaemon, @@ -60,6 +72,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { ExtraInfo: map[string]string{ StateKeyDaemonVersion: version.Version, StateKeyMacAddress: mac, + StateKeyPackages: managedPackages, }, }, { diff --git a/go-pkg/aws/aws.go b/go-pkg/aws/aws.go new file mode 100644 index 00000000..2229c4ab --- /dev/null +++ b/go-pkg/aws/aws.go @@ -0,0 +1,54 @@ +package aws + +import ( + "context" + "errors" + "fmt" + "time" + + aws_v2 "github.com/aws/aws-sdk-go-v2/aws" + config_v2 "github.com/aws/aws-sdk-go-v2/config" +) + +// Config defines a top-level AWS API configuration to create a session. +type Config struct { + // DebugAPICalls is true to log all AWS API call debugging messages. + DebugAPICalls bool + + // Region is a separate AWS geographic area for EKS service. + // Each AWS Region has multiple, isolated locations known as Availability Zones. + Region string +} + +// New creates a new AWS session. +// Specify a custom endpoint for tests. +func New(cfg *Config) (awsCfg aws_v2.Config, err error) { + if cfg == nil { + return aws_v2.Config{}, errors.New("got empty config") + } + if cfg.Region == "" { + return aws_v2.Config{}, fmt.Errorf("missing region") + } + + optFns := []func(*config_v2.LoadOptions) error{ + (func(*config_v2.LoadOptions) error)(config_v2.WithRegion(cfg.Region)), + } + if cfg.DebugAPICalls { + lvl := aws_v2.LogSigning | + aws_v2.LogRetries | + aws_v2.LogRequest | + aws_v2.LogRequestWithBody | + aws_v2.LogResponse | + aws_v2.LogResponseWithBody + optFns = append(optFns, (func(*config_v2.LoadOptions) error)(config_v2.WithClientLogMode(lvl))) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + awsCfg, err = config_v2.LoadDefaultConfig(ctx, optFns...) + cancel() + if err != nil { + return aws_v2.Config{}, fmt.Errorf("failed to load config %v", err) + } + + return awsCfg, nil +} diff --git a/go-pkg/aws/eks/eks.go b/go-pkg/aws/eks/eks.go new file mode 100644 index 00000000..66b8b753 --- /dev/null +++ b/go-pkg/aws/eks/eks.go @@ -0,0 +1,421 @@ +// Package eks implements EKS utils. +package eks + +import ( + "bytes" + "context" + "encoding/base64" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + aws_eks_v2 "github.com/aws/aws-sdk-go-v2/service/eks" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/olekukonko/tablewriter" + clientcmd_api_v1 "k8s.io/client-go/tools/clientcmd/api/v1" + "sigs.k8s.io/yaml" + + "github.com/leptonai/gpud/go-pkg/randutil" + "github.com/leptonai/gpud/log" +) + +type Op struct { + clusterName string + limit int + kubeconfigFile string + clusterCAFile string + roleARN string + sessionName string +} + +type OpOption func(*Op) + +func (op *Op) applyOpts(opts []OpOption) { + for _, opt := range opts { + opt(op) + } +} + +func WithClusterName(name string) OpOption { + return func(op *Op) { + op.clusterName = name + } +} + +func WithLimit(limit int) OpOption { + return func(op *Op) { + op.limit = limit + } +} + +// Set kubeconfig file path to write. +func WithKubeconfigFile(v string) OpOption { + return func(op *Op) { + op.kubeconfigFile = v + } +} + +// Set cluster CA file path to write. +func WithClusterCAFile(v string) OpOption { + return func(op *Op) { + op.clusterCAFile = v + } +} + +func WithRoleARN(v string) OpOption { + return func(op *Op) { + op.roleARN = v + } +} + +func WithSessionName(v string) OpOption { + return func(op *Op) { + op.sessionName = v + } +} + +func GetCluster(ctx context.Context, cfg aws.Config, clusterName string, opts ...OpOption) (Cluster, error) { + cli := aws_eks_v2.NewFromConfig(cfg) + return getClusterWithClient(ctx, cfg.Region, cli, clusterName, opts...) +} + +func getClusterWithClient(ctx context.Context, region string, cli *aws_eks_v2.Client, clusterName string, opts ...OpOption) (Cluster, error) { + cs, err := listClusters(ctx, region, cli, append(opts, WithClusterName(clusterName), WithLimit(1))...) + if err != nil { + return Cluster{}, err + } + if len(cs) != 1 { + return Cluster{}, errors.New("not found") + } + return cs[0], nil +} + +func listClusters(ctx context.Context, region string, cli *aws_eks_v2.Client, opts ...OpOption) ([]Cluster, error) { + ret := &Op{} + ret.applyOpts(opts) + + clusters := make([]Cluster, 0) + + var nextToken *string = nil +done: + for i := 0; i < 20; i++ { + clusterNames := []string{ret.clusterName} + if ret.clusterName == "" { + out, err := cli.ListClusters(ctx, &aws_eks_v2.ListClustersInput{ + NextToken: nextToken, + }) + if err != nil { + return nil, err + } + clusterNames = out.Clusters + nextToken = out.NextToken + } + + log.Logger.Infof("inspecting %d clusters", len(clusterNames)) + for _, cname := range clusterNames { + cl, err := inspectCluster(ctx, region, "UNKNOWN", cli, nil, cname, opts...) + if err != nil { + return nil, err + } + + clusters = append(clusters, cl) + if ret.limit > 0 && len(clusters) >= ret.limit { + log.Logger.Infof("already listed %d clusters with limit %d -- skipping the rest", len(clusters), ret.limit) + break done + } + } + + log.Logger.Infof("listed %d clusters so far with limit %d", len(clusters), ret.limit) + if nextToken == nil { + // no more resources are available + break + } + + // TODO: add wait to prevent api throttle (rate limit)? + } + + sort.SliceStable(clusters, func(i, j int) bool { + return clusters[i].ARN < clusters[j].ARN + }) + return clusters, nil +} + +func inspectCluster( + ctx context.Context, + region string, + mothershipState string, + eksAPI *aws_eks_v2.Client, + vpcToELBv2s map[string][]string, + clusterName string, + opts ...OpOption, +) (Cluster, error) { + ret := &Op{} + ret.applyOpts(opts) + + eksOut, err := eksAPI.DescribeCluster( + ctx, + &aws_eks_v2.DescribeClusterInput{ + Name: &clusterName, + }, + ) + if err != nil { + if IsErrClusterDeleted(err) { + log.Logger.Infof("cluster %q already deleted", clusterName) + return Cluster{Name: clusterName, Status: "DELETED"}, nil + } + return Cluster{}, err + } + + platformVeresion := "UNKNOWN" + if eksOut.Cluster.PlatformVersion != nil { + platformVeresion = *eksOut.Cluster.PlatformVersion + } + vpcID := "" + if eksOut.Cluster.ResourcesVpcConfig != nil { + vpcID = *eksOut.Cluster.ResourcesVpcConfig.VpcId + } + + oidcIssuer := "" + if eksOut.Cluster.Identity != nil && eksOut.Cluster.Identity.Oidc != nil { + oidcIssuer = *eksOut.Cluster.Identity.Oidc.Issuer + } + + endpoint := "" + if eksOut.Cluster.Endpoint != nil { + endpoint = *eksOut.Cluster.Endpoint + } + + ca := "" + if eksOut.Cluster.CertificateAuthority != nil { + ca = *eksOut.Cluster.CertificateAuthority.Data + } + + version, status, health := GetClusterStatus(eksOut) + attachedELBs := make([]string, 0) + if vpcToELBv2s != nil { + attachedELBs = vpcToELBv2s[vpcID] + } + c := Cluster{ + Name: clusterName, + ARN: *eksOut.Cluster.Arn, + Region: region, + + Version: version, + PlatformVersion: platformVeresion, + MothershipState: mothershipState, + Status: status, + Health: health, + + CreatedAt: *eksOut.Cluster.CreatedAt, + + VPCID: vpcID, + ClusterSGID: *eksOut.Cluster.ResourcesVpcConfig.ClusterSecurityGroupId, + AttachedELBv2ARNs: attachedELBs, + + Endpoint: endpoint, + CertificateAuthority: ca, + OIDCIssuer: oidcIssuer, + } + + return c, nil +} + +type Cluster struct { + Name string `json:"name"` + ARN string `json:"arn"` + Region string `json:"region"` + + Version string `json:"version"` + PlatformVersion string `json:"platform_version"` + MothershipState string `json:"mothership_state"` + Status string `json:"status"` + Health string `json:"health"` + + CreatedAt time.Time `json:"created_at"` + + VPCID string `json:"vpc_id"` + ClusterSGID string `json:"cluster_sg_id"` + AttachedELBv2ARNs []string `json:"attached_elbv2_arns,omitempty"` + + Endpoint string `json:"endpoint"` + CertificateAuthority string `json:"certificate_authority"` + OIDCIssuer string `json:"oidc_issuer"` +} + +func (c Cluster) String() string { + buf := bytes.NewBuffer(nil) + tb := tablewriter.NewWriter(buf) + tb.SetAutoWrapText(false) + tb.SetAlignment(tablewriter.ALIGN_LEFT) + tb.SetCenterSeparator("*") + tb.SetRowLine(true) + tb.Append([]string{"CLUSTER KIND", "EKS"}) + tb.Append([]string{"NAME", c.Name}) + tb.Append([]string{"ARN", c.ARN}) + tb.Append([]string{"VERSION", c.Version}) + tb.Append([]string{"PLATFORM VERSION", c.PlatformVersion}) + tb.Append([]string{"MOTHERSHIP STATE", c.MothershipState}) + tb.Append([]string{"STATUS", c.Status}) + tb.Append([]string{"HEALTH", c.Health}) + tb.Append([]string{"CREATED AT", c.CreatedAt.String()}) + tb.Append([]string{"VPC ID", c.VPCID}) + tb.Append([]string{"SG ID", c.ClusterSGID}) + for i, arn := range c.AttachedELBv2ARNs { + tb.Append([]string{fmt.Sprintf("ATTACHED ELBv2 ARN #%d", i+1), arn}) + } + tb.Render() + + rs := buf.String() + + return rs +} + +func (c Cluster) KubeconfigWithAWSIAMAuthenticator(opts ...OpOption) (clientcmd_api_v1.Config, error) { + ret := &Op{} + ret.applyOpts(opts) + + cmd, err := exec.LookPath("aws-iam-authenticator") + if err != nil { + return clientcmd_api_v1.Config{}, fmt.Errorf("aws cli not found %w", err) + } + + decoded, err := base64.StdEncoding.DecodeString(c.CertificateAuthority) + if err != nil { + return clientcmd_api_v1.Config{}, fmt.Errorf("failed to decode certificate authority %w", err) + } + + args := []string{ + "token", + "--region", + c.Region, + "--cluster-id", + c.Name, + } + if ret.roleARN != "" { + args = append(args, "--role", ret.roleARN) + } + if ret.sessionName != "" { + args = append(args, "--session-name", ret.sessionName) + } + + kcfg := clientcmd_api_v1.Config{ + Clusters: []clientcmd_api_v1.NamedCluster{ + { + Name: c.ARN, + Cluster: clientcmd_api_v1.Cluster{ + Server: c.Endpoint, + CertificateAuthorityData: decoded, + }, + }, + }, + Contexts: []clientcmd_api_v1.NamedContext{ + { + Name: c.ARN, + Context: clientcmd_api_v1.Context{ + Cluster: c.ARN, + AuthInfo: c.ARN, + }, + }, + }, + CurrentContext: c.ARN, + AuthInfos: []clientcmd_api_v1.NamedAuthInfo{ + { + Name: c.ARN, + AuthInfo: clientcmd_api_v1.AuthInfo{ + Exec: &clientcmd_api_v1.ExecConfig{ + APIVersion: "client.authentication.k8s.io/v1beta1", + Command: cmd, + Args: args, + }, + }, + }, + }, + } + return kcfg, nil +} + +func (c Cluster) WriteKubeconfigWithAWSIAMAuthenticator(opts ...OpOption) (string, error) { + kcfg, err := c.KubeconfigWithAWSIAMAuthenticator(opts...) + if err != nil { + return "", err + } + return c.writeKubeconfigYAML(kcfg, opts...) +} + +func (c Cluster) writeKubeconfigYAML(kcfg clientcmd_api_v1.Config, opts ...OpOption) (string, error) { + ret := &Op{} + ret.applyOpts(opts) + + b, err := yaml.Marshal(kcfg) + if err != nil { + return "", err + } + + if ret.kubeconfigFile == "" { + ret.kubeconfigFile = filepath.Join(os.TempDir(), fmt.Sprintf("kubeconfig-%s", randutil.AlphabetsLowerCase(32))) + } + + if _, err := os.Stat(filepath.Dir(ret.kubeconfigFile)); os.IsNotExist(err) { + if err = os.MkdirAll(filepath.Dir(ret.kubeconfigFile), 0755); err != nil { + return "", err + } + } + + log.Logger.Infow("writing kubeconfig", "file", ret.kubeconfigFile) + if err = os.WriteFile(ret.kubeconfigFile, b, 0644); err != nil { + return "", err + } + + if ret.clusterCAFile != "" { + if _, err := os.Stat(filepath.Dir(ret.clusterCAFile)); os.IsNotExist(err) { + if err = os.MkdirAll(filepath.Dir(ret.clusterCAFile), 0755); err != nil { + return "", err + } + } + decoded, err := base64.StdEncoding.DecodeString(c.CertificateAuthority) + if err != nil { + return "", fmt.Errorf("failed to decode certificate authority %w", err) + } + log.Logger.Infow("writing cluster ca", "file", ret.clusterCAFile) + if err = os.WriteFile(ret.clusterCAFile, decoded, 0644); err != nil { + return "", err + } + } + + return ret.kubeconfigFile, nil +} + +// Returns version, status, and health information. +func GetClusterStatus(out *aws_eks_v2.DescribeClusterOutput) (string, string, string) { + version := *out.Cluster.Version + status := string(out.Cluster.Status) + + health := "OK" + if out.Cluster.Health != nil && out.Cluster.Health.Issues != nil && len(out.Cluster.Health.Issues) > 0 { + health = fmt.Sprintf("%+v", out.Cluster.Health.Issues) + } + + return version, status, health +} + +func IsErrClusterDeleted(err error) bool { + if err == nil { + return false + } + awsErr, ok := err.(awserr.Error) + if ok && awsErr.Code() == "ResourceNotFoundException" && + strings.HasPrefix(awsErr.Message(), "No cluster found for") { + // ResourceNotFoundException: No cluster found for name: aws-k8s-tester-155468BC717E03B003\n\tstatus code: 404, request id: 1e3fe41c-b878-11e8-adca-b503e0ba731d + return true + } + + // must check the string + // sometimes EKS API returns untyped error value + return strings.Contains(err.Error(), "No cluster found for") +} diff --git a/go-pkg/randutil/randutil.go b/go-pkg/randutil/randutil.go new file mode 100644 index 00000000..39ceff4b --- /dev/null +++ b/go-pkg/randutil/randutil.go @@ -0,0 +1,31 @@ +package randutil + +import ( + "math/rand" +) + +const ( + alphabetsLowerCase = "abcdefghijklmnopqrstuvwxyz" + alphaLowerNumerics = "0123456789abcdefghijklmnopqrstuvwxyz" + alphaNumericsWithSpecialCharacters = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ?!@#$%^&*()-=_+[]{}';:,.<>" +) + +func AlphabetsLowerCase(n int) string { + return string(randBytes(alphabetsLowerCase, n)) +} + +func StringAlphaNumeric(n int) string { + return string(randBytes(alphaLowerNumerics, n)) +} + +func StringAlphaNumericWithSpecialCharacters(n int) string { + return string(randBytes(alphaNumericsWithSpecialCharacters, n)) +} + +func randBytes(pattern string, n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = pattern[rand.Intn(len(pattern))] + } + return b +} diff --git a/go.mod b/go.mod index d6f24b6b..94c2de0f 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,14 @@ toolchain go1.23.2 require ( github.com/NVIDIA/go-nvlib v0.7.0 github.com/NVIDIA/go-nvml v0.12.4-0 + github.com/aws/aws-sdk-go v1.55.5 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/config v1.26.5 + github.com/aws/aws-sdk-go-v2/service/eks v1.48.2 github.com/coreos/go-systemd/v22 v22.5.0 github.com/docker/docker v25.0.6+incompatible github.com/dustin/go-humanize v1.0.1 + github.com/fsnotify/fsnotify v1.7.0 github.com/gin-contrib/gzip v1.0.1 github.com/gin-contrib/requestid v1.0.2 github.com/gin-contrib/zap v1.1.3 @@ -33,6 +38,7 @@ require ( google.golang.org/grpc v1.65.0 k8s.io/api v0.32.0-alpha.0 k8s.io/apimachinery v0.32.0-alpha.0 + k8s.io/client-go v0.29.1 k8s.io/cri-api v0.32.0-alpha.0 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/yaml v1.4.0 @@ -45,6 +51,17 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/akutz/memconn v0.1.0 // indirect github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.16.16 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect + github.com/aws/smithy-go v1.20.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect @@ -59,7 +76,6 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -80,6 +96,7 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/nftables v0.2.1-0.20240414091927-5e242ec57806 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect github.com/jsimonetti/rtnetlink v1.4.0 // indirect diff --git a/go.sum b/go.sum index 7e73b472..eff83e7b 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,36 @@ github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4= +github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= +github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/config v1.26.5 h1:lodGSevz7d+kkFJodfauThRxK9mdJbyutUxGq1NNhvw= +github.com/aws/aws-sdk-go-v2/config v1.26.5/go.mod h1:DxHrz6diQJOc9EwDslVRh84VjjrE17g+pVZXUeSxaDU= +github.com/aws/aws-sdk-go-v2/credentials v1.16.16 h1:8q6Rliyv0aUFAVtzaldUEcS+T5gbadPbWdV1WcAddK8= +github.com/aws/aws-sdk-go-v2/credentials v1.16.16/go.mod h1:UHVZrdUsv63hPXFo1H7c5fEneoVo9UXiz36QG1GEPi0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 h1:c5I5iH+DZcH3xOIMlz3/tCKJDaHFwYEmxvlh2fAcFo8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11/go.mod h1:cRrYDYAMUohBJUtUnOhydaMHtiK/1NZ0Otc9lIb6O0Y= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsMJDJ2sLur1gRBhEM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY= +github.com/aws/aws-sdk-go-v2/service/eks v1.48.2 h1:EFjJfHrl7/2qh/ZawUXtl9juOPAUUOTFDLOmov5KSgM= +github.com/aws/aws-sdk-go-v2/service/eks v1.48.2/go.mod h1:fff5mmwLCVxyXCojYjPY34sUGvWtXCD325yRL5qHAVs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 h1:DBYTXwIGQSGs9w4jKm60F5dmCQ3EEruxdc0MFh+3EY4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10/go.mod h1:wohMUQiFdzo0NtxbBg0mSRGZ4vL3n0dKjLTINdcIino= +github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 h1:eajuO3nykDPdYicLlP3AGgOyVN3MOlFmZv7WGTuJPow= +github.com/aws/aws-sdk-go-v2/service/sso v1.18.7/go.mod h1:+mJNDdF+qiUlNKNC3fxn74WWNN+sOiGOEImje+3ScPM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 h1:QPMJf+Jw8E1l7zqhZmMlFw6w1NmfkfiSK8mS4zOx3BA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7/go.mod h1:ykf3COxYI0UJmxcfcxcVuz7b6uADi1FkiUz6Eb7AgM8= +github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 h1:NzO4Vrau795RkUdSHKEwiR01FaGzGOH1EETJ+5QHnm0= +github.com/aws/aws-sdk-go-v2/service/sts v1.26.7/go.mod h1:6h2YuIoxaMSCFf5fi1EgZAwdfkGMgDY+DVfa61uLe4U= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= @@ -60,6 +90,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/emicklei/go-restful/v3 v3.11.2 h1:1onLa9DcsMYO9P+CXaL0dStDqQ2EHHXLiz+BtnqkLAU= +github.com/emicklei/go-restful/v3 v3.11.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -119,6 +151,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 h1:0VpGH+cDhbDtdcweoyCVsF3fhN8kejK6rFe/2FFX2nU= +github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -134,6 +170,10 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1 github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU= github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 h1:elKwZS1OcdQ0WwEDBeqxKwb7WB62QX8bvZ/FJnVXIfk= @@ -419,10 +459,14 @@ k8s.io/api v0.32.0-alpha.0 h1:gK97a97Onqa3IJ4id0ZDRG7DEaB0ZFdhSz5tL4hPLEo= k8s.io/api v0.32.0-alpha.0/go.mod h1:2zVWBoCpfiUaKnR/J4otJ85V+Uw/wb6/CLOi8IlNZQ4= k8s.io/apimachinery v0.32.0-alpha.0 h1:bN/xQXi4xnFw/22UblQqrwUXgRv1lSVumOA81qAWF4Y= k8s.io/apimachinery v0.32.0-alpha.0/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A= +k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks= k8s.io/cri-api v0.32.0-alpha.0 h1:Rs9prajcHWZAdy9ueQdD2R+OOnDD3rKYbM9hQ90iEQU= k8s.io/cri-api v0.32.0-alpha.0/go.mod h1:Po3TMAYH/+KrZabi7QiwQI4a692oZcUOUThd/rqwxrI= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 961b299f..d90b4be7 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -12,6 +12,7 @@ import ( lep_components "github.com/leptonai/gpud/components" lep_config "github.com/leptonai/gpud/config" + "github.com/leptonai/gpud/manager" "github.com/gin-gonic/gin" "sigs.k8s.io/yaml" @@ -145,3 +146,19 @@ func createConfigHandler(cfg *lep_config.Config) func(c *gin.Context) { } } } + +const ( + URLPathPackages = "/packages" + URLPathPackagesDesc = "Get the status of gpud managed packages" +) + +func createPackageHandler(m *manager.Manager) func(c *gin.Context) { + return func(c *gin.Context) { + packageStatus, err := m.Status(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"code": http.StatusInternalServerError, "message": "failed to get package status " + err.Error()}) + return + } + c.JSON(http.StatusOK, packageStatus) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index c9f59d0b..bccee802 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -86,6 +86,7 @@ import ( "github.com/leptonai/gpud/internal/login" "github.com/leptonai/gpud/internal/session" "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/manager" ) // Server is the gpud main daemon @@ -100,7 +101,7 @@ type Server struct { autoUpdateExitCode int } -func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID string, opts ...gpud_config.OpOption) (_ *Server, retErr error) { +func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID string, packageManager *manager.Manager, opts ...gpud_config.OpOption) (_ *Server, retErr error) { options := &gpud_config.Op{} if err := options.ApplyOpts(opts); err != nil { return nil, err @@ -921,6 +922,11 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID Path: path.Join("/admin", URLPathConfig), Desc: URLPathConfigDesc, }) + admin.GET(URLPathPackages, createPackageHandler(packageManager)) + registeredPaths = append(registeredPaths, componentHandlerDescription{ + Path: path.Join("/admin", URLPathPackages), + Desc: URLPathPackagesDesc, + }) if config.Pprof { log.Logger.Debugw("registering pprof handlers") diff --git a/internal/session/serve.go b/internal/session/serve.go index b1c0bc6a..3a6f8d8b 100644 --- a/internal/session/serve.go +++ b/internal/session/serve.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "os" + "strings" "time" v1 "github.com/leptonai/gpud/api/v1" @@ -73,35 +74,40 @@ func (s *Session) serve() { response.Events = events case "update": - if !s.enableAutoUpdate { - log.Logger.Warnw("auto update is disabled -- skipping update") - response.Error = errors.New("auto update is disabled") - break - } + if targetVersion := strings.Split(payload.UpdateVersion, ":"); len(targetVersion) == 2 { + err := update.PackageUpdate(targetVersion[0], targetVersion[1], update.DefaultUpdateURL) + log.Logger.Infow("Update received for machine", "version", targetVersion[1], "package", targetVersion[0], "error", err) + } else { + if !s.enableAutoUpdate { + log.Logger.Warnw("auto update is disabled -- skipping update") + response.Error = errors.New("auto update is disabled") + break + } - systemdManaged, _ := systemd.IsActive("gpud.service") - if s.autoUpdateExitCode == -1 && !systemdManaged { - log.Logger.Warnw("gpud is not managed with systemd and auto update by exit code is not set -- skipping update") - response.Error = errors.New("gpud is not managed with systemd") - break - } + systemdManaged, _ := systemd.IsActive("gpud.service") + if s.autoUpdateExitCode == -1 && !systemdManaged { + log.Logger.Warnw("gpud is not managed with systemd and auto update by exit code is not set -- skipping update") + response.Error = errors.New("gpud is not managed with systemd") + break + } - nextVersion := payload.UpdateVersion - if nextVersion == "" { - log.Logger.Warnw("target update_version is empty -- skipping update") - response.Error = errors.New("update_version is empty") - break - } + nextVersion := payload.UpdateVersion + if nextVersion == "" { + log.Logger.Warnw("target update_version is empty -- skipping update") + response.Error = errors.New("update_version is empty") + break + } - if systemdManaged { - response.Error = update.Update(nextVersion, update.DefaultUpdateURL) - break - } + if systemdManaged { + response.Error = update.Update(nextVersion, update.DefaultUpdateURL) + break + } - if s.autoUpdateExitCode != -1 { - response.Error = update.UpdateOnlyBinary(nextVersion, update.DefaultUpdateURL) - if response.Error == nil { - needExit = s.autoUpdateExitCode + if s.autoUpdateExitCode != -1 { + response.Error = update.UpdateOnlyBinary(nextVersion, update.DefaultUpdateURL) + if response.Error == nil { + needExit = s.autoUpdateExitCode + } } } } diff --git a/manager/controllers/package_controller.go b/manager/controllers/package_controller.go new file mode 100644 index 00000000..e77f2291 --- /dev/null +++ b/manager/controllers/package_controller.go @@ -0,0 +1,321 @@ +package controllers + +import ( + "context" + "io" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" + + "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/manager/packages" + "github.com/leptonai/gpud/pkg/process" +) + +type PackageController struct { + fileWatcher chan packages.PackageInfo + packageStatus map[string]*packages.PackageStatus + syncPeriod time.Duration + sync.RWMutex +} + +func NewPackageController(watcher chan packages.PackageInfo) *PackageController { + r := &PackageController{ + fileWatcher: watcher, + packageStatus: make(map[string]*packages.PackageStatus), + syncPeriod: 3 * time.Second, + } + return r +} + +func (c *PackageController) Status(ctx context.Context) ([]packages.PackageStatus, error) { + c.RLock() + defer c.RUnlock() + var ret []packages.PackageStatus + for _, pkg := range c.packageStatus { + ret = append(ret, *pkg) + } + sort.Sort(packages.PackageStatuses(ret)) + return ret, nil +} + +func (c *PackageController) Run(ctx context.Context) error { + go c.reconcileLoop(ctx) + go c.updateRunner(ctx) + go c.installRunner(ctx) + go c.statusRunner(ctx) + return nil +} + +func (c *PackageController) reconcileLoop(ctx context.Context) { + for { + select { + case packageInfo := <-c.fileWatcher: + c.Lock() + log.Logger.Infof("[package controller]: received package info: %v", packageInfo) + if _, ok := c.packageStatus[packageInfo.Name]; !ok { + c.packageStatus[packageInfo.Name] = &packages.PackageStatus{ + Name: packageInfo.Name, + IsInstalled: false, + Installing: false, + Progress: 0, + Status: false, + TargetVersion: "", + CurrentVersion: "", + ScriptPath: "", + Dependency: packageInfo.Dependency, + TotalTime: packageInfo.TotalTime, + } + } + c.packageStatus[packageInfo.Name].TotalTime = packageInfo.TotalTime + c.packageStatus[packageInfo.Name].Dependency = packageInfo.Dependency + c.packageStatus[packageInfo.Name].TargetVersion = packageInfo.TargetVersion + c.packageStatus[packageInfo.Name].ScriptPath = packageInfo.ScriptPath + c.Unlock() + case <-ctx.Done(): + return + } + } +} + +func (c *PackageController) updateRunner(ctx context.Context) { + ticker := time.NewTicker(c.syncPeriod) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticker.Reset(c.syncPeriod) + } + for _, pkg := range c.packageStatus { + if !pkg.IsInstalled { + continue + } + var version string + err := runCommand(ctx, pkg.ScriptPath, "version", &version) + if err != nil || version == "" { + log.Logger.Errorf("[package controller]: %v unexpected version failure: %v, version: %s", pkg.Name, err, version) + continue + } + log.Logger.Infof("[package controller]: %v version is %v, target is %v", pkg.Name, version, pkg.TargetVersion) + c.Lock() + c.packageStatus[pkg.Name].CurrentVersion = version + c.Unlock() + if version == pkg.TargetVersion { + continue + } + var eta time.Duration + c.Lock() + c.packageStatus[pkg.Name].Installing = true + c.packageStatus[pkg.Name].Progress = 0 + eta = c.packageStatus[pkg.Name].TotalTime + c.Unlock() + done := make(chan any) + go func() { + startTime := time.Now() + localTicker := time.NewTicker(2 * time.Second) + defer localTicker.Stop() + for { + select { + case <-done: + return + case <-localTicker.C: + c.Lock() + progress := int(time.Since(startTime).Seconds() / eta.Seconds() * 100) + if progress >= 100 { + progress = 98 + } + c.packageStatus[pkg.Name].Progress = progress + c.Unlock() + } + } + }() + err = runCommand(ctx, pkg.ScriptPath, "upgrade", nil) + close(done) + c.Lock() + c.packageStatus[pkg.Name].Installing = false + c.packageStatus[pkg.Name].Progress = 100 + c.Unlock() + if err != nil { + log.Logger.Errorf("[package controller]: %v unexpected upgrade failure: %v", pkg.Name, err) + } + } + } +} + +func (c *PackageController) installRunner(ctx context.Context) { + ticker := time.NewTicker(c.syncPeriod) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticker.Reset(c.syncPeriod) + } + for _, pkg := range c.packageStatus { + var skipCheck bool + for _, dep := range pkg.Dependency { + if _, ok := c.packageStatus[dep[0]]; !ok { + log.Logger.Infof("[package controller]: %v dependency %v not found, skipping", pkg.Name, dep[0]) + skipCheck = true + break + } + if !c.packageStatus[dep[0]].IsInstalled { + log.Logger.Infof("[package controller]: %v dependency %v not installed, skipping", pkg.Name, dep[0]) + skipCheck = true + break + } + if c.packageStatus[dep[0]].CurrentVersion == "" || c.packageStatus[dep[0]].CurrentVersion < dep[1] { + log.Logger.Infof("[package controller]: %v dependency %v version %v does not meet required %v, skipping", pkg.Name, dep[0], c.packageStatus[dep[0]].CurrentVersion, dep[1]) + skipCheck = true + break + } + } + if skipCheck { + continue + } + if pkg.Installing { + log.Logger.Infof("[package controller]: %v installing...", pkg.Name) + continue + } + // if installing, then skip + err := runCommand(ctx, pkg.ScriptPath, "isInstalled", nil) + if err == nil { + c.Lock() + c.packageStatus[pkg.Name].Progress = 100 + c.packageStatus[pkg.Name].IsInstalled = true + c.Unlock() + log.Logger.Infof("[package controller]: %v already installed", pkg.Name) + continue + } + log.Logger.Errorf("[package controller]: %v not installed, installing", pkg.Name) + go func() { + var eta time.Duration + c.Lock() + c.packageStatus[pkg.Name].Installing = true + c.packageStatus[pkg.Name].Progress = 0 + eta = c.packageStatus[pkg.Name].TotalTime + c.Unlock() + done := make(chan any) + go func() { + startTime := time.Now() + localTicker := time.NewTicker(2 * time.Second) + defer localTicker.Stop() + for { + select { + case <-done: + return + case <-localTicker.C: + progress := int(time.Since(startTime).Seconds() / eta.Seconds() * 100) + if progress >= 100 { + progress = 98 + } + c.Lock() + c.packageStatus[pkg.Name].Progress = progress + c.Unlock() + } + } + }() + err = runCommand(ctx, pkg.ScriptPath, "install", nil) + close(done) + if err != nil { + log.Logger.Errorf("[package controller]: %v unexpected install failure: %v", pkg.Name, err) + } else { + if err = runCommand(ctx, pkg.ScriptPath, "start", nil); err != nil { + log.Logger.Errorf("[package controller]: %v failed to start after installing: %v", pkg.Name, err) + } + } + c.Lock() + c.packageStatus[pkg.Name].Installing = false + c.packageStatus[pkg.Name].Progress = 100 + c.Unlock() + }() + } + } +} + +func (c *PackageController) statusRunner(ctx context.Context) { + ticker := time.NewTicker(c.syncPeriod) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticker.Reset(c.syncPeriod) + } + for _, pkg := range c.packageStatus { + if !pkg.IsInstalled { + continue + } + err := runCommand(ctx, pkg.ScriptPath, "status", nil) + if err == nil { + c.Lock() + c.packageStatus[pkg.Name].Status = true + c.Unlock() + log.Logger.Infof("[package controller]: %v status ok", pkg.Name) + continue + } + log.Logger.Errorf("[package controller]: %v status not ok, restarting", pkg.Name) + if err = runCommand(ctx, pkg.ScriptPath, "stop", nil); err != nil { + log.Logger.Errorf("[package controller]: %v unexpected stop failure: %v", pkg.Name, err) + continue + } + if err = runCommand(ctx, pkg.ScriptPath, "start", nil); err != nil { + log.Logger.Errorf("[package controller]: %v unexpected start failure: %v", pkg.Name, err) + } + } + } +} + +func runCommand(ctx context.Context, script, arg string, result *string) error { + var ops []process.OpOption + if result == nil { + f, err := os.OpenFile(filepath.Join(filepath.Dir(script), arg+".log"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + ops = append(ops, process.WithOutputFile(f)) + } + + p, err := process.New(append(ops, process.WithCommand("bash", script, arg))...) + if err != nil { + return err + } + if result != nil { + go func() { + stdoutReader := p.StdoutReader() + if stdoutReader == nil { + log.Logger.Errorf("failed to read stdout: %v", err) + return + } + rawResult, err := io.ReadAll(p.StdoutReader()) + if err != nil { + log.Logger.Errorf("failed to read stout: %v", err) + return + } + *result = strings.TrimSpace(string(rawResult)) + }() + } + if err = p.Start(ctx); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-p.Wait(): + if err != nil { + return err + } + } + if err := p.Abort(ctx); err != nil { + return err + } + return nil +} diff --git a/manager/informer/file_informer.go b/manager/informer/file_informer.go new file mode 100644 index 00000000..3c80a475 --- /dev/null +++ b/manager/informer/file_informer.go @@ -0,0 +1,183 @@ +package informer + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + + "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/manager/packages" +) + +type FileInformer struct { +} + +func NewFileInformer() chan packages.PackageInfo { + i := &FileInformer{} + return i.Start() +} + +func (f *FileInformer) Start() chan packages.PackageInfo { + c := make(chan packages.PackageInfo) + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Logger.Fatal(err) + } + + go func() { + defer func() { + if err = watcher.Close(); err != nil { + log.Logger.Error(err) + } + }() + out, err := exec.Command("ls", "/var/lib/gpud/packages").CombinedOutput() + if err == nil { + for _, pkgName := range strings.Split(string(out), "\n") { + if pkgName == "" { + continue + } + scriptPath := fmt.Sprintf("/var/lib/gpud/packages/%s/init.sh", pkgName) + version, dependencies, totalTime, err := resolvePackage(scriptPath) + if err != nil { + log.Logger.Errorf("resolve package failed: %v", err) + continue + } + + c <- packages.PackageInfo{ + Name: pkgName, + ScriptPath: scriptPath, + TargetVersion: version, + Dependency: dependencies, + TotalTime: totalTime, + } + } + } + for { + select { + case event, ok := <-watcher.Events: + if !ok { + continue + } + if event.Op&fsnotify.Create == fsnotify.Create { + fileInfo, err := os.Stat(event.Name) + if err == nil && fileInfo.IsDir() { + log.Logger.Infof("New directory created: %s", event.Name) + if aErr := addDirectory(watcher, event.Name); aErr != nil { + log.Logger.Error(aErr) + } + } + continue + } + if event.Op&fsnotify.Remove == fsnotify.Remove { + fileInfo, err := os.Stat(event.Name) + if os.IsNotExist(err) || (err == nil && fileInfo.IsDir()) { + log.Logger.Infof("Directory removed: %s", event.Name) + if rErr := watcher.Remove(event.Name); rErr != nil { + log.Logger.Error(rErr) + } + } + } + + if event.Op&fsnotify.Write != fsnotify.Write { + continue + } + path := event.Name + if !strings.HasPrefix(path, "/var/lib/gpud/packages") { + continue + } + elems := strings.Split(path, "/") + if len(elems) != 7 { + continue + } + fileName := elems[6] + if fileName != "init.sh" { + continue + } + + version, dependencies, totalTime, err := resolvePackage(path) + if err != nil { + log.Logger.Errorf("resolve package failed: %v", err) + continue + } + c <- packages.PackageInfo{ + Name: elems[5], + ScriptPath: path, + TargetVersion: version, + Dependency: dependencies, + TotalTime: totalTime, + } + case wErr, ok := <-watcher.Errors: + if !ok { + continue + } + log.Logger.Errorf("Error: %s", wErr) + } + } + }() + + rootDir := "/var/lib/gpud/" + err = addDirectory(watcher, rootDir) + if err != nil { + log.Logger.Error(err) + } + return c +} + +func addDirectory(watcher *fsnotify.Watcher, dir string) error { + return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + log.Logger.Infof("Watching directory: %s", path) + if err := watcher.Add(path); err != nil { + return err + } + } + return nil + }) +} + +func resolvePackage(path string) (string, [][]string, time.Duration, error) { + var version string + var dependencies [][]string + var totalTime time.Duration + if _, err := exec.Command("stat", path).CombinedOutput(); err != nil { + return "", nil, 0, fmt.Errorf("stat failed: %v", err) + } + if out, err := exec.Command("bash", "-c", fmt.Sprintf("grep \"#GPUD_PACKAGE_VERSION\" %s | awk -F \"=\" '{print $2}'", path)).CombinedOutput(); err != nil { + return "", nil, 0, fmt.Errorf("get version failed: %v output: %s", err, out) + } else { + version = strings.TrimSpace(string(out)) + } + if out, err := exec.Command("bash", "-c", fmt.Sprintf("grep \"#GPUD_PACKAGE_DEPENDENCY\" %s | awk -F \"=\" '{print $2}'", path)).CombinedOutput(); err != nil { + return "", nil, 0, fmt.Errorf("get dependencies failed: %v output: %s", err, out) + } else { + dependencies = resolveDependencies(string(out)) + } + if out, err := exec.Command("bash", "-c", fmt.Sprintf("grep \"#GPUD_PACKAGE_INSTALL_TIME\" %s | awk -F \"=\" '{print $2}'", path)).CombinedOutput(); err != nil { + return "", nil, 0, fmt.Errorf("get dependencies failed: %v output: %s", err, out) + } else { + totalTime, _ = time.ParseDuration(strings.TrimSpace(string(out))) + } + return version, dependencies, totalTime, nil +} + +func resolveDependencies(raw string) [][]string { + raw = strings.TrimSpace(raw) + var dependencies [][]string + rawDependencies := strings.Split(raw, ",") + for _, rawDependency := range rawDependencies { + dependency := strings.Split(rawDependency, ":") + if len(dependency) != 2 { + continue + } + dependencies = append(dependencies, dependency) + } + return dependencies +} diff --git a/manager/informer/informer.go b/manager/informer/informer.go new file mode 100644 index 00000000..ef587fb6 --- /dev/null +++ b/manager/informer/informer.go @@ -0,0 +1,5 @@ +package informer + +type Informer interface { + Start(<-chan struct{}) +} diff --git a/manager/manager.go b/manager/manager.go new file mode 100644 index 00000000..9cb9b878 --- /dev/null +++ b/manager/manager.go @@ -0,0 +1,31 @@ +package manager + +import ( + "context" + + "github.com/leptonai/gpud/manager/controllers" + "github.com/leptonai/gpud/manager/informer" + "github.com/leptonai/gpud/manager/packages" +) + +type Manager struct { + packageController *controllers.PackageController +} + +var GlobalController *controllers.PackageController + +func New() (*Manager, error) { + return &Manager{}, nil +} + +func (a *Manager) Start(ctx context.Context) { + watcher := informer.NewFileInformer() + packageController := controllers.NewPackageController(watcher) + _ = packageController.Run(ctx) + a.packageController = packageController + GlobalController = packageController +} + +func (a *Manager) Status(ctx context.Context) ([]packages.PackageStatus, error) { + return a.packageController.Status(ctx) +} diff --git a/manager/packages/packages.go b/manager/packages/packages.go new file mode 100644 index 00000000..f60290b6 --- /dev/null +++ b/manager/packages/packages.go @@ -0,0 +1,32 @@ +package packages + +import "time" + +type PackageInfo struct { + Name string + ScriptPath string + TargetVersion string + Dependency [][]string + TotalTime time.Duration +} + +type PackageStatus struct { + Name string `json:"name"` + IsInstalled bool `json:"is_installed"` + Installing bool `json:"installing"` + Progress int `json:"progress"` + TotalTime time.Duration `json:"total_time"` + Status bool `json:"status"` + TargetVersion string `json:"target_version"` + CurrentVersion string `json:"current_version"` + ScriptPath string `json:"script_path"` + Dependency [][]string `json:"dependency"` +} + +type PackageStatuses []PackageStatus + +func (a PackageStatuses) Len() int { return len(a) } + +func (a PackageStatuses) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (a PackageStatuses) Less(i, j int) bool { return a[i].Name < a[j].Name } diff --git a/pkg/asn/asn.go b/pkg/asn/asn.go new file mode 100644 index 00000000..ddb7b259 --- /dev/null +++ b/pkg/asn/asn.go @@ -0,0 +1,41 @@ +package asn + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +type ASLookupResponse struct { + Asn string `json:"asn"` + AsnName string `json:"asn_name"` + AsnRange string `json:"asn_range"` + Country string `json:"country"` + IP string `json:"ip"` +} + +func GetASLookup(ip string) (*ASLookupResponse, error) { + url := fmt.Sprintf("https://api.hackertarget.com/aslookup/?q=%s&output=json", ip) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var result ASLookupResponse + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + asnResults := strings.Split(result.AsnName, ",") + result.AsnName = strings.ToLower(strings.TrimSpace(asnResults[0])) + if len(asnResults) > 1 { + result.Country = strings.ToLower(strings.TrimSpace(asnResults[1])) + } + return &result, nil +} diff --git a/pkg/host/host.go b/pkg/host/host.go index d980b2ac..ed4e003b 100644 --- a/pkg/host/host.go +++ b/pkg/host/host.go @@ -8,6 +8,8 @@ import ( "fmt" "os/exec" "strings" + + "github.com/google/uuid" ) // Fetches the UUIF of the machine host, using the "dmidecode". @@ -26,7 +28,12 @@ func UUID(ctx context.Context) (string, error) { for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) if len(line) > 6 && line[:6] == "UUID: " { - return line[6:], nil + uid := line[6:] + if strings.Contains(uid, "Not Settable") { + generateUUID, _ := uuid.NewUUID() + uid = generateUUID.String() + } + return uid, nil } } diff --git a/update/update.go b/update/update.go index 217e87ae..cafb8b17 100644 --- a/update/update.go +++ b/update/update.go @@ -9,6 +9,8 @@ import ( "fmt" "io" "maps" + "net/http" + url2 "net/url" "os" "os/exec" "path" @@ -242,3 +244,71 @@ func update(ver, url string, requireRoot bool, useSystemd bool) error { return nil } + +func PackageUpdate(targetPackage, ver, baseUrl string) error { + dlDir, err := os.UserCacheDir() + if err != nil { + dlDir = os.TempDir() + } + if err = os.MkdirAll(dlDir, 0700); err != nil { + return err + } + dlPath := filepath.Join(dlDir, targetPackage+ver) + downloadUrl, err := url2.JoinPath(baseUrl, "packages", targetPackage, ver) + if err != nil { + return err + } + err = downloadFile(downloadUrl, dlPath) + if err != nil { + return err + } + defer os.Remove(dlPath) + + if err = copyFile(dlPath, fmt.Sprintf("/var/lib/gpud/packages/%s/init.sh", targetPackage)); err != nil { + return err + } + return nil +} + +func downloadFile(url, filepath string) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("bad status: %s", resp.Status) + } + + out, err := os.Create(filepath) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, resp.Body) + return err +} + +func copyFile(src, dst string) error { + sourceFile, err := os.Open(src) + if err != nil { + return err + } + defer sourceFile.Close() + + destinationFile, err := os.Create(dst) + if err != nil { + return err + } + defer destinationFile.Close() + + _, err = io.Copy(destinationFile, sourceFile) + if err != nil { + return err + } + + err = destinationFile.Sync() + return err +}