Skip to content

Commit

Permalink
godev: simplify local development of the worker
Browse files Browse the repository at this point in the history
To make it easier to develop the worker, add a new
./godev/devtools/cmd/copyuploads command to copy uploaded reports from a
GCS bucket to the local filesystem. After running this script, running
./godev/cmd/worker with no arguments will have access to recent uploaded
reports in the local filesystem environment.

Also update the README for local development, and update the default
config to set the GCP ProjectID (without this setting, I assume it was
using whatever was returned by gcloud config get project).

Finally, remove the unused Config.StorageEmulatorHost. This had confused
me due to the overlap with the STORAGE_EMULATOR_HOST environment
variable read by the cloud.google.com/storage package.

Change-Id: Ia0565f67fa763ac3d74aba0631a92e8f21b506f8
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/615776
Reviewed-by: Hyang-Ah Hana Kim <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
  • Loading branch information
findleyr committed Sep 27, 2024
1 parent 1967543 commit e9e6960
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 29 deletions.
2 changes: 2 additions & 0 deletions godev/cmd/telemetrygodev/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func TestPaths(t *testing.T) {
ctx := context.Background()
cfg := config.NewConfig()
cfg.LocalStorage = t.TempDir()
cfg.ProjectID = "" // defensive: don't use a real project ID for tests.

// NewConfig assumes that the command is run from the repo root, but tests
// run from their test directory. We should fix this, but for now just
// fix up the config path.
Expand Down
54 changes: 47 additions & 7 deletions godev/cmd/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ dev environment with data.
Similar to the /chart endpoint, /copy also supports the following query
parameters:

- `/?date=<YYYY-MM-DD>``: Copies reports for a specific date.
- `/?start=<YYYY-MM-DD>&end=<YYYY-MM-DD>``: Copies reports within a specified date range.
- `/copy/?date=<YYYY-MM-DD>`: Copies reports for a specific date.
- `/copy/?start=<YYYY-MM-DD>&end=<YYYY-MM-DD>`: Copies reports within a
specified date range.

### `/queue-tasks`

Expand All @@ -51,17 +52,56 @@ triggers the following actions:

## Local Development

For local development, simply build and run. It serves on localhost:8082.
The preferred method of local develoment is to simply build and run the worker
binary. Use PORT= to customize the default hosting port.

go run ./godev/cmd/worker

By default, the server will use the filesystem for storage object I/O. Use the
-gcs flag to use the Cloud Storage API.
By default, the server will use the filesystem for storage object I/O (see
[`GO_TELEMETRY_LOCAL_STORAGE`](#environment-variables)). Unless you have also
uploaded reports through a local instance of the telemetry frontend, this local
storage will be empty. To copy uploads from GCS to the local environment, run:

go run ./godev/devtools/cmd/copyuploads -v

Note that this command requires read permission to our GCS buckets.

So, this is a complete end-to-end test of the merge endpoint:

1. First, copy data with:

```
go run ./godev/devtools/cmd/copyuploads -v
```

2. Then, run the worker:

```
go run ./godev/cmd/worker
```

3. Finally, in a separate terminal, trigger the merge operation:

```
curl http://localhost:8082/merge/?date=2024-09-26
```

After doing this, you should see the resulting merged reports in the
`./localstorage/local-telemetry-merged` directory.

Note: the `/queue-tasks/` endpoint does not currently work locally: by default
it tries to enqueue tasks in the associated GCP project, which will fail unless
you have escalated permissions on GCP.

### Local development using GCS

Alternatively, you can use the -gcs flag to use the Cloud Storage API:

go run ./godev/cmd/worker --gcs

Optionally, use the localstorage devtool the emulate the GCS server on your
machine.
However, the above command requires write permissions to our public GCS buckets,
which one should in general not request. Instead, use the localstorage devtool
the emulate the GCS server on your machine.

./godev/devtools/localstorage.sh
STORAGE_EMULATOR_HOST=localhost:8081 go run ./godev/cmd/worker --gcs
Expand Down
2 changes: 2 additions & 0 deletions godev/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func handleCopy(cfg *config.Config, dest *storage.API) content.HandlerFunc {
// - Weekly chart: encompasses 7 days of data, concluding on the specified date.
// TODO(golang/go#62575): adjust the date range to align with report
// upload cutoff.
//
// TODO(rfindley): use a local task queue when not run with -gcs.
func handleTasks(cfg *config.Config) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
now := time.Now().UTC()
Expand Down
99 changes: 99 additions & 0 deletions godev/devtools/cmd/copyuploads/copyuploads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The copyuploads command copies uploads from GCS to the local filesystem
// storage, for use with local development of the worker.
//
// By default, this command copies the last 3 days of uploads from the
// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket
// local-telemetry-uploaded, at which point this data will be available when
// running ./godev/cmd/worker with no arguments.
//
// This command requires read permission to the go-telemetry GCS buckets.
// TODO(rfindley): we could avoid the need for read permission by instead
// downloading the public merged reports, and reassembling the individual
// uploads.
//
// See --help for more details.
package main

import (
"context"
"errors"
"flag"
"log"
"os"
"strings"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/telemetry/godev/internal/config"
"golang.org/x/telemetry/godev/internal/storage"
)

var (
bucket = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.")
daysBack = flag.Int("days_back", 3, "The number of days back to copy")
verbose = flag.Bool("v", false, "If set, enable verbose logging.")
)

func main() {
flag.Parse()

if !strings.HasSuffix(*bucket, "-uploaded") {
log.Fatal("-bucket must end in -uploaded")
}

cfg := config.NewConfig()
ctx := context.Background()

gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket)
if err != nil {
log.Fatal(err)
}
fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
if err != nil {
log.Fatal(err)
}

// Copy files concurrently.
const concurrency = 5
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)

start := time.Now()
for dayOffset := range *daysBack {
date := start.AddDate(0, 0, -dayOffset)
it := gcs.Objects(ctx, date.Format(time.DateOnly))
for {
name, err := it.Next()
if errors.Is(err, storage.ErrObjectIteratorDone) {
break
}

// Skip objects that already exist in local storage.
dest := fs.Object(name)
if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
if *verbose {
log.Printf("Skipping existing object %s", name)
}
continue
}
if *verbose {
log.Printf("Starting copying object %s", name)
}

g.Go(func() error {
if err != nil {
return err
}
return storage.Copy(ctx, dest, gcs.Object(name))
})
}
}

if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
40 changes: 18 additions & 22 deletions godev/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ type Config struct {
// ClientID is the OAuth client used in authentication for queue tasks.
ClientID string

// StorageEmulatorHost is a network address for a Cloud Storage emulator.
StorageEmulatorHost string

// LocalStorage is a directory for storage I/O used when the using the filesystem
// or storage emulator modes.
LocalStorage string
Expand Down Expand Up @@ -85,25 +82,24 @@ var (
func NewConfig() *Config {
environment := env("GO_TELEMETRY_ENV", "local")
return &Config{
ServerPort: env("PORT", "8080"),
WorkerPort: env("PORT", "8082"),
WorkerURL: env("GO_TELEMETRY_WORKER_URL", "http://localhost:8082"),
ProjectID: env("GO_TELEMETRY_PROJECT_ID", ""),
LocationID: env("GO_TELEMETRY_LOCATION_ID", ""),
QueueID: environment + "-worker-tasks",
IAPServiceAccount: env("GO_TELEMETRY_IAP_SERVICE_ACCOUNT", ""),
ClientID: env("GO_TELEMETRY_CLIENT_ID", ""),
StorageEmulatorHost: env("GO_TELEMETRY_STORAGE_EMULATOR_HOST", "localhost:8081"),
LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
ChartDataBucket: environment + "-telemetry-charted",
Env: environment,
MergedBucket: environment + "-telemetry-merged",
UploadBucket: environment + "-telemetry-uploaded",
UploadConfig: env("GO_TELEMETRY_UPLOAD_CONFIG", "./config/config.json"),
MaxRequestBytes: env("GO_TELEMETRY_MAX_REQUEST_BYTES", int64(100*1024)),
RequestTimeout: 10 * time.Duration(time.Minute),
UseGCS: *useGCS,
DevMode: *devMode,
ServerPort: env("PORT", "8080"),
WorkerPort: env("PORT", "8082"),
WorkerURL: env("GO_TELEMETRY_WORKER_URL", "http://localhost:8082"),
ProjectID: env("GO_TELEMETRY_PROJECT_ID", "go-telemetry"),
LocationID: env("GO_TELEMETRY_LOCATION_ID", ""),
QueueID: environment + "-worker-tasks",
IAPServiceAccount: env("GO_TELEMETRY_IAP_SERVICE_ACCOUNT", ""),
ClientID: env("GO_TELEMETRY_CLIENT_ID", ""),
LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
ChartDataBucket: environment + "-telemetry-charted",
Env: environment,
MergedBucket: environment + "-telemetry-merged",
UploadBucket: environment + "-telemetry-uploaded",
UploadConfig: env("GO_TELEMETRY_UPLOAD_CONFIG", "./config/config.json"),
MaxRequestBytes: env("GO_TELEMETRY_MAX_REQUEST_BYTES", int64(100*1024)),
RequestTimeout: 10 * time.Duration(time.Minute),
UseGCS: *useGCS,
DevMode: *devMode,
}
}

Expand Down
4 changes: 4 additions & 0 deletions godev/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func NewFSObject(b *FSBucket, name string) ObjectHandle {
return &FSObject{filename}
}

func (o *FSObject) Filename() string {
return o.filename
}

func (o *FSObject) NewReader(ctx context.Context) (io.ReadCloser, error) {
r, err := os.Open(o.filename)
if errors.Is(err, os.ErrNotExist) {
Expand Down

0 comments on commit e9e6960

Please sign in to comment.