diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 571975de7206..5f69848c42e3 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -163,6 +163,8 @@ go_library( "ruby_pg_blocklist.go", "rust_postgres.go", "rust_postgres_blocklist.go", + "s3_clone_backup_restore.go", + "s3_microceph.go", "schemachange.go", "schemachange_random_load.go", "scrub.go", diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index f20139267a56..8a16d6aac755 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -23,6 +23,7 @@ func RegisterTests(r registry.Registry) { registerBackupNodeShutdown(r) registerBackupRestoreRoundTrip(r) registerBackupFixtures(r) + registerBackupS3Clones(r) registerCDC(r) registerCDCBench(r) registerCDCFiltering(r) diff --git a/pkg/cmd/roachtest/tests/s3_clone_backup_restore.go b/pkg/cmd/roachtest/tests/s3_clone_backup_restore.go new file mode 100644 index 000000000000..4e64ad1e55c7 --- /dev/null +++ b/pkg/cmd/roachtest/tests/s3_clone_backup_restore.go @@ -0,0 +1,239 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package tests + +import ( + "context" + gosql "database/sql" + "fmt" + "math/rand" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/errors" +) + +// registerBackupS3Clones validates backup/restore compatibility with S3 clones. +func registerBackupS3Clones(r registry.Registry) { + // Running against a microceph cluster deployed on a GCE instance. + for _, cephVersion := range []string{"reef", "squid"} { + r.Add(registry.TestSpec{ + Name: fmt.Sprintf("backup/ceph/%s", cephVersion), + Owner: registry.OwnerFieldEng, + Cluster: r.MakeClusterSpec(4, spec.WorkloadNodeCount(1)), + EncryptionSupport: registry.EncryptionMetamorphic, + Leases: registry.MetamorphicLeases, + CompatibleClouds: registry.Clouds(spec.GCE), + Suites: registry.Suites(registry.Nightly), + TestSelectionOptOutSuites: registry.Suites(registry.Nightly), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + v := s3BackupRestoreValidator{ + t: t, + c: c, + crdbNodes: c.CRDBNodes(), + csvPort: 8081, + importNode: c.Node(1), + rows: 1000, + workloadNode: c.WorkloadNode(), + } + v.startCluster(ctx) + ceph := cephManager{ + t: t, + c: c, + bucket: backupTestingBucket, + // For now, we use the workload node as the cephNode + cephNodes: c.Node(c.Spec().NodeCount), + key: randomString(32), + secret: randomString(64), + // reef `microceph enable rgw` does not support `--ssl-certificate` + // so we'll test a non-secure version. + secure: cephVersion != "reef", + version: cephVersion, + } + ceph.install(ctx) + v.validateBackupRestore(ctx, ceph) + }, + }) + } +} + +// s3Provider defines the methods that the S3 object store has to provide +// in order to run the backup/restore validation tests. +type s3Provider interface { + // getBackupURI returns the storage specific destination URI + getBackupURI(ctx context.Context, dest string) (string, error) +} + +// s3BackupRestoreValidator verifies backup/restore functionality against +// an S3 vendor. +type s3BackupRestoreValidator struct { + t test.Test + c cluster.Cluster + crdbNodes option.NodeListOption + csvPort int + importNode option.NodeListOption + rows int + workloadNode option.NodeListOption +} + +// checkBackups verifies that there is exactly one full and one incremental backup. +func (v *s3BackupRestoreValidator) checkBackups(ctx context.Context, conn *gosql.DB) { + backups := conn.QueryRowContext(ctx, "SHOW BACKUPS IN 'external://backup_bucket'") + var path string + if err := backups.Scan(&path); err != nil { + v.t.Fatal(err) + } + + rows, err := conn.QueryContext(ctx, + "SELECT backup_type from [SHOW BACKUP $1 IN 'external://backup_bucket'] WHERE object_type='table'", + path) + + if err != nil { + v.t.Fatal(err) + } + var foundFull, foundIncr bool + var rowCount int + for rows.Next() { + var backupType string + if err := rows.Scan(&backupType); err != nil { + v.t.Fatal(err) + } + if backupType == "full" { + foundFull = true + } + if backupType == "incremental" { + foundIncr = true + } + rowCount++ + } + if !foundFull { + v.t.Fatal(errors.Errorf("full backup not found")) + } + if !foundIncr { + v.t.Fatal(errors.Errorf("incremental backup not found")) + } + if rowCount > 2 { + v.t.Fatal(errors.Errorf("found more than 2 backups")) + } + +} + +// runImportForS3CloneTesting import the data used to test the S3 clone backup/restore +// functionality. +func (v *s3BackupRestoreValidator) importData(ctx context.Context) { + csvCmd := importBankCSVServerCommand("./cockroach", v.csvPort) + v.c.Run(ctx, option.WithNodes(v.crdbNodes), csvCmd+` &> logs/workload-csv-server.log < /dev/null &`) + if err := waitForPort(ctx, v.t.L(), v.crdbNodes, v.csvPort, v.c); err != nil { + v.t.Fatal(err) + } + v.c.Run(ctx, option.WithNodes(v.importNode), + importBankCommand("./cockroach", v.rows, 0, v.csvPort, v.importNode[0])) +} + +// startCluster starts the Cockroach cluster. +func (v *s3BackupRestoreValidator) startCluster(ctx context.Context) { + settings := install.MakeClusterSettings() + settings.Secure = true + v.c.Start(ctx, v.t.L(), option.NewStartOpts(option.NoBackupSchedule), settings, v.crdbNodes) +} + +// validateS3BackupRestore performs a backup/restore against a storage provider +// to asses minimum compatibility at the functional level. +// This does not imply that a storage provider passing the test is supported. +func (v *s3BackupRestoreValidator) validateBackupRestore(ctx context.Context, s s3Provider) { + dest := destinationName(v.c) + v.importData(ctx) + + var backupPath string + var err error + if backupPath, err = s.getBackupURI(ctx, dest); err != nil { + v.t.Fatal(err) + } + conn := v.c.Conn(ctx, v.t.L(), 1) + defer conn.Close() + + if _, err := conn.ExecContext(ctx, + fmt.Sprintf("CREATE EXTERNAL CONNECTION backup_bucket AS '%s'", + backupPath)); err != nil { + v.t.Fatal(err) + } + + // Run a full backup while running the workload + m := v.c.NewMonitor(ctx, v.c.CRDBNodes()) + m.Go(func(ctx context.Context) error { + v.t.Status(`running backup `) + _, err := conn.ExecContext(ctx, + "BACKUP bank.bank INTO 'external://backup_bucket'") + return err + }) + m.Go(func(ctx context.Context) error { + v.t.Status(`running workload`) + return v.runWorload(ctx, 10*time.Second) + }) + m.Wait() + + // Run an incremental backup + v.t.Status(`running incremental backup `) + if _, err := conn.ExecContext(ctx, + "BACKUP bank.bank INTO LATEST IN 'external://backup_bucket'"); err != nil { + v.t.Fatal(err) + } + + // Verify that we have the backups, then restore in a separate database. + v.checkBackups(ctx, conn) + v.t.Status(`restoring from backup`) + if _, err := conn.ExecContext(ctx, "CREATE DATABASE restoreDB"); err != nil { + v.t.Fatal(err) + } + + if _, err := conn.ExecContext(ctx, + `RESTORE bank.bank FROM LATEST IN 'external://backup_bucket' WITH into_db=restoreDB`); err != nil { + v.t.Fatal(err) + } + + // Check that the content of the original database and the restored database + // are the same. + table := "bank" + originalBank, err := fingerprint(ctx, conn, "bank" /* db */, table) + if err != nil { + v.t.Fatal(err) + } + restore, err := fingerprint(ctx, conn, "restoreDB" /* db */, table) + if err != nil { + v.t.Fatal(err) + } + + if originalBank != restore { + v.t.Fatal(errors.Errorf("got %s, expected %s while comparing restoreDB with originalBank", + restore, originalBank)) + } +} + +// runWorload runs the bank workload for the specified duration. +func (v *s3BackupRestoreValidator) runWorload(ctx context.Context, duration time.Duration) error { + cmd := roachtestutil. + NewCommand("./cockroach workload run bank"). + Arg("{pgurl%s}", v.crdbNodes). + Flag("duration", duration.String()). + String() + return v.c.RunE(ctx, option.WithNodes(v.workloadNode), cmd) +} + +// randomString returns a random string with the given size. +func randomString(size int) string { + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + b := make([]rune, size) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} diff --git a/pkg/cmd/roachtest/tests/s3_microceph.go b/pkg/cmd/roachtest/tests/s3_microceph.go new file mode 100644 index 000000000000..6f9850fb2f8b --- /dev/null +++ b/pkg/cmd/roachtest/tests/s3_microceph.go @@ -0,0 +1,170 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package tests + +import ( + "context" + "fmt" + "net/url" + "os" + "path" + "path/filepath" + + "github.com/cockroachdb/cockroach/pkg/cloud/amazon" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" +) + +// cephDisksScript creates 3 4GB loop devices, e.g. virtual block devices that allows +// a computer file to be accessed as if it were a physical disk or partition. +// These loop devices will be used by the Ceph Object Storage Daemon (OSD) as +// disks. +const cephDisksScript = ` +#!/bin/bash +for l in a b c; do + loop_file="$(sudo mktemp -p /mnt/data1 XXXX.img)" + sudo truncate -s 4G "${loop_file}" + loop_dev="$(sudo losetup --show -f "${loop_file}")" + # the block-devices plug doesn't allow accessing /dev/loopX + # devices so we make those same devices available under alternate + # names (/dev/sdiY) + minor="${loop_dev##/dev/loop}" + sudo mknod -m 0660 "/dev/sdi${l}" b 7 "${minor}" + sudo microceph disk add --wipe "/dev/sdi${l}" +done` + +const s3cmdSsl = `sudo s3cmd --host localhost --host-bucket="localhost/%%(bucket)" \ + --access_key=%s --secret_key=%s --ca-certs=./certs/ca.crt %s` + +const s3cmdNoSsl = `sudo s3cmd --host localhost --host-bucket="localhost/%%(bucket)" \ + --access_key=%s --secret_key=%s --no-ssl %s` + +// cephManager manages a single node microCeph cluster, used to +// validate the backup and restore functionality. +type cephManager struct { + t test.Test + c cluster.Cluster + bucket string + cephNodes option.NodeListOption // The nodes within the cluster used by Ceph. + key string + secret string + secure bool + version string +} + +// cephManager implements s3Provider +var _ s3Provider = &cephManager{} + +// getBackupURI implements s3Provider. +func (m cephManager) getBackupURI(ctx context.Context, dest string) (string, error) { + addr, err := m.c.InternalIP(ctx, m.t.L(), m.cephNodes) + if err != nil { + return "", err + } + m.t.Status("cephNode: ", addr) + endpointURL := `http://` + addr[0] + if m.secure { + endpointURL = `https://` + addr[0] + } + q := make(url.Values) + q.Add(amazon.AWSAccessKeyParam, m.key) + q.Add(amazon.AWSSecretParam, m.secret) + q.Add(amazon.AWSUsePathStyle, "true") + // Region is required in the URL, but not used in Ceph. + q.Add(amazon.S3RegionParam, "dummy") + q.Add(amazon.AWSEndpointParam, endpointURL) + uri := fmt.Sprintf("s3://%s/%s?%s", m.bucket, dest, q.Encode()) + return uri, nil +} + +// install a single node microCeph cluster. +// See https://canonical-microceph.readthedocs-hosted.com/en/squid-stable/how-to/single-node/ +// It is fatal on errors. +func (m cephManager) install(ctx context.Context) { + tmpDir := "/tmp/" + m.run(ctx, `installing microceph`, + fmt.Sprintf(`sudo snap install microceph --channel %s/stable`, m.version)) + m.run(ctx, `preventing upgrades`, `sudo snap refresh --hold microceph`) + m.run(ctx, `initialize microceph`, `sudo microceph cluster bootstrap`) + + cephDisksScriptPath := filepath.Join(tmpDir, "cephDisks.sh") + m.put(ctx, cephDisksScript, cephDisksScriptPath) + m.run(ctx, "adding disks", cephDisksScriptPath, tmpDir) + + // Start the Ceph Object Gateway, also known as RADOS Gateway (RGW). RGW is + // an object storage interface to provide applications with a RESTful + // gateway to Ceph storage clusters, compatible with the S3 APIs. + // We are leveraging the node certificates created by cockroach. + rgwCmd := "sudo microceph enable rgw " + if m.secure { + rgwCmd = rgwCmd + ` --ssl-certificate="$(base64 -w0 certs/node.crt)" --ssl-private-key="$(base64 -w0 certs/node.key)"` + } + m.run(ctx, `starting object gateway`, rgwCmd) + + m.run(ctx, `creating backup user`, + `sudo radosgw-admin user create --uid=backup --display-name=backup`) + m.run(ctx, `add keys to the user`, + fmt.Sprintf(`sudo radosgw-admin key create --uid=backup --key-type=s3 --access-key=%s --secret-key=%s`, + m.key, m.secret)) + + m.run(ctx, `install s3cmd`, `sudo apt install -y s3cmd`) + s3cmd := s3cmdNoSsl + if m.secure { + s3cmd = s3cmdSsl + } + m.run(ctx, `creating bucket`, + fmt.Sprintf(s3cmd, m.key, m.secret, "mb s3://"+m.bucket)) + if err := m.maybeInstallCa(ctx); err != nil { + m.t.Fatal(err) + } +} + +// maybeInstallCa adds a custom ca in the CockroachDB cluster. +func (m cephManager) maybeInstallCa(ctx context.Context) error { + if !m.secure { + return nil + } + localCertsDir, err := os.MkdirTemp("", "roachtest-certs") + if err != nil { + return err + } + // get the ca file from one of the nodes. + caFile := path.Join(localCertsDir, "ca.crt") + conn := m.c.Conn(ctx, m.t.L(), 1) + defer conn.Close() + if err := m.c.Get(ctx, m.t.L(), "certs/ca.crt", caFile, m.c.Node(1)); err != nil { + return err + } + caCert, err := os.ReadFile(caFile) + if err != nil { + return err + } + // Disabling caching for Custom CA, see https://github.com/cockroachdb/cockroach/issues/125051. + if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.s3.session_reuse.enabled = false"); err != nil { + return err + } + if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.http.custom_ca=$1", caCert); err != nil { + return err + } + return nil +} + +// put creates a file in the ceph node with the given content. +func (m cephManager) put(ctx context.Context, content string, dest string) { + err := m.c.PutString(ctx, content, dest, 0700, m.cephNodes) + if err != nil { + m.t.Fatal(err) + } +} + +// run the given command on the ceph node. +func (m cephManager) run(ctx context.Context, msg string, cmd ...string) { + m.t.Status(msg, "...") + m.t.Status(cmd) + m.c.Run(ctx, option.WithNodes(m.cephNodes), cmd...) + m.t.Status(msg, " done") +}