Skip to content

Commit

Permalink
uploader now supports multiple fragments in flight at the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
muir committed Nov 23, 2022
1 parent 28e5500 commit 289e773
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 167 deletions.
18 changes: 13 additions & 5 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@

# Just do it (build ready)

- improve performance of xopup

- replace locks with atomics

- sampling can be based on Boring() in which case the flags need to
change before the "traceresponse" is set. That means top logger
must know if base loggers honored the boring. Change xopbase.Boring
Expand Down Expand Up @@ -100,9 +96,13 @@
- Console (emphasis on readable, but still retains full data)
- xopup
- enforce memory limit
- enforce outstanding packet limit
- xopjson
- respect per-request buffer limit
- additional features:
- change time of timestamp key
Expand Down Expand Up @@ -130,6 +130,14 @@
- Performmance
- replace locks with atomics where possible
- replace call to time.Format in xopjson with custom code
- switch to a faster JSON encoder when encoding arbitrary data
- too much of xopup is single-threaded
- switch almost everything to atomics instead
- shard the uploader on request id
- mark all places in the code where an allocation happens `// allocate`
- Use sync.Pool aggressively to reduce allocations
Expand Down
119 changes: 119 additions & 0 deletions xopup/collect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package xopup

import (
"sync"

"github.com/xoplog/xop-go/xopat"
"github.com/xoplog/xop-go/xopbytes"
"github.com/xoplog/xop-go/xopproto"
)

type definitionComplete struct {
once sync.Once
}

func (r *Request) AttributeReferenced(*xopat.Attribute) error { return nil } // TODO

func (u *Uploader) DefineAttribute(a *xopat.Attribute) {
attributeKey := attributeKey{
key: a.Key(),
namespace: a.Namespace(),
}
// We keep a pool of unused definition complete structs
// so that when we get a hit, we're not creating another
// throw-away object in the heap
n := u.definitionsComplete.Get()
v, ok := u.attributesDefined.LoadOrStore(attributeKey, n)
if ok {
u.definitionsComplete.Put(n)
return
}

v.(*definitionComplete).once.Do(func() {
definition := xopproto.AttributeDefinition{
Key: a.Key(),
Description: a.Description(),
Namespace: a.Namespace(),
NamespaceSemver: a.SemverString(),
Type: xopproto.AttributeType(a.SubType()),
ShouldIndex: a.Indexed(),
Prominance: int32(a.Prominence()),
Locked: a.Locked(),
Distinct: a.Distinct(),
Multiple: a.Multiple(),
}
u.lock.Lock()
defer u.lock.Unlock()
fragment := u.getFragment()
fragment.AttributeDefinitions = append(fragment.AttributeDefinitions, &definition)
})
}

func (u *Uploader) DefineEnum(a *xopat.EnumAttribute, e xopat.Enum) {
enumKey := enumKey{
attributeKey: attributeKey{
key: a.Key(),
namespace: a.Namespace(),
},
value: e.Int64(),
}
// We keep a pool of unused definition complete structs
// so that when we get a hit, we're not creating another
// throw-away object in the heap
n := u.definitionsComplete.Get()
v, ok := u.enumsDefined.LoadOrStore(enumKey, n)
if ok {
u.definitionsComplete.Put(n)
return
}

v.(*definitionComplete).once.Do(func() {
enum := xopproto.EnumDefinition{
AttributeKey: a.Key(),
Namespace: a.Namespace(),
NamespaceSemver: a.SemverString(),
String_: e.String(),
IntValue: e.Int64(),
}
u.lock.Lock()
defer u.lock.Unlock()
fragment := u.getFragment()
fragment.EnumDefinitions = append(fragment.EnumDefinitions, &enum)
})
}

func (r *Request) Span(span xopbytes.Span, buffer xopbytes.Buffer) error {
bundle := span.GetBundle()
pbSpan := xopproto.Span{
SpanID: bundle.Trace.GetSpanID().Bytes(),
ParentID: bundle.Parent.GetSpanID().Bytes(),
JsonData: buffer.AsBytes(),
StartTime: span.GetStartTime().UnixNano(),
EndTime: pointerToInt64OrNil(span.GetEndTimeNano()),
}
if span.IsRequest() {
pbSpan.IsRequest = true
pbSpan.Baggage = bundle.Baggage.Bytes()
pbSpan.TraceState = bundle.State.Bytes()
}
r.uploader.lock.Lock()
defer r.uploader.lock.Unlock()
request, byteCount := r.uploader.getRequest(r, true)
request.Spans = append(request.Spans, &pbSpan)
return r.uploader.noteBytes(byteCount + sizeOfSpan + len(pbSpan.JsonData) + len(pbSpan.Baggage) + len(pbSpan.TraceState))
}

func (r *Request) Line(line xopbytes.Line) error {
pbLine := xopproto.Line{
SpanID: line.GetSpanID().Bytes(),
LogLevel: int32(line.GetLevel()),
Timestamp: line.GetTime().UnixNano(),
JsonData: line.AsBytes(),
}
r.uploader.lock.Lock()
defer r.uploader.lock.Unlock()
request, byteCount := r.uploader.getRequest(r, true)
r.lineCount++
request.Lines = append(request.Lines, &pbLine)
return r.uploader.noteBytes(byteCount + sizeOfLine + len(pbLine.JsonData))
}
24 changes: 24 additions & 0 deletions xopup/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package xopup

import (
"context"

"github.com/xoplog/xop-go/xopjson"
)

type UploadLogger struct {
*xopjson.Logger
Uploader *Uploader
}

func New(ctx context.Context, config Config) UploadLogger {
uploader := newUploader(ctx, config)
jsonLogger := xopjson.New(uploader,
xopjson.WithAttributesObject(true),
xopjson.WithSpanStarts(false),
)
return UploadLogger{
Uploader: uploader,
Logger: jsonLogger,
}
}
Loading

0 comments on commit 289e773

Please sign in to comment.