From 453910c0e7c23c9194ac818de8fc51d2977dd35a Mon Sep 17 00:00:00 2001 From: David Aronchick Date: Sun, 13 Oct 2024 10:02:14 -0700 Subject: [PATCH] supporting .pem files --- .cspell/custom-dictionary.txt | 1 + cmd/beta/azure/create_deployment.go | 37 ------ cmd/beta/gcp/create_deployment.go | 41 ------- .../interfaces/common/cluster_deployerer.go | 7 +- pkg/providers/azure/deploy_bacalhau_test.go | 114 ++++++++++++++++-- pkg/providers/azure/integration_test.go | 2 +- pkg/providers/common/cluster_deployer.go | 21 ++-- pkg/providers/common/cluster_deployer_test.go | 18 +++ pkg/providers/gcp/integration_test.go | 5 - pkg/sshutils/sshutils.go | 70 +++++++++++ 10 files changed, 213 insertions(+), 103 deletions(-) diff --git a/.cspell/custom-dictionary.txt b/.cspell/custom-dictionary.txt index 9672d27c..3de7c641 100644 --- a/.cspell/custom-dictionary.txt +++ b/.cspell/custom-dictionary.txt @@ -248,6 +248,7 @@ predeclared presignedurldisabled presignedurlexpiration privateip +privkey probeexec probehttp Progammer diff --git a/cmd/beta/azure/create_deployment.go b/cmd/beta/azure/create_deployment.go index a74fee77..25116fe3 100644 --- a/cmd/beta/azure/create_deployment.go +++ b/cmd/beta/azure/create_deployment.go @@ -195,10 +195,6 @@ func runDeployment(ctx context.Context, azureProvider *azure.AzureProvider) erro return err } - if err := provisionMachines(ctx, azureProvider, m); err != nil { - return err - } - if err := provisionBacalhauCluster(ctx, azureProvider, m); err != nil { return err } @@ -241,39 +237,6 @@ func createResources( return nil } -func provisionMachines( - ctx context.Context, - azureProvider *azure.AzureProvider, - m *display.DisplayModel, -) error { - var wg sync.WaitGroup - errChan := make(chan error, len(m.Deployment.Machines)) - - for _, machine := range m.Deployment.Machines { - wg.Add(1) - go func(machineName string) { - defer wg.Done() - err := azureProvider.GetClusterDeployer().ProvisionPackagesOnMachine(ctx, machineName) - if err != nil { - 
errChan <- fmt.Errorf("failed to provision machine %s: %w", machineName, err) - return - } - go updateMachineConfig(m.Deployment, machineName) - }(machine.GetName()) - } - - wg.Wait() - close(errChan) - - for err := range errChan { - if err != nil { - return err - } - } - - return nil -} - func provisionBacalhauCluster( ctx context.Context, azureProvider *azure.AzureProvider, diff --git a/cmd/beta/gcp/create_deployment.go b/cmd/beta/gcp/create_deployment.go index b34a8625..59a28607 100644 --- a/cmd/beta/gcp/create_deployment.go +++ b/cmd/beta/gcp/create_deployment.go @@ -3,7 +3,6 @@ package gcp import ( "context" "fmt" - "sync" "time" "github.com/bacalhau-project/andaime/pkg/display" @@ -186,10 +185,6 @@ func runDeployment(ctx context.Context, gcpProvider *gcp_provider.GCPProvider) e return err } - if err := provisionMachines(ctx, gcpProvider, m); err != nil { - return err - } - if err := provisionBacalhauCluster(ctx, gcpProvider, m); err != nil { return err } @@ -243,42 +238,6 @@ func createResources( return nil } -func provisionMachines( - ctx context.Context, - gcpProvider *gcp_provider.GCPProvider, - m *display.DisplayModel, -) error { - var wg sync.WaitGroup - errChan := make(chan error, len(m.Deployment.Machines)) - - for _, machine := range m.Deployment.Machines { - wg.Add(1) - go func(machineName string) { - defer wg.Done() - err := gcpProvider.GetClusterDeployer().ProvisionPackagesOnMachine(ctx, machineName) - if err != nil { - errChan <- fmt.Errorf("failed to provision machine %s: %w", machineName, err) - return - } - machine := m.Deployment.GetMachine(machineName) - machine.SetServiceState(models.ServiceTypeSSH.Name, models.ServiceStateSucceeded) - machine.SetServiceState(models.ServiceTypeDocker.Name, models.ServiceStateSucceeded) - updateMachineConfig(m.Deployment, machineName) - }(machine.GetName()) - } - - wg.Wait() - close(errChan) - - for err := range errChan { - if err != nil { - return err - } - } - - return nil -} - func 
provisionBacalhauCluster( ctx context.Context, gcpProvider *gcp_provider.GCPProvider, diff --git a/pkg/models/interfaces/common/cluster_deployerer.go b/pkg/models/interfaces/common/cluster_deployerer.go index f32679bb..2576ae44 100644 --- a/pkg/models/interfaces/common/cluster_deployerer.go +++ b/pkg/models/interfaces/common/cluster_deployerer.go @@ -8,14 +8,17 @@ import ( ) type ClusterDeployerer interface { + ProvisionMachine( + ctx context.Context, + sshConfig sshutils.SSHConfiger, + machine models.Machiner, + ) error WaitForAllMachinesToReachState( ctx context.Context, resourceType string, state models.MachineResourceState, ) error - // ProvisionAllMachinesWithPackages(ctx context.Context) error - ProvisionPackagesOnMachine(ctx context.Context, machineName string) error ExecuteCustomScript( ctx context.Context, sshConfig sshutils.SSHConfiger, diff --git a/pkg/providers/azure/deploy_bacalhau_test.go b/pkg/providers/azure/deploy_bacalhau_test.go index 0e392822..2a04e7f3 100644 --- a/pkg/providers/azure/deploy_bacalhau_test.go +++ b/pkg/providers/azure/deploy_bacalhau_test.go @@ -310,9 +310,27 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployBacalhauNode() { nodeType: "requester", sshBehavior: sshutils.ExpectedSSHBehavior{ PushFileExpectations: []sshutils.PushFileExpectation{ - {Dst: mock.Anything, Executable: true, Error: nil, Times: 3}, + {Dst: mock.Anything, Executable: true, Error: nil, Times: 6}, }, ExecuteCommandExpectations: []sshutils.ExecuteCommandExpectation{ + { + Cmd: "sudo /tmp/install-docker.sh", + Output: "", + Error: nil, + Times: 1, + }, + { + Cmd: "sudo docker run hello-world", + Output: "Hello from Docker!", + Error: nil, + Times: 1, + }, + { + Cmd: "sudo /tmp/install-core-packages.sh", + Output: "", + Error: nil, + Times: 1, + }, {Cmd: mock.Anything, Output: "", Error: nil, Times: 3}, { Cmd: "bacalhau node list --output json --api-host 0.0.0.0", @@ -367,7 +385,7 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) 
TestDeployBacalhauNode() { NodeType: models.BacalhauNodeTypeOrchestrator, }, }, - expectedError: "failed to push node config metadata script", + expectedError: "push file error", }, { name: "Successful worker deployment", @@ -378,10 +396,28 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployBacalhauNode() { Dst: mock.Anything, Executable: true, Error: nil, - Times: 3, + Times: 5, }, }, ExecuteCommandExpectations: []sshutils.ExecuteCommandExpectation{ + { + Cmd: "sudo /tmp/install-docker.sh", + Output: "", + Error: nil, + Times: 2, + }, + { + Cmd: "sudo docker run hello-world", + Output: "Hello from Docker!", + Error: nil, + Times: 2, + }, + { + Cmd: "sudo /tmp/install-core-packages.sh", + Output: "", + Error: nil, + Times: 2, + }, {Cmd: mock.Anything, Output: "", Error: nil, Times: 3}, { Cmd: "bacalhau node list --output json --api-host 1.2.3.4", @@ -451,6 +487,14 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { nodeType := models.BacalhauNodeTypeOrchestrator expectedLines := map[string][]string{ + "install-docker.sh": { + "sudo apt-get update", + "sudo apt-get install -y docker.io", + }, + "install-core-packages.sh": { + "sudo apt-get update && \\", + "sudo apt-get install -y sudo", + }, "get-node-config-metadata.sh": { `cat << EOF > /etc/node-config`, fmt.Sprintf(`MACHINE_TYPE="%s"`, vmSize), @@ -484,14 +528,20 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { }) sshBehavior := sshutils.ExpectedSSHBehavior{ - PushFileExpectations: []sshutils.PushFileExpectation{}, // Not setting behaviors here because we want to get the script being pushed + PushFileExpectations: []sshutils.PushFileExpectation{}, // We aren't setting push files here - it's down below ExecuteCommandExpectations: []sshutils.ExecuteCommandExpectation{ + { + Cmd: "sudo docker run hello-world", + Output: "Hello from Docker!", + Error: nil, + Times: 1, + }, { Cmd: mock.Anything, ProgressCallback: mock.Anything, Output: "", 
Error: nil, - Times: 3, + Times: 5, }, { Cmd: fmt.Sprintf( @@ -525,6 +575,18 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { s.mockSSHConfig.On("WaitForSSH", s.ctx, mock.Anything, mock.Anything).Return(nil).Once() renderedScripts := make(map[string][]byte) + s.mockSSHConfig.On("PushFile", s.ctx, "/tmp/install-docker.sh", mock.Anything, true, mock.Anything). + Run(func(args mock.Arguments) { + renderedScripts["install-docker.sh"] = args.Get(2).([]byte) + }). + Return(nil). + Once() + s.mockSSHConfig.On("PushFile", s.ctx, "/tmp/install-core-packages.sh", mock.Anything, true, mock.Anything). + Run(func(args mock.Arguments) { + renderedScripts["install-core-packages.sh"] = args.Get(2).([]byte) + }). + Return(nil). + Once() s.mockSSHConfig.On("PushFile", s.ctx, "/tmp/get-node-config-metadata.sh", mock.Anything, true, mock.Anything). Run(func(args mock.Arguments) { renderedScripts["get-node-config-metadata.sh"] = args.Get(2).([]byte) @@ -544,6 +606,10 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { Return(nil). 
Once() + sshutils.NewSSHConfigFunc = func(host string, port int, user string, sshPrivateKeyPath string) (sshutils.SSHConfiger, error) { + return s.mockSSHConfig, nil + } + err := s.deployer.ProvisionOrchestrator(s.ctx, "orch") s.NoError(err) @@ -554,6 +620,7 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { s.NotEmpty(s.deployment.OrchestratorIP) filesToTest := map[string]string{ + "install-docker.sh": fileToTestInstallDocker, "get-node-config-metadata.sh": fmt.Sprintf(fileToTestMetadata, location, ip, orchestrators), "install-bacalhau.sh": fileToTestInstall, "install-run-bacalhau.sh": fileToTestServe, @@ -568,23 +635,46 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployOrchestrator() { func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployWorkers() { s.SetupTest() machines := map[string]models.Machiner{ - "orch": &models.Machine{Name: "orch", Orchestrator: true, PublicIP: "1.2.3.4"}, + "orch": &models.Machine{ + Name: "orch", + Orchestrator: true, + PublicIP: "1.2.3.4", + NodeType: models.BacalhauNodeTypeOrchestrator, + }, "worker1": &models.Machine{ Name: "worker1", Orchestrator: false, PublicIP: "2.3.4.5", OrchestratorIP: "1.2.3.4", + NodeType: models.BacalhauNodeTypeCompute, }, "worker2": &models.Machine{ Name: "worker2", Orchestrator: false, PublicIP: "3.4.5.6", OrchestratorIP: "1.2.3.4", + NodeType: models.BacalhauNodeTypeCompute, }, } sshBehavior := sshutils.ExpectedSSHBehavior{ PushFileExpectations: []sshutils.PushFileExpectation{ + { + Dst: "/tmp/install-docker.sh", + FileContents: []byte(""), + Executable: true, + ProgressCallback: mock.Anything, + Error: nil, + Times: 3, + }, + { + Dst: "/tmp/install-core-packages.sh", + FileContents: []byte(""), + Executable: true, + ProgressCallback: mock.Anything, + Error: nil, + Times: 3, + }, { Dst: "/tmp/get-node-config-metadata.sh", FileContents: []byte(""), @@ -611,6 +701,13 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployWorkers() { }, }, 
ExecuteCommandExpectations: []sshutils.ExecuteCommandExpectation{ + { + Cmd: "sudo docker run hello-world", + ProgressCallback: mock.Anything, + Output: "Hello from Docker!", + Error: nil, + Times: 3, + }, { Cmd: "bacalhau node list --output json --api-host 1.2.3.4", ProgressCallback: mock.Anything, @@ -636,7 +733,7 @@ func (s *PkgProvidersAzureDeployBacalhauTestSuite) TestDeployWorkers() { ProgressCallback: mock.Anything, Output: "", Error: nil, - Times: 9, + Times: 15, }, }, InstallSystemdServiceExpectation: &sshutils.Expectation{ @@ -688,6 +785,9 @@ TOKEN="" NODE_TYPE="requester" ` +const fileToTestInstallDocker = `sudo apt-get update +sudo apt-get install -y docker.io` + const fileToTestInstall = `sudo curl -sSL https://get.bacalhau.org/install.sh?dl="${BACALHAU_INSTALL_ID}" | sudo bash` const fileToTestServe = `/usr/local/bin/bacalhau serve \` diff --git a/pkg/providers/azure/integration_test.go b/pkg/providers/azure/integration_test.go index 9dc8d9e1..f4b0c474 100644 --- a/pkg/providers/azure/integration_test.go +++ b/pkg/providers/azure/integration_test.go @@ -355,7 +355,7 @@ func (s *PkgProvidersAzureIntegrationTest) TestProvisionResourcesSuccess() { s.provider.SetClusterDeployer(s.clusterDeployer) m := display.GetGlobalModelFunc() for _, machine := range m.Deployment.Machines { - err := s.provider.GetClusterDeployer().ProvisionPackagesOnMachine(ctx, machine.GetName()) + err := s.provider.GetClusterDeployer().ProvisionMachine(ctx, s.mockSSHConfig, machine) s.Require().NoError(err) } diff --git a/pkg/providers/common/cluster_deployer.go b/pkg/providers/common/cluster_deployer.go index fb5d2eb4..cdacf077 100644 --- a/pkg/providers/common/cluster_deployer.go +++ b/pkg/providers/common/cluster_deployer.go @@ -250,6 +250,10 @@ func (cd *ClusterDeployer) ProvisionBacalhauNode( return cd.HandleDeploymentError(ctx, machine, fmt.Errorf("no orchestrator IP found")) } + if err := cd.ProvisionMachine(ctx, sshConfig, machine); err != nil { + return err + } + if err := 
cd.SetupNodeConfigMetadata(ctx, machine, sshConfig); err != nil { return cd.HandleDeploymentError(ctx, machine, err) } @@ -644,30 +648,27 @@ func (cd *ClusterDeployer) HandleDeploymentError( return err } -func (cd *ClusterDeployer) ProvisionPackagesOnMachine( +func (cd *ClusterDeployer) ProvisionMachine( ctx context.Context, - machineName string, + sshConfig sshutils.SSHConfiger, + machine models.Machiner, ) error { m := display.GetGlobalModelFunc() if m == nil { return fmt.Errorf("global display model is not initialized") } - mach := m.Deployment.GetMachine(machineName) - if mach == nil { - return fmt.Errorf("machine %s not found", machineName) - } status := models.NewDisplayStatusWithText( - machineName, + machine.GetName(), models.AzureResourceTypeVM, models.ResourceStatePending, "Provisioning Docker & packages on machine", ) m.UpdateStatus(status) - err := mach.InstallDockerAndCorePackages(ctx) + err := machine.InstallDockerAndCorePackages(ctx) if err != nil { m.UpdateStatus(models.NewDisplayStatusWithText( - machineName, + machine.GetName(), models.AzureResourceTypeVM, models.ResourceStateFailed, fmt.Sprintf("Failed to provision Docker & packages on machine: %v", err), @@ -675,7 +676,7 @@ func (cd *ClusterDeployer) ProvisionPackagesOnMachine( return err } m.UpdateStatus(models.NewDisplayStatusWithText( - machineName, + machine.GetName(), models.AzureResourceTypeVM, models.ResourceStateSucceeded, "Provisioned Docker & packages on machine", diff --git a/pkg/providers/common/cluster_deployer_test.go b/pkg/providers/common/cluster_deployer_test.go index 0b5b702c..4e03d050 100644 --- a/pkg/providers/common/cluster_deployer_test.go +++ b/pkg/providers/common/cluster_deployer_test.go @@ -135,6 +135,24 @@ func (s *PkgProvidersCommonClusterDeployerTestSuite) TestProvisionBacalhauCluste {Dst: mock.Anything, Executable: true, Error: nil, Times: 3}, }, ExecuteCommandExpectations: []sshutils.ExecuteCommandExpectation{ + { + Cmd: "sudo /tmp/install-docker.sh", + Output: "", 
// GenerateRsaKeyPair creates a new 4096-bit RSA key pair using crypto/rand.
//
// The signature has no error return, so a failure of the underlying
// generator is reported by returning (nil, nil). Callers must check the
// returned private key for nil before using either value.
func GenerateRsaKeyPair() (*rsa.PrivateKey, *rsa.PublicKey) {
	const bits = 4096

	privkey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		// Taking &privkey.PublicKey on a nil key would panic with a nil
		// pointer dereference, so bail out explicitly instead.
		return nil, nil
	}
	return privkey, &privkey.PublicKey
}

// ExportRsaPrivateKeyAsPemStr encodes privkey as a PKCS #1 DER structure
// wrapped in an "RSA PRIVATE KEY" PEM block and returns it as a string.
//
// privkey must be non-nil; the key material is written unencrypted.
func ExportRsaPrivateKeyAsPemStr(privkey *rsa.PrivateKey) string {
	privkeyBytes := x509.MarshalPKCS1PrivateKey(privkey)
	privkeyPem := pem.EncodeToMemory(
		&pem.Block{
			Type:  "RSA PRIVATE KEY",
			Bytes: privkeyBytes,
		},
	)
	return string(privkeyPem)
}

// ParseRsaPrivateKeyFromPemStr decodes the first PEM block in privPEM and
// returns the RSA private key it contains.
//
// PKCS #1 ("RSA PRIVATE KEY") is tried first. If that fails the payload is
// retried as PKCS #8 ("PRIVATE KEY"), which is what many tools emit for .pem
// files. An error is returned when no PEM block is present, when the payload
// parses as neither format (the PKCS #1 error is reported), or when a PKCS #8
// payload holds a non-RSA key.
func ParseRsaPrivateKeyFromPemStr(privPEM string) (*rsa.PrivateKey, error) {
	block, _ := pem.Decode([]byte(privPEM))
	if block == nil {
		return nil, errors.New("failed to parse PEM block containing the key")
	}

	priv, pkcs1Err := x509.ParsePKCS1PrivateKey(block.Bytes)
	if pkcs1Err == nil {
		return priv, nil
	}

	key, pkcs8Err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if pkcs8Err != nil {
		// Neither format matched; keep reporting the PKCS #1 error so the
		// failure seen by existing callers does not change.
		return nil, pkcs1Err
	}

	rsaKey, ok := key.(*rsa.PrivateKey)
	if !ok {
		return nil, errors.New("key type is not RSA")
	}
	return rsaKey, nil
}

// ExportRsaPublicKeyAsPemStr encodes pubkey as a PKIX (SubjectPublicKeyInfo)
// DER structure wrapped in a PEM block and returns it as a string.
//
// The block is labelled "PUBLIC KEY", the label that corresponds to PKIX
// content. ("RSA PUBLIC KEY" denotes a PKCS #1 payload, and pairing that
// label with PKIX bytes produces files other tools refuse to read.)
// An error is returned if pubkey is nil or cannot be marshalled.
func ExportRsaPublicKeyAsPemStr(pubkey *rsa.PublicKey) (string, error) {
	if pubkey == nil {
		return "", errors.New("public key is nil")
	}

	pubkeyBytes, err := x509.MarshalPKIXPublicKey(pubkey)
	if err != nil {
		return "", err
	}
	pubkeyPem := pem.EncodeToMemory(
		&pem.Block{
			Type:  "PUBLIC KEY",
			Bytes: pubkeyBytes,
		},
	)

	return string(pubkeyPem), nil
}

// ParseRsaPublicKeyFromPemStr decodes the first PEM block in pubPEM and
// returns the RSA public key it contains.
//
// The payload is parsed as PKIX first, which also covers blocks produced by
// ExportRsaPublicKeyAsPemStr regardless of their label. If that fails it is
// retried as PKCS #1, the format of a genuine "RSA PUBLIC KEY" block. An
// error is returned when no PEM block is present, when the payload parses as
// neither format (the PKIX error is reported), or when the key is not RSA.
func ParseRsaPublicKeyFromPemStr(pubPEM string) (*rsa.PublicKey, error) {
	block, _ := pem.Decode([]byte(pubPEM))
	if block == nil {
		return nil, errors.New("failed to parse PEM block containing the key")
	}

	pub, pkixErr := x509.ParsePKIXPublicKey(block.Bytes)
	if pkixErr != nil {
		if rsaPub, err := x509.ParsePKCS1PublicKey(block.Bytes); err == nil {
			return rsaPub, nil
		}
		return nil, pkixErr
	}

	rsaPub, ok := pub.(*rsa.PublicKey)
	if !ok {
		return nil, errors.New("key type is not RSA")
	}
	return rsaPub, nil
}