From f9e9de7fa62215a03717c7daee58aae8eff7ddc5 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 27 Jul 2023 17:49:46 +0300 Subject: [PATCH] feat(faker): add basic request issuer --- internal/faker/faker.go | 9 +- internal/faker/model.go | 215 +++++++++++++++++++++++++++++------ internal/faker/model_test.go | 60 +++++++++- 3 files changed, 247 insertions(+), 37 deletions(-) diff --git a/internal/faker/faker.go b/internal/faker/faker.go index d56b2164..7caf725a 100644 --- a/internal/faker/faker.go +++ b/internal/faker/faker.go @@ -1,7 +1,11 @@ // Package faker implement a fake telemetry generator. package faker -import "math/rand" +import ( + "math/rand" + + tracesdk "go.opentelemetry.io/otel/sdk/trace" +) // Config models a single cluster of multiple nodes, where services are // deployed on each node. @@ -17,7 +21,8 @@ type Config struct { // Services configuration. Services Services `json:"services" yaml:"services"` // Random number generator. - Rand *rand.Rand + Rand *rand.Rand + TracerProvider *tracesdk.TracerProvider } // Services wraps all services configuration, describing topology of the diff --git a/internal/faker/model.go b/internal/faker/model.go index 669b0755..f9b90095 100644 --- a/internal/faker/model.go +++ b/internal/faker/model.go @@ -1,9 +1,14 @@ package faker import ( + "context" "math/rand" "net/netip" "strconv" + + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" ) type cluster struct { @@ -11,6 +16,12 @@ type cluster struct { servers []server } +func (c *cluster) Attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.K8SClusterName(c.name), + } +} + func (c *cluster) getRandomServer(source *rand.Rand) int { return source.Intn(len(c.servers)) } @@ -20,6 +31,7 @@ func (c *cluster) addServer(s server) { } type service interface { + routerHandler Name() string } @@ -30,6 +42,13 @@ type server struct { services []service } +func (s *server) 
Attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.NetHostName(s.name), + attribute.Int("faker.server.id", s.id), + } +} + func (s *server) addService(service service) { s.services = append(s.services, service) } @@ -38,62 +57,182 @@ type model struct { rps int cluster cluster frontends []frontendService + router Router rand *rand.Rand + tp trace.TracerProvider + tracer trace.Tracer +} + +// Request of the client. +type Request struct { + TraceID trace.TraceID + ParentID trace.SpanID +} + +func (m *model) IssueRequest() { + ctx, span := m.tracer.Start(context.Background(), "request") + defer span.End() + m.router.API(ctx) } type apiService struct { - id int - ip netip.Addr - port int + router Router + tracer trace.Tracer + id int + ip netip.Addr + port int +} + +func (s apiService) Handle(ctx context.Context) { + ctx, span := s.tracer.Start(ctx, "request", trace.WithAttributes(s.Attributes()...)) + defer span.End() + s.router.Backend(ctx) +} + +func (s apiService) Attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.ServiceName(s.Name()), + } } func (s apiService) Name() string { return "api" } type dbService struct { - id int - ip netip.Addr - port int + router Router + tracer trace.Tracer + id int + ip netip.Addr + port int +} + +func (s dbService) Handle(ctx context.Context) { + ctx, span := s.tracer.Start(ctx, "request") + defer span.End() + _ = ctx } func (s dbService) Name() string { return "db" } type cacheService struct { - id int - ip netip.Addr - port int + router Router + tracer trace.Tracer + id int + ip netip.Addr + port int +} + +func (s cacheService) Handle(ctx context.Context) { + ctx, span := s.tracer.Start(ctx, "request") + defer span.End() + _ = ctx } func (s cacheService) Name() string { return "cache" } type backendService struct { - id int - ip netip.Addr - port int + id int + ip netip.Addr + port int + router Router + tracer trace.Tracer +} + +func (s backendService) Handle(ctx 
context.Context) { + ctx, span := s.tracer.Start(ctx, "request") + defer span.End() + s.router.Cache(ctx) + s.router.DB(ctx) + s.router.Cache(ctx) } func (s backendService) Name() string { return "backend" } type frontendService struct { - id int - ip netip.Addr + router Router + tracer trace.Tracer + id int + ip netip.Addr } func (s frontendService) Name() string { return "frontend" } +func (s frontendService) Handle(ctx context.Context) { + s.router.API(ctx) +} + +// Router routes requests to services. +type Router interface { + Frontend(ctx context.Context) + API(ctx context.Context) + Backend(ctx context.Context) + Cache(ctx context.Context) + DB(ctx context.Context) +} + +type routerHandler interface { + Handle(ctx context.Context) +} + +type clusterRouter struct { + random *rand.Rand + routes map[string][]routerHandler +} + +func (r *clusterRouter) addRoute(name string, handler routerHandler) { + r.routes[name] = append(r.routes[name], handler) +} + +func (r *clusterRouter) handle(ctx context.Context, name string) { + routes := r.routes[name] + // Pick random. + routes[r.random.Intn(len(routes))].Handle(ctx) +} + +func (r *clusterRouter) API(ctx context.Context) { + r.handle(ctx, "api") +} + +func (r *clusterRouter) Backend(ctx context.Context) { + r.handle(ctx, "backend") +} + +func (r *clusterRouter) Cache(ctx context.Context) { + r.handle(ctx, "cache") +} + +func (r *clusterRouter) DB(ctx context.Context) { + r.handle(ctx, "db") +} + +func (r *clusterRouter) Frontend(ctx context.Context) { + r.handle(ctx, "frontend") +} + func modelFromConfig(c Config) model { + router := &clusterRouter{ + routes: map[string][]routerHandler{}, + random: c.Rand, + } m := model{ - rps: c.RPS, - rand: c.Rand, + rps: c.RPS, + rand: c.Rand, + router: router, + tracer: c.TracerProvider.Tracer("faker"), + tp: c.TracerProvider, } m.cluster.name = "msk1" // Generate clients. 
residentialPool := newIPAllocator(netip.MustParseAddr("95.24.0.0")) for i := 0; i < c.Services.Frontend.Replicas; i++ { - m.frontends = append(m.frontends, frontendService{ - id: i, - ip: residentialPool.Next(), - }) + f := frontendService{ + router: router, + id: i, + ip: residentialPool.Next(), + tracer: c.TracerProvider.Tracer("client"), + } + m.frontends = append(m.frontends, f) + router.addRoute("frontend", f) } // Pool of external IP addresses. @@ -112,11 +251,14 @@ func modelFromConfig(c Config) model { j := m.cluster.getRandomServer(m.rand) // Using note IP address as being exposed on node 80 port. s := apiService{ - id: i, - ip: m.cluster.servers[j].ip, - port: 80, + router: router, + tracer: c.TracerProvider.Tracer("api"), + id: i, + ip: m.cluster.servers[j].ip, + port: 80, } m.cluster.servers[j].addService(s) + router.addRoute("api", s) } // Distribute internal services. @@ -125,37 +267,46 @@ func modelFromConfig(c Config) model { // Distribute Backend. for i := 0; i < c.Services.Backend.Replicas; i++ { s := backendService{ - id: i, - ip: pool.Next(), - port: 8080, + router: router, + tracer: c.TracerProvider.Tracer("backend"), + id: i, + ip: pool.Next(), + port: 8080, } // Select random node. j := m.cluster.getRandomServer(m.rand) m.cluster.servers[j].addService(s) + router.addRoute("backend", s) } // Distribute DB. for i := 0; i < c.Services.DB.Replicas; i++ { s := dbService{ - id: i, - ip: pool.Next(), - port: 5432, + router: router, + tracer: c.TracerProvider.Tracer("db"), + id: i, + ip: pool.Next(), + port: 5432, } // Select random node. j := m.cluster.getRandomServer(m.rand) m.cluster.servers[j].addService(s) + router.addRoute("db", s) } // Distribute Cache. for i := 0; i < c.Services.Cache.Replicas; i++ { s := cacheService{ - id: i, - ip: pool.Next(), - port: 6379, + router: router, + tracer: c.TracerProvider.Tracer("cache"), + id: i, + ip: pool.Next(), + port: 6379, } // Select random node. 
j := m.cluster.getRandomServer(m.rand) m.cluster.servers[j].addService(s) + router.addRoute("cache", s) } return m diff --git a/internal/faker/model_test.go b/internal/faker/model_test.go index 58d995fd..b2bc07ed 100644 --- a/internal/faker/model_test.go +++ b/internal/faker/model_test.go @@ -1,17 +1,60 @@ package faker import ( + "context" "math/rand" + "sync" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" ) +type randomIDGenerator struct { + sync.Mutex + rand *rand.Rand +} + +// NewSpanID returns a non-zero span ID from a randomly-chosen sequence. +func (gen *randomIDGenerator) NewSpanID(_ context.Context, _ trace.TraceID) (sid trace.SpanID) { + gen.Lock() + defer gen.Unlock() + gen.rand.Read(sid[:]) + return sid +} + +// NewIDs returns a non-zero trace ID and a non-zero span ID from a +// randomly-chosen sequence. +func (gen *randomIDGenerator) NewIDs(_ context.Context) (tid trace.TraceID, sid trace.SpanID) { + gen.Lock() + defer gen.Unlock() + gen.rand.Read(tid[:]) + gen.rand.Read(sid[:]) + return tid, sid +} + func TestModel(t *testing.T) { + // Initialize test tracer. + exporter := tracetest.NewInMemoryExporter() + randSource := rand.NewSource(42) + randInstance := rand.New(randSource) + tp := tracesdk.NewTracerProvider( + // Using deterministic random ids. + tracesdk.WithIDGenerator(&randomIDGenerator{ + rand: randInstance, + }), + tracesdk.WithBatcher(exporter, + tracesdk.WithBatchTimeout(0), // instant + ), + ) m := modelFromConfig(Config{ - Rand: rand.New(rand.NewSource(42)), - Nodes: 10, - RPS: 1000, + Rand: randInstance, + Nodes: 10, + RPS: 1000, + TracerProvider: tp, Services: Services{ API: API{ Replicas: 2, @@ -39,4 +82,15 @@ func TestModel(t *testing.T) { services += len(s.services) } assert.Equal(t, 9, services) + t.Run("Request", func(t *testing.T) { + // Issue request. 
+ m.IssueRequest() + t.Run("Spans", func(t *testing.T) { + // Force flushing. + require.NoError(t, tp.ForceFlush(context.Background())) + spans := exporter.GetSpans() + require.NotEmpty(t, spans) + require.Len(t, spans, 6) + }) + }) }