From f863ca22b1cf88f37d2544ef6bf8ef105461467a Mon Sep 17 00:00:00 2001 From: Brennan Vincent Date: Thu, 12 Dec 2024 16:01:16 -0500 Subject: [PATCH] offline mode --- flags/flags.go | 18 +- flags/grpc.go | 3 +- go.mod | 3 +- go.sum | 6 +- main.go | 53 ++- reporter/parca_reporter.go | 345 ++++++++++++++++++- uploader/log_uploader.go | 658 +++++++++++++++++++++++++++++++++++++ 7 files changed, 1064 insertions(+), 22 deletions(-) create mode 100644 uploader/log_uploader.go diff --git a/flags/flags.go b/flags/flags.go index b4155cf435..fbe274e25b 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -22,8 +22,8 @@ import ( "time" "github.com/alecthomas/kong" - "go.opentelemetry.io/ebpf-profiler/tracer" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/ebpf-profiler/tracer" _ "google.golang.org/grpc/encoding/proto" ) @@ -121,6 +121,8 @@ type Flags struct { Hidden FlagsHidden `embed:"" hidden:"" prefix:""` BPF FlagsBPF `embed:"" prefix:"bpf-"` + + OfflineMode FlagsOfflineMode `embed:"" prefix:"offline-mode-"` } type ExitCode int @@ -192,6 +194,14 @@ func (f Flags) Validate() ExitCode { } } + if len(f.OfflineMode.StoragePath) > 0 && !f.OfflineMode.Upload && (len(f.RemoteStore.Address) > 0 || len(f.OTLP.Address) > 0) { + return ParseError("Specified both offline mode and a remote store; this configuration is invalid.") + } + + if f.OfflineMode.Upload && len(f.OfflineMode.StoragePath) == 0 { + return ParseError("Specified --offline-mode-upload without --offline-mode-storage-path.") + } + return ExitSuccess } @@ -343,3 +353,9 @@ type FlagsBPF struct { VerifierLogLevel uint32 `default:"0" help:"Log level of the eBPF verifier output (0,1,2). Default is 0."` VerifierLogSize int `default:"0" help:"[deprecated] Unused."` } + +type FlagsOfflineMode struct { + StoragePath string `help:"Enables offline mode, with the data stored at the given path."` + RotationInterval time.Duration `default:"10m" help:"How often to rotate and compress the offline mode log."` + Upload bool `help:"Run the uploader for data written in offline mode."` +} diff --git a/flags/grpc.go b/flags/grpc.go index f28be5760c..9eb5d1965c 100644 --- a/flags/grpc.go +++ b/flags/grpc.go @@ -12,10 +12,10 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout" - "go.opentelemetry.io/ebpf-profiler/libpf" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" tracing "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" @@ -79,7 +79,6 @@ func (f FlagsRemoteStore) setupGrpcConnection(parent context.Context, metrics *g grpc.WithReturnConnectionError(), } - // TLS if f.Insecure { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { diff --git a/go.mod b/go.mod index f32557a437..ef54ae691b 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,13 @@ require ( github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be github.com/containerd/containerd v1.7.20 github.com/docker/docker v27.1.1+incompatible + github.com/dustin/go-humanize v1.0.1 github.com/elastic/go-freelru v0.15.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 - github.com/klauspost/compress v1.17.9 + github.com/klauspost/compress v1.17.11 github.com/prometheus/client_golang v1.19.1 github.com/prometheus/common v0.54.0 github.com/prometheus/prometheus v0.53.1 diff --git a/go.sum b/go.sum index 5313ce3c88..c649a6a8f1 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c h1:+pKlWGMw7gf6bQ github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/go-freelru v0.15.0 h1:Jo1aY8JAvpyxbTDJEudrsBfjFDaALpfVv8mxuh9sfvI= github.com/elastic/go-freelru v0.15.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a h1:ymmtaN4bVCmKKeu4XEf6JEWNZKRXPMng1zjpKd+8rCU= @@ -197,8 +199,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/main.go b/main.go index 7f6240585f..bd9b697eb4 100644 --- a/main.go +++ b/main.go @@ -46,11 +46,13 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "golang.org/x/sys/unix" + "google.golang.org/grpc" "github.com/parca-dev/parca-agent/analytics" "github.com/parca-dev/parca-agent/config" "github.com/parca-dev/parca-agent/flags" "github.com/parca-dev/parca-agent/reporter" + "github.com/parca-dev/parca-agent/uploader" ) var ( @@ -136,6 +138,14 @@ func mainWithExitCode() flags.ExitCode { return code } + if f.OfflineMode.Upload { + code, err := uploader.OfflineModeDoUpload(f) + if err != nil { + log.Errorf("failed to upload offline mode logs: %v", err) + } + return code + } + reg := prometheus.NewRegistry() reg.MustRegister( collectors.NewBuildInfoCollector(), @@ -164,12 +174,17 @@ func mainWithExitCode() flags.ExitCode { } } - grpcConn, err := f.RemoteStore.WaitGrpcEndpoint(ctx, reg, tp) - if err != nil { - log.Errorf("failed to connect to server: %v", err) - return flags.ExitFailure + isOfflineMode := len(f.OfflineMode.StoragePath) > 0 + + var grpcConn *grpc.ClientConn + if !isOfflineMode { + grpcConn, err = f.RemoteStore.WaitGrpcEndpoint(ctx, reg, tp) + if err != nil { + log.Errorf("failed to connect to server: %v", err) + return flags.ExitFailure + } + defer grpcConn.Close() } - defer grpcConn.Close() presentCores, err := numcpus.GetPresent() if err != nil { @@ -291,17 +306,32 @@ func mainWithExitCode() flags.ExitCode { intervals := times.New(5*time.Second, f.Profiling.Duration, f.Profiling.ProbabilisticInterval) times.StartRealtimeSync(mainCtx, f.ClockSyncInterval) + var client profilestoregrpc.ProfileStoreServiceClient + var debuginfoClient debuginfogrpc.DebuginfoServiceClient + if grpcConn != nil { + client = profilestoregrpc.NewProfileStoreServiceClient(grpcConn) + debuginfoClient = debuginfogrpc.NewDebuginfoServiceClient(grpcConn) + } + + var offlineModeConfig *reporter.OfflineModeConfig + if isOfflineMode { + offlineModeConfig = &reporter.OfflineModeConfig{ + StoragePath: f.OfflineMode.StoragePath, + RotationInterval: f.OfflineMode.RotationInterval, + } + } + // Network operations to CA start here // Connect to the collection agent parcaReporter, err := reporter.New( memory.DefaultAllocator, - profilestoregrpc.NewProfileStoreServiceClient(grpcConn), - debuginfogrpc.NewDebuginfoServiceClient(grpcConn), + client, + debuginfoClient, externalLabels, f.Profiling.Duration, f.Debuginfo.Strip, f.Debuginfo.UploadMaxParallel, - f.Debuginfo.UploadDisable, + f.Debuginfo.UploadDisable || isOfflineMode, int64(f.Profiling.CPUSamplingFrequency), traceHandlerCacheSize, f.Debuginfo.UploadQueueSize, @@ -310,6 +340,7 @@ func mainWithExitCode() flags.ExitCode { relabelConfigs, buildInfo.VcsRevision, reg, + offlineModeConfig, ) if err != nil { return flags.Failure("Failed to start reporting: %v", err) @@ -413,8 +444,10 @@ func mainWithExitCode() flags.ExitCode { log.Info("Stop processing ...") rep.Stop() - if err := grpcConn.Close(); err != nil { - log.Fatalf("Stopping connection of OTLP client client failed: %v", err) + if grpcConn != nil { + if err := grpcConn.Close(); err != nil { + log.Fatalf("Stopping connection of OTLP client client failed: %v", err) + } } log.Info("Exiting ...") diff --git a/reporter/parca_reporter.go b/reporter/parca_reporter.go index a90ba97952..644b518672 100644 --- a/reporter/parca_reporter.go +++ b/reporter/parca_reporter.go @@ -10,9 +10,12 @@ import ( "bytes" "context" "debug/elf" + "encoding/binary" "errors" "fmt" "io" + "os" + "path" "strings" "sync" "time" @@ -25,6 +28,7 @@ import ( "github.com/apache/arrow/go/v16/arrow/ipc" "github.com/apache/arrow/go/v16/arrow/memory" lru "github.com/elastic/go-freelru" + "github.com/klauspost/compress/zstd" "github.com/parca-dev/parca-agent/metrics" "github.com/parca-dev/parca-agent/reporter/metadata" "github.com/prometheus/client_golang/prometheus" @@ -134,6 +138,22 @@ type ParcaReporter struct { // Our own metrics sampleWriteRequestBytes prometheus.Counter stacktraceWriteRequestBytes prometheus.Counter + + offlineModeConfig *OfflineModeConfig + + // Protects the log file, + // which is accessed from both the main reporter loop + // and the rotator + offlineModeLogMu sync.Mutex + + offlineModeLogFile *os.File + offlineModeLogPath string + + offlineModeNBatchesInCurrentFile uint16 + + // Set of stacks that are already in the current log, + // meaning we don't need to log them again. + offlineModeLoggedStacks *lru.SyncedLRU[libpf.TraceHash, struct{}] } // hashString is a helper function for LRUs that use string as a key. @@ -425,6 +445,11 @@ func (l Labels) String() string { return buf.String() } +type OfflineModeConfig struct { + StoragePath string + RotationInterval time.Duration +} + // New creates a ParcaReporter. func New( mem memory.Allocator, @@ -443,7 +468,11 @@ func New( relabelConfigs []*relabel.Config, agentRevision string, reg prometheus.Registerer, + offlineModeConfig *OfflineModeConfig, ) (*ParcaReporter, error) { + if offlineModeConfig != nil && !disableSymbolUpload { + return nil, errors.New("Illogical configuration: offline mode with symbol upload enabled") + } executables, err := lru.NewSynced[libpf.FileID, metadata.ExecInfo](cacheSize, libpf.FileID.Hash32) if err != nil { return nil, err @@ -465,6 +494,14 @@ func New( return nil, err } + var loggedStacks *lru.SyncedLRU[libpf.TraceHash, struct{}] + if offlineModeConfig != nil { + loggedStacks, err = lru.NewSynced[libpf.TraceHash, struct{}](cacheSize, libpf.TraceHash.Hash32) + if err != nil { + return nil, err + } + } + cmp, err := metadata.NewContainerMetadataProvider(context.TODO(), nodeName) if err != nil { return nil, err @@ -513,6 +550,8 @@ func New( otelLibraryMetrics: make(map[string]prometheus.Metric), sampleWriteRequestBytes: sampleWriteRequestBytes, stacktraceWriteRequestBytes: stacktraceWriteRequestBytes, + offlineModeConfig: offlineModeConfig, + offlineModeLoggedStacks: loggedStacks, } r.client = client @@ -536,6 +575,145 @@ func New( return r, nil } +const DATA_FILE_EXTENSION string = ".padata" +const DATA_FILE_COMPRESSED_EXTENSION string = ".padata.zst" + +// initialScan inspects the storage directory to determine its size, and whether there are any +// uncompressed files lying around. +// It returns a map of filenames to sizes, a list of uncompressed files, and the total size. +func initialScan(storagePath string) (map[string]uint64, []string, uint64, error) { + existingFileSizes := make(map[string]uint64) + uncompressedFiles := make([]string, 0) + totalSize := uint64(0) + + files, err := os.ReadDir(storagePath) + if err != nil { + return nil, nil, 0, err + } + + for _, file := range files { + fname := file.Name() + if !file.Type().IsRegular() { + log.Warnf("Directory or special file %s in storage path; skipping", fname) + continue + } + if strings.HasSuffix(fname, DATA_FILE_COMPRESSED_EXTENSION) { + info, err := file.Info() + if err != nil { + return nil, nil, 0, fmt.Errorf("failed stat of file %s: %w", fname, err) + } + sz := uint64(info.Size()) + existingFileSizes[fname] = sz + totalSize += sz + } else if strings.HasSuffix(fname, DATA_FILE_EXTENSION) { + uncompressedFiles = append(uncompressedFiles, fname) + } else { + log.Warnf("Unrecognized file %s; skipping", fname) + } + } + return existingFileSizes, uncompressedFiles, totalSize, nil +} + +func compressFile(file io.Reader, fpath, compressedFpath string) error { + compressedLog, err := os.OpenFile(compressedFpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660) + if err != nil { + return fmt.Errorf("Failed to create compressed file %s for log rotation: %w", compressedFpath, err) + } + zstdWriter, err := zstd.NewWriter(compressedLog) + if err != nil { + return fmt.Errorf("Failed to create zstd writer for file %s: %w", compressedFpath, err) + } + if _, err = io.Copy(zstdWriter, file); err != nil { + return fmt.Errorf("Failed to write compressed log %s: %w", compressedFpath, err) + } + zstdWriter.Close() + if err = compressedLog.Close(); err != nil { + return fmt.Errorf("Failed to close compressed file %s: %w", compressedFpath, err) + } + log.Debugf("Successfully wrote compressed file %s", compressedFpath) + + err = os.Remove(fpath) + if err != nil { + return fmt.Errorf("Failed to remove uncompressed file: %w", err) + } + return nil +} + +func setupOfflineModeLog(fpath string) (*os.File, error) { + // Open the log file + file, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0660) + if err != nil { + return nil, fmt.Errorf("failed to create new offline mode file %s: %w", fpath, err) + } + + // magic number (4 bytes, 0xA6E7CCCA), followed by format version (2 bytes), + // followed by number of batches (2 bytes) + if _, err = file.Write([]byte{0xA6, 0xE7, 0xCC, 0xCA, 0, 0, 0, 0}); err != nil { + return nil, fmt.Errorf("failed to write to offline mode file %s: %w", fpath, err) + } + + return file, nil +} + +func (r *ParcaReporter) rotateOfflineModeLog() error { + fpath := fmt.Sprintf("%s/%d-%d%s", r.offlineModeConfig.StoragePath, time.Now().Unix(), os.Getpid(), DATA_FILE_EXTENSION) + + logFile, err := setupOfflineModeLog(fpath) + if err != nil { + return fmt.Errorf("Failed to create new log %s for offline mode: %w", fpath, err) + + } + // We are connected to the new log, let's take the old one and compress it + r.offlineModeLogMu.Lock() + oldLog := r.offlineModeLogFile + r.offlineModeLogFile = logFile + oldFpath := r.offlineModeLogPath + r.offlineModeLogPath = fpath + r.offlineModeLoggedStacks.Purge() + r.offlineModeNBatchesInCurrentFile = 0 + r.offlineModeLogMu.Unlock() + defer oldLog.Close() + _, err = oldLog.Seek(0, 0) + if err != nil { + return errors.New("Failed to seek to beginning of file") + } + compressedFpath := fmt.Sprintf("%s.zst", oldFpath) + return compressFile(oldLog, oldFpath, compressedFpath) +} + +func (r *ParcaReporter) runOfflineModeRotation(ctx context.Context) error { + _, uncompressedFiles, _, err := initialScan(r.offlineModeConfig.StoragePath) + if err != nil { + return err + } + + for _, fname := range uncompressedFiles { + fpath := path.Join(r.offlineModeConfig.StoragePath, fname) + compressedFpath := fmt.Sprintf("%s.zst", fpath) + f, err := os.Open(fpath) + if err != nil { + return err + } + + err = compressFile(f, fpath, compressedFpath) + if err != nil { + return err + } + } + tick := time.NewTicker(r.offlineModeConfig.RotationInterval) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-r.stopSignal: + return nil + case <-tick.C: + r.rotateOfflineModeLog() + } + } +} + func (r *ParcaReporter) Start(mainCtx context.Context) error { // Create a child context for reporting features ctx, cancelReporting := context.WithCancel(mainCtx) @@ -548,6 +726,17 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error { }() } + if r.offlineModeConfig != nil { + if err := os.MkdirAll(r.offlineModeConfig.StoragePath, 0770); err != nil { + return fmt.Errorf("error creating offline mode storage: %v", err) + } + go func() { + if err := r.runOfflineModeRotation(ctx); err != nil { + log.Fatalf("Running offline mode rotation failed: %v", err) + } + }() + } + go func() { tick := time.NewTicker(r.reportInterval) buf := bytes.NewBuffer(nil) @@ -559,8 +748,17 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error { case <-r.stopSignal: return case <-tick.C: - if err := r.reportDataToBackend(ctx, buf); err != nil { - log.Errorf("Request failed: %v", err) + if r.offlineModeConfig != nil { + if err := r.logDataForOfflineMode(ctx, buf); err != nil { + log.Errorf("error producing offline mode file: %v.\nForcing rotation as the file might be corrupt.", err) + if err := r.rotateOfflineModeLog(); err != nil { + log.Errorf("failed to rotate log: %v", err) + } + } + } else { + if err := r.reportDataToBackend(ctx, buf); err != nil { + log.Errorf("Request failed: %v", err) + } } tick.Reset(libpf.AddJitter(r.reportInterval, 0.2)) } @@ -577,9 +775,143 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error { return nil } +func (r *ParcaReporter) logDataForOfflineMode(ctx context.Context, buf *bytes.Buffer) error { + record, nLabelCols := r.buildSampleRecord(ctx) + defer record.Release() + + if record.NumRows() == 0 { + log.Debugf("Skip logging batch with no samples") + return nil + } + + buf.Reset() + + w := ipc.NewWriter(buf, + ipc.WithSchema(record.Schema()), + ipc.WithAllocator(r.mem), + ) + + if err := w.Write(record); err != nil { + return fmt.Errorf("failed to write samples: %w", err) + } + + if err := w.Close(); err != nil { + return fmt.Errorf("failed to close samples writer: %w", err) + } + + r.offlineModeLogMu.Lock() + defer r.offlineModeLogMu.Unlock() + if r.offlineModeLogFile == nil { + fpath := fmt.Sprintf("%s/%d-%d%s", r.offlineModeConfig.StoragePath, time.Now().Unix(), os.Getpid(), DATA_FILE_EXTENSION) + + logFile, err := setupOfflineModeLog(fpath) + if err != nil { + return fmt.Errorf("failed to set up offline mode log file: %w", err) + } + r.offlineModeLogFile = logFile + r.offlineModeLogPath = fpath + r.offlineModeLoggedStacks.Purge() + r.offlineModeNBatchesInCurrentFile = 0 + } + + sz := uint32(buf.Len()) + if err := binary.Write(r.offlineModeLogFile, binary.BigEndian, sz); err != nil { + return fmt.Errorf("failed to write to log %s: %w", r.offlineModeLogPath, err) + } + + if _, err := r.offlineModeLogFile.Write(buf.Bytes()); err != nil { + return fmt.Errorf("Failed to write to log %s: %v", r.offlineModeLogPath, err) + } + + r.sampleWriteRequestBytes.Add(float64(buf.Len())) + + sidFieldIdx := nLabelCols + sidField := record.Schema().Field(sidFieldIdx) + if sidField.Name != "stacktrace_id" { + panic("mismatched schema: last field is named " + sidField.Name) + } + + // we don't use the two-value variant because if + // panics happen here, it can only represent a programming bug + // (schema of the record we just created doesn't match our expectations) + ree := record.Column(sidFieldIdx).(*array.RunEndEncoded) + dict := ree.Values().(*array.Dictionary) + b := array.NewBuilder(r.mem, dict.DataType()).(*array.BinaryDictionaryBuilder) + defer b.Release() + + binDict := dict.Dictionary().(*array.Binary) + runEnds := ree.RunEndsArr().(*array.Int32) + for i := 0; i < runEnds.Len(); i++ { + if !dict.IsNull(i) { + v := binDict.Value(dict.GetValueIndex(i)) + hash, err := libpf.TraceHashFromBytes(v) + if err != nil { + return fmt.Errorf("Failed to construct hash from bytes: %w", err) + } + _, exists := r.offlineModeLoggedStacks.Get(hash) + r.offlineModeLoggedStacks.Add(hash, struct{}{}) + if exists { + continue + } + if err := b.Append(v); err != nil { + // how can appending to an in-memory buffer ever fail? + // From a brief glance at the Arrow source code, it doesn't seem like it can. + return fmt.Errorf("failed to construct IDs record; this should never happen. err: %w", err) + } + } + } + idsDict := b.NewArray().(*array.Dictionary) + defer idsDict.Release() + idsBinary := idsDict.Dictionary().(*array.Binary) + + rec, err := r.buildStacktraceRecord(ctx, idsBinary) + if err != nil { + return fmt.Errorf("Failed to build stacktrace record: %v", err) + } + + buf.Reset() + w = ipc.NewWriter(buf, + ipc.WithSchema(rec.Schema()), + ipc.WithAllocator(r.mem), + ) + + if err := w.Write(rec); err != nil { + return fmt.Errorf("Failed to write stacktrace record: %v", err) + } + + if err := w.Close(); err != nil { + return fmt.Errorf("Failed to close stacktrace writer: %v", err) + } + + sz = uint32(buf.Len()) + if err := binary.Write(r.offlineModeLogFile, binary.BigEndian, sz); err != nil { + return fmt.Errorf("Failed to write to log %s: %v", r.offlineModeLogPath, err) + } + if _, err := r.offlineModeLogFile.Write(buf.Bytes()); err != nil { + return fmt.Errorf("Failed to write to log %s: %v", r.offlineModeLogPath, err) + } + r.stacktraceWriteRequestBytes.Add(float64(buf.Len())) + // We need to fsync before updating the number of records at the head of the file. Otherwise, + // the kernel might persist that update before persisting the record we just wrote, and we might + // read a corrupt file. + if err := r.offlineModeLogFile.Sync(); err != nil { + return fmt.Errorf("Failed to fsync log %s: %v", r.offlineModeLogPath, err) + } + + r.offlineModeNBatchesInCurrentFile += 1 + n := r.offlineModeNBatchesInCurrentFile + log.Debugf("wrote batch %d", n) + + if _, err = r.offlineModeLogFile.WriteAt([]byte{byte(n / 256), byte(n)}, 6); err != nil { + return fmt.Errorf("Failed to write to log %s: %v", r.offlineModeLogPath, err) + } + + return nil +} + // reportDataToBackend creates and sends out an arrow record for a Parca backend. func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buffer) error { - record := r.buildSampleRecord(ctx) + record, _ := r.buildSampleRecord(ctx) defer record.Release() if record.NumRows() == 0 { @@ -699,12 +1031,13 @@ func (r *ParcaReporter) writeCommonLabels(w *SampleWriter, rows uint64) { } // buildSampleRecord returns an apache arrow record containing all collected -// samples up to this moment. It does not contain the full stacktraces, only +// samples up to this moment, as well as the number of label columns. +// The arrow record does not contain the full stacktraces, only // the stacktrace IDs, depending on whether the backend already knows the // stacktrace ID, it might request the full stacktrace from the agent. The // second return value contains all the raw samples, which can be used to // resolve the stacktraces. -func (r *ParcaReporter) buildSampleRecord(ctx context.Context) arrow.Record { +func (r *ParcaReporter) buildSampleRecord(ctx context.Context) (arrow.Record, int) { newWriter := NewSampleWriter(r.mem) r.sampleWriterMu.Lock() @@ -736,7 +1069,7 @@ func (r *ParcaReporter) buildSampleRecord(ctx context.Context) arrow.Record { w.Duration.ree.Append(rows) w.Duration.ib.Append(time.Second.Nanoseconds()) - return w.NewRecord() + return w.NewRecord(), len(w.labelBuilders) } func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs *array.Binary) (arrow.Record, error) { diff --git a/uploader/log_uploader.go b/uploader/log_uploader.go new file mode 100644 index 0000000000..8eb776c5c0 --- /dev/null +++ b/uploader/log_uploader.go @@ -0,0 +1,658 @@ +package uploader + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + profilestoregrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/profilestore/v1alpha1/profilestorev1alpha1grpc" + profilestorepb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1" + "github.com/apache/arrow/go/v16/arrow" + "github.com/apache/arrow/go/v16/arrow/array" + "github.com/apache/arrow/go/v16/arrow/ipc" + "github.com/apache/arrow/go/v16/arrow/memory" + "github.com/dustin/go-humanize" + "github.com/klauspost/compress/zstd" + "github.com/parca-dev/parca-agent/flags" + "github.com/parca-dev/parca-agent/reporter" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/otel/trace/noop" +) + +type stacktraceCursor struct { + batchIdx int + idx int +} + +type locationsReader struct { + Locations *array.List + Location *array.Struct + Address *array.Uint64 + FrameType *array.RunEndEncoded + FrameTypeDict *array.Dictionary + FrameTypeDictValues *array.Binary + MappingStart *array.RunEndEncoded + MappingStartValues *array.Uint64 + MappingLimit *array.RunEndEncoded + MappingLimitValues *array.Uint64 + MappingOffset *array.RunEndEncoded + MappingOffsetValues *array.Uint64 + MappingFile *array.RunEndEncoded + MappingFileDict *array.Dictionary + MappingFileDictValues *array.Binary + MappingBuildID *array.RunEndEncoded + MappingBuildIDDict *array.Dictionary + MappingBuildIDDictValues *array.Binary + Lines *array.List + Line *array.Struct + LineNumber *array.Int64 + LineFunctionName *array.Dictionary + LineFunctionNameDict *array.Binary + LineFunctionSystemName *array.Dictionary + LineFunctionSystemNameDict *array.Binary + LineFunctionFilename *array.RunEndEncoded + LineFunctionFilenameDict *array.Dictionary + LineFunctionFilenameDictValues *array.Binary + LineFunctionStartLine *array.Int64 +} + +func getREEUint64(arr arrow.Array, fieldName string) (*array.RunEndEncoded, *array.Uint64, error) { + ree, ok := arr.(*array.RunEndEncoded) + if !ok { + return nil, nil, fmt.Errorf("expected column %q to be of type RunEndEncoded, got %T", fieldName, arr) + } + + uint64Arr, ok := ree.Values().(*array.Uint64) + if !ok { + return nil, nil, fmt.Errorf("expected column %q to be of type RunEndEncoded with Uint64 Values, got %T", fieldName, arr) + } + + return ree, uint64Arr, nil +} + +func getREEBinaryDict(arr arrow.Array, fieldName string) (*array.RunEndEncoded, *array.Dictionary, *array.Binary, error) { + ree, ok := arr.(*array.RunEndEncoded) + if !ok { + return nil, nil, nil, fmt.Errorf("expected column %q to be of type RunEndEncoded, got %T", fieldName, arr) + } + + dict, ok := ree.Values().(*array.Dictionary) + if !ok { + return nil, nil, nil, fmt.Errorf("expected column %q to be of type RunEndEncedod with Dictionary Values, got %T", fieldName, arr) + } + + binDict, ok := dict.Dictionary().(*array.Binary) + if !ok { + return nil, nil, nil, fmt.Errorf("expected column %q to be a RunEndEncoded with Dictionary Values of type Binary, got %T", fieldName, dict.Dictionary()) + } + + return ree, dict, binDict, nil +} + +func getBinaryDict(arr arrow.Array, fieldName string) (*array.Dictionary, *array.Binary, error) { + dict, ok := arr.(*array.Dictionary) + if !ok { + return nil, nil, fmt.Errorf("expected column %q to be of type Dictionary, got %T", fieldName, arr) + } + + binDict, ok := dict.Dictionary().(*array.Binary) + if !ok { + return nil, nil, fmt.Errorf("expected column %q to be a Dictionary with Values of type Binary, got %T", fieldName, dict.Dictionary()) + } + + return dict, binDict, nil +} + +func getLocationsReader(locations *array.List) (*locationsReader, error) { + location, ok := locations.ListValues().(*array.Struct) + if !ok { + return nil, fmt.Errorf("expected column %q to be of type Struct, got %T", "locations", locations.ListValues()) + } + + const expectedLocationFields = 8 + if location.NumField() != expectedLocationFields { + return nil, fmt.Errorf("expected location struct column to have %d fields, got %d", expectedLocationFields, location.NumField()) + } + + address, ok := location.Field(0).(*array.Uint64) + if !ok { + return nil, fmt.Errorf("expected column address to be of type Uint64, got %T", location.Field(0)) + } + + frameType, frameTypeDict, frameTypeDictValues, err := getREEBinaryDict(location.Field(1), "frame_type") + + mappingStart, mappingStartValues, err := getREEUint64(location.Field(2), "mapping_start") + if err != nil { + return nil, err + } + + mappingLimit, mappingLimitValues, err := getREEUint64(location.Field(3), "mapping_limit") + if err != nil { + return nil, err + } + + mappingOffset, mappingOffsetValues, err := getREEUint64(location.Field(4), "mapping_offset") + if err != nil { + return nil, err + } + + mappingFile, mappingFileDict, mappingFileDictValues, err := getREEBinaryDict(location.Field(5), "mapping_file") + if err != nil { + return nil, err + } + + mappingBuildID, mappingBuildIDDict, mappingBuildIDValues, err := getREEBinaryDict(location.Field(6), "mapping_build_id") + if err != nil { + return nil, err + } + + lines, ok := location.Field(7).(*array.List) + if !ok { + return nil, fmt.Errorf("expected column lines to be of type List, got %T", location.Field(7)) + } + + line, ok := lines.ListValues().(*array.Struct) + if !ok { + return nil, fmt.Errorf("expected column line to be of type Struct, got %T", lines.ListValues()) + } + + const expectedLineFields = 5 + if line.NumField() != expectedLineFields { + return nil, fmt.Errorf("expected line struct column to have %d fields, got %d", expectedLineFields, line.NumField()) + } + + lineNumber, ok := line.Field(0).(*array.Int64) + if !ok { + return nil, fmt.Errorf("expected column line_number to be of type Int64, got %T", line.Field(0)) + } + + lineFunctionName, lineFunctionNameDict, err := getBinaryDict(line.Field(1), "line_function_name") + if err != nil { + return nil, err + } + + lineFunctionSystemName, lineFunctionSystemNameDict, err := getBinaryDict(line.Field(2), "line_function_system_name") + if err != nil { + return nil, err + } + + lineFunctionFilename, lineFunctionFilenameDict, lineFunctionFilenameDictValues, err := getREEBinaryDict(line.Field(3), "line_function_filename") + if err != nil { + return nil, err + } + + lineFunctionStartLine, ok := line.Field(4).(*array.Int64) + if !ok { + return nil, fmt.Errorf("expected column line_function_start_line to be of type Int64, got %T", line.Field(4)) + } + + return &locationsReader{ + Locations: locations, + Location: location, + Address: address, + FrameType: frameType, + FrameTypeDict: frameTypeDict, + FrameTypeDictValues: frameTypeDictValues, + MappingStart: mappingStart, + MappingStartValues: mappingStartValues, + MappingLimit: mappingLimit, + MappingLimitValues: mappingLimitValues, + MappingOffset: mappingOffset, + MappingOffsetValues: mappingOffsetValues, + MappingFile: mappingFile, + MappingFileDict: mappingFileDict, + MappingFileDictValues: mappingFileDictValues, + MappingBuildID: mappingBuildID, + MappingBuildIDDict: mappingBuildIDDict, + MappingBuildIDDictValues: mappingBuildIDValues, + Lines: lines, + Line: line, + LineNumber: lineNumber, + LineFunctionName: lineFunctionName, + LineFunctionNameDict: lineFunctionNameDict, + LineFunctionSystemName: lineFunctionSystemName, + LineFunctionSystemNameDict: lineFunctionSystemNameDict, + LineFunctionFilename: lineFunctionFilename, + LineFunctionFilenameDict: lineFunctionFilenameDict, + LineFunctionFilenameDictValues: lineFunctionFilenameDictValues, + LineFunctionStartLine: lineFunctionStartLine, + }, nil +} + +func reeDictValueString(i int, ree *array.RunEndEncoded, dict *array.Dictionary, values *array.Binary) string { + return values.ValueString(dict.GetValueIndex(ree.GetPhysicalIndex(int(i)))) +} + +func (rdr *locationsReader) frameString(i int) string { + return reeDictValueString(i, rdr.FrameType, rdr.FrameTypeDict, rdr.FrameTypeDictValues) +} + +func (rdr *locationsReader) mappingFileString(i int) string { + return reeDictValueString(i, rdr.MappingFile, rdr.MappingFileDict, rdr.MappingFileDictValues) +} + +func (rdr *locationsReader) mappingBuildIDString(i int) string { + return reeDictValueString(i, rdr.MappingBuildID, rdr.MappingBuildIDDict, rdr.MappingBuildIDDictValues) +} + +func (rdr *locationsReader) functionFilenameString(i int) string { + return reeDictValueString(i, rdr.LineFunctionFilename, rdr.LineFunctionFilenameDict, rdr.LineFunctionFilenameDictValues) +} + +func (rdr *locationsReader) functionNameString(i int) string { + return rdr.LineFunctionNameDict.ValueString(rdr.LineFunctionName.GetValueIndex(i)) +} + +func (rdr *locationsReader) functionSystemNameString(i int) string { + return rdr.LineFunctionSystemNameDict.ValueString(rdr.LineFunctionSystemName.GetValueIndex(i)) +} + +type stacktraceReader struct { + record arrow.Record + ids *array.Binary + locations *locationsReader + isComplete *array.Boolean +} + +func newStacktraceReader(rec arrow.Record) (stacktraceReader, error) { + schema := rec.Schema() + var ( + stacktraceIDs *array.Binary + locations *array.List + isComplete *array.Boolean + ok bool + ) + + for i, field := range schema.Fields() { + switch field.Name { + case "stacktrace_id": + stacktraceIDs, ok = rec.Column(i).(*array.Binary) + if !ok { + return stacktraceReader{}, fmt.Errorf("expected column %q to be of type Binary, got %T", field.Name, rec.Column(i)) + } + + case "locations": + locations, ok = rec.Column(i).(*array.List) + if !ok { + return stacktraceReader{}, fmt.Errorf("expected column %q to be of type List, got %T", field.Name, rec.Column(i)) + } + } + + if field.Name == "is_complete" { + isComplete, ok = rec.Column(i).(*array.Boolean) + if !ok { + return stacktraceReader{}, fmt.Errorf("expected column %q to be of type Boolean, got %T", field.Name, rec.Column(i)) + } + } + } + + if stacktraceIDs == nil { + return stacktraceReader{}, errors.New("missing column stacktrace_id") + } + + if locations == nil { + return stacktraceReader{}, errors.New("missing column locations") + } + + if isComplete == nil { + return stacktraceReader{}, errors.New("missing column is_complete") + } + + rdr, err := getLocationsReader(locations) + if err != nil { + return stacktraceReader{}, err + } + return stacktraceReader{ + record: rec, + ids: stacktraceIDs, + isComplete: isComplete, + locations: rdr, + }, nil +} + +func filterTraces(stacktraceIds *array.Binary, stacktraceReaders []stacktraceReader, idToStacktrace map[libpf.TraceHash]stacktraceCursor, mem memory.Allocator) (arrow.Record, error) { + w := reporter.NewLocationsWriter(mem) + + for i := 0; i < stacktraceIds.Len(); i++ { + if !stacktraceIds.IsValid(i) { + w.LocationsList.Append(false) + w.IsComplete.Append(false) + continue + } + stacktraceId, err := libpf.TraceHashFromBytes(stacktraceIds.Value(i)) + if err != nil { + return nil, err + } + cur, ok := idToStacktrace[stacktraceId] + if !ok { + w.LocationsList.Append(false) + w.IsComplete.Append(false) + log.Errorf("Location not found for id: %v", stacktraceId) + continue + } + + rdr := stacktraceReaders[cur.batchIdx] + + if !rdr.locations.Locations.IsValid(cur.idx) { + w.LocationsList.Append(false) + w.IsComplete.Append(false) + continue + } + w.IsComplete.Append(rdr.isComplete.Value(cur.idx)) + locStart, locEnd := rdr.locations.Locations.ValueOffsets(cur.idx) + if locEnd-locStart <= 0 { + w.LocationsList.Append(false) + } else { + w.LocationsList.Append(true) + for j := locStart; j < locEnd; j++ { + w.Locations.Append(true) + w.Address.Append(rdr.locations.Address.Value(int(j))) + w.FrameType.AppendString(rdr.locations.frameString(int(j))) + w.MappingFile.AppendString(rdr.locations.mappingFileString(int(j))) + w.MappingBuildID.AppendString(rdr.locations.mappingBuildIDString(int(j))) + + // there are actually possibly N lines per location, + // but we only produce at most one today. + lineStart, lineEnd := rdr.locations.Lines.ValueOffsets(int(j)) + hasLine := lineEnd > lineStart + if hasLine { + w.Lines.Append(true) + w.Line.Append(true) + + w.FunctionFilename.AppendString(rdr.locations.functionFilenameString(int(lineStart))) + w.LineNumber.Append(rdr.locations.LineNumber.Value(int(lineStart))) + w.FunctionName.AppendString(rdr.locations.functionNameString(int(lineStart))) + w.FunctionSystemName.AppendString(rdr.locations.functionSystemNameString(int(lineStart))) + w.FunctionStartLine.Append(rdr.locations.LineFunctionStartLine.Value(int(lineStart))) + } else { + w.Lines.Append(false) + } + } + } + } + return w.NewRecord(stacktraceIds), nil +} + +// like io.Reader, but lets you +// skip forward. +type readSkipper interface { + io.Reader + Skip(uint) error +} + +type skippableFile struct { + f *os.File +} + +func (f skippableFile) Read(p []byte) (n int, err error) { + return f.f.Read(p) +} + +func (f skippableFile) Skip(distance uint) error { + _, err := f.f.Seek(int64(distance), io.SeekCurrent) + return err +} + +type skippableZstdStream struct { + s *zstd.Decoder +} + +func (s skippableZstdStream) Read(p []byte) (n int, err error) { + return s.s.Read(p) +} + +func (s skippableZstdStream) Skip(distance uint) error { + // we could refactor this to avoid an allocation, + // but who cares -- it will only be called at most twice per + // batch. + ignored := make([]byte, distance) + _, err := s.s.Read(ignored) + return err +} + +func UploadLog(ctx context.Context, r readSkipper, rpc profilestoregrpc.ProfileStoreServiceClient, buf *bytes.Buffer, mem memory.Allocator) (error, uint64, uint64) { + // buf := make([]byte, 4) + var magic uint32 + if err := binary.Read(r, binary.BigEndian, &magic); err != nil { + return fmt.Errorf("err reading magic: %w", err), 0, 0 + } + if magic != 0xA6E7CCCA { + return errors.New("Incorrect magic number"), 0, 0 + } + + var formatVersion uint16 + if err := binary.Read(r, binary.BigEndian, &formatVersion); err != nil { + return fmt.Errorf("err reading format version: %w", err), 0, 0 + } + if formatVersion != 0 { + return fmt.Errorf("unexpected format version: %d", formatVersion), 0, 0 + } + + var nBatches uint16 + if err := binary.Read(r, binary.BigEndian, &nBatches); err != nil { + return fmt.Errorf("err reading num of batches: %w", err), 0, 0 + } + log.Infof("uploading %d batches", nBatches) + + stacktraceReaders := make([]stacktraceReader, 0) + idToStacktrace := make(map[libpf.TraceHash]stacktraceCursor) + + var bytesSamples, bytesSts uint64 + for i := 0; i < int(nBatches); i++ { + var sz uint32 + log.Debugf("reading batch %d/%d", i+1, nBatches) + if err := binary.Read(r, binary.BigEndian, &sz); err != nil { + return fmt.Errorf("err reading samples size: %w", err), bytesSamples, bytesSts + } + + buf.Reset() + if _, err := io.CopyN(buf, r, int64(sz)); err != nil { + return fmt.Errorf("err reading %d bytes for samples: %w", sz, err), bytesSamples, bytesSts + } + + client, err := rpc.Write(ctx) + if err != nil { + return fmt.Errorf("err getting write request client: %w", err), bytesSamples, bytesSts + } + if err := client.Send(&profilestorepb.WriteRequest{ + Record: buf.Bytes(), + }); err != nil { + return fmt.Errorf("err making write request for samples: %w", err), bytesSamples, bytesSts + } + bytesSamples += uint64(sz) + + resp, err := client.Recv() + if err != nil && err != io.EOF { + return fmt.Errorf("err on recv: %w", err), bytesSamples, bytesSts + } + reader, err := ipc.NewReader( + bytes.NewReader(resp.Record), + ipc.WithAllocator(mem), + ) + if err != nil { + return err, bytesSamples, bytesSts + } + defer reader.Release() + + if !reader.Next() { + return errors.New("arrow/ipc: could not read record from stream"), bytesSamples, bytesSts + } + + if reader.Err() != nil { + return fmt.Errorf("err reading response: %w", reader.Err()), bytesSamples, bytesSts + } + + rec := reader.Record() + defer rec.Release() + + fields := rec.Schema().Fields() + if len(fields) != 1 { + return fmt.Errorf("arrow/ipc: invalid number of fields in record (got=%d, want=1)", len(fields)), bytesSamples, bytesSts + } + + if fields[0].Name != "stacktrace_id" { + return fmt.Errorf("arrow/ipc: invalid field name in record (got=%s, want=stacktrace_id)", fields[0].Name), bytesSamples, bytesSts + } + + stacktraceIDs, ok := rec.Column(0).(*array.Binary) + if !ok { + return fmt.Errorf("arrow/ipc: invalid column type in record (got=%T, want=*array.Binary)", rec.Column(0)), bytesSamples, bytesSts + } + + if err := binary.Read(r, binary.BigEndian, &sz); err != nil { + return fmt.Errorf("err reading stacktraces size: %w", err), bytesSamples, bytesSts + } + + lim := io.LimitReader(r, int64(sz)) + stsReader, err := ipc.NewReader( + lim, + ipc.WithAllocator(mem), + ) + if err != nil { + return fmt.Errorf("err creating stacktraces reader: %w", err), bytesSamples, bytesSts + } + + defer stsReader.Release() + + if !stsReader.Next() { + return errors.New("arrow/ipc: could not read stacktraces from file"), bytesSamples, bytesSts + } + + if stsReader.Err() != nil { + return fmt.Errorf("err from stacktraces reader: %w", stsReader.Err()), bytesSamples, bytesSts + } + + stsRec := stsReader.Record() + stReader, err := newStacktraceReader(stsRec) + if err != nil { + return fmt.Errorf("err constructing stacktrace reader: %w", err), bytesSamples, bytesSts + } + stacktraceReaders = append(stacktraceReaders, stReader) + defer stsRec.Release() + + r.Skip(uint(lim.(*io.LimitedReader).N)) + + idsInStacktracesRecord, ok := stsRec.Column(0).(*array.Binary) + if !ok { + return fmt.Errorf("arrow/ipc: invalid column type in record (got=%T, want=*array.Binary)", stsRec.Column(0)), bytesSamples, bytesSts + } + + for j := 0; j < idsInStacktracesRecord.Len(); j++ { + if idsInStacktracesRecord.IsValid(j) { + hash, err := libpf.TraceHashFromBytes(idsInStacktracesRecord.Value(j)) + if err != nil { + return fmt.Errorf("err computing stacktrace ID: %w", err), bytesSamples, bytesSts + } + idToStacktrace[hash] = stacktraceCursor{i, j} + } + } + filtered, err := filterTraces(stacktraceIDs, stacktraceReaders, idToStacktrace, mem) + if err != nil { + return fmt.Errorf("err filtering traces: %w", err), bytesSamples, bytesSts + } + defer filtered.Release() + + buf.Reset() + w := ipc.NewWriter(buf, + ipc.WithSchema(filtered.Schema()), + ipc.WithAllocator(mem), + ) + + if err := w.Write(filtered); err != nil { + return fmt.Errorf("err writing stacktraces to buffer: %w", err), bytesSamples, bytesSts + } + if err := w.Close(); err != nil { + return fmt.Errorf("err closing ipc writer for stacktraces: %w", err), bytesSamples, bytesSts + } + + if err := client.Send(&profilestorepb.WriteRequest{ + Record: buf.Bytes(), + }); err != nil { + return fmt.Errorf("err making write request for stacktraces: %w", err), bytesSamples, bytesSts + } + + bytesSts += uint64(buf.Len()) + + if err := client.CloseSend(); err != nil { + return fmt.Errorf("err closing send channel: %w", err), bytesSamples, bytesSts + } + } + return nil, bytesSamples, bytesSts +} + +func OfflineModeDoUpload(f flags.Flags) (flags.ExitCode, error) { + mem := memory.DefaultAllocator + ctx := context.TODO() + reg := prometheus.NewRegistry() + tp := noop.NewTracerProvider() + log.SetLevel(log.TraceLevel) + grpcConn, err := f.RemoteStore.WaitGrpcEndpoint(ctx, reg, tp) + if err != nil { + return flags.ExitFailure, err + } + defer grpcConn.Close() + client := profilestoregrpc.NewProfileStoreServiceClient(grpcConn) + files, err := os.ReadDir(f.OfflineMode.StoragePath) + if err != nil { + return flags.ExitFailure, fmt.Errorf("failed to enumerate files in storage path: %w", err) + } + var buf bytes.Buffer + + var totalBytesSamples, totalBytesSts uint64 + var doneFiles uint + for _, file := range files { + var r readSkipper + fname := file.Name() + if !file.Type().IsRegular() { + log.Warnf("Directory or special file %s in storage path. Skipping", fname) + continue + } + if strings.HasSuffix(fname, reporter.DATA_FILE_COMPRESSED_EXTENSION) { + f, err := os.Open(filepath.Join(f.OfflineMode.StoragePath, fname)) + if err != nil { + log.Errorf("Failed to open file %s: %v. Skipping.", fname, err) + continue + } + s, err := zstd.NewReader(f) + if err != nil { + log.Errorf("Failed to decode zstd file %s: %v. Skipping.", fname, err) + continue + } + r = skippableZstdStream{s} + } else if strings.HasSuffix(fname, reporter.DATA_FILE_EXTENSION) { + f, err := os.Open(filepath.Join(f.OfflineMode.StoragePath, fname)) + if err != nil { + log.Errorf("Failed to open file %s: %v. Skipping.", fname, err) + continue + } + r = skippableFile{f} + } else { + log.Warnf("Unrecognized file %s. Skipping", fname) + continue + } + log.Infof("Uploading %s", fname) + err, bytesSamples, bytesSts := UploadLog(ctx, r, client, &buf, mem) + if err != nil { + return flags.ExitFailure, err + } + doneFiles += 1 + log.Infof("successfully uploaded %s. Bytes in samples: %d; in stacktraces: %d", fname, bytesSamples, bytesSts) + totalBytesSamples += bytesSamples + totalBytesSts += bytesSts + + err = os.Remove(filepath.Join(f.OfflineMode.StoragePath, fname)) + if err != nil { + log.Errorf("failed to remove file %s.", fname) + } + } + log.Infof("uploaded %d files. Total bytes in samples: %s; in stacktraces: %s\n", doneFiles, humanize.IBytes(totalBytesSamples), humanize.IBytes(totalBytesSts)) + return flags.ExitSuccess, nil +}