fix: log aggregation
fix missing batch size on high load logs
jlangy committed Sep 6, 2024
1 parent 5e96da6 commit e06eb20
Showing 5 changed files with 119 additions and 7 deletions.
2 changes: 2 additions & 0 deletions aggregator/README.md
@@ -59,6 +59,8 @@ A lightweight Go server running scheduled jobs. There are two cronjobs it contro

- the entrypoint for `compactor` is `compactor.go`.

1. When running the aggregator server, if you need a version of keycloak and promtail to test the log ingestion, there is a containerized environment for them. From the podman directory, run `podman-compose up`. This will start our build of keycloak (see the image tag for the specific build), postgres, and promtail. Promtail is configured to connect to the local development setup of the aggregator, i.e. you can run the aggregator as above with `make dev`. See podman/promtail-config.yml if you want to test out different configurations, for example increasing the batch time or size. For the configuration used in our deployments, see the `ssoScrapeConfigs` section of [the values file](../helm/promtail-aggregator/values.yaml).
## Database migration
It uses `Alembic`, a lightweight database migration tool, to migrate the database schema. The steps to create a migration script are:
6 changes: 3 additions & 3 deletions aggregator/model/client_events.go
@@ -18,9 +18,9 @@ type ClientEvent struct {
 	Date string `pg:",notnull"`
 }
 
-func UpsertClientEvent(environment string, realmID string, clientID string, eventType string, date time.Time) error {
-	query := "INSERT INTO client_events (environment, realm_id, client_id, event_type, date, count) VALUES(?,?,?,?,?,1) ON CONFLICT (environment, realm_id, client_id, event_type, date) DO UPDATE SET count = client_events.count + 1"
-	_, err := pgdb.Query(nil, query, environment, realmID, clientID, eventType, date)
+func UpsertClientEvent(environment string, realmID string, clientID string, eventType string, date time.Time, length int) error {
+	query := "INSERT INTO client_events (environment, realm_id, client_id, event_type, date, count) VALUES(?,?,?,?,?,?5) ON CONFLICT (environment, realm_id, client_id, event_type, date) DO UPDATE SET count = client_events.count + ?5;"
+	_, err := pgdb.Query(nil, query, environment, realmID, clientID, eventType, date, length)
 	if err != nil {
 		log.Println(err)
 		return err
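For illustration, a hedged usage sketch of the new signature (the environment, realm, and client values and the import path are hypothetical; it assumes the aggregator's `model` package is importable):

```go
package main

import (
	"time"

	"github.com/bcgov/sso-keycloak/aggregator/model" // hypothetical import path
)

func main() {
	// A promtail batch carried 4 LOGIN entries for one env/realm/client
	// combination; the whole batch is recorded as a single +4 increment,
	// where the old signature would have counted it as +1.
	date := time.Date(2024, time.September, 6, 0, 0, 0, 0, time.UTC)
	if err := model.UpsertClientEvent("dev", "master", "my-client", "LOGIN", date, 4); err != nil {
		panic(err)
	}
}
```

Reading the query, the indexed placeholder `?5` appears to bind the sixth argument (`length`, zero-based index 5) in both the inserted `count` and the `ON CONFLICT` increment, so the two stay in sync.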
62 changes: 62 additions & 0 deletions aggregator/podman/podman-compose.yaml
@@ -0,0 +1,62 @@
version: '3.8'

services:
  keycloak:
    container_name: keycloak
    image: ghcr.io/bcgov/sso:24.0.6-build.2
    depends_on:
      - postgres
    ports:
      - 9080:9080
    command: start-dev --http-port=9080
    environment:
      DB_POSTGRESQL_SERVICE_HOST: postgres
      DB_POSTGRESQL_SERVICE_PORT: 5433
      KC_DB: postgres
      KC_DB_URL_DATABASE: keycloak
      KC_DB_URL: jdbc:postgresql://postgres:5433/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: keycloak
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
      KC_HOSTNAME_STRICT_HTTPS: 'false'
      DB_MIN_POOL_SIZE: 1
      DB_MAX_POOL_SIZE: 5
      JGROUPS_CLUSTER_PASSWORD: password
      JAVA_OPTS_APPEND: -Dkeycloak.profile.feature.impersonation=disabled -Djboss.socket.binding.port-offset=1000 -Djboss.persistent.log.dir=/var/log/eap -Djgroups.dns.query=sso-keycloak-ping.c6af30-test.svc.cluster.local
      KC_HTTP_RELATIVE_PATH: /auth
      DB_JNDI: java:jboss/datasources/KeycloakDS
      DB_SERVICE_PREFIX_MAPPING: db-postgresql=DB
      TX_DATABASE_PREFIX_MAPPING: db-postgresql=DB
      TZ: America/Vancouver
    volumes:
      - logs_data:/var/log/eap

  postgres:
    image: docker.io/postgres:13
    container_name: postgres
    environment:
      POSTGRES_DB: keycloak
      POSTGRES_USER: keycloak
      POSTGRES_PASSWORD: keycloak
    tty: true
    ports:
      - 5433:5433
    command: -p 5433
    volumes:
      - postgres_data:/var/lib/postgresql/data

  promtail:
    image: docker.io/grafana/promtail
    depends_on:
      - keycloak
    network_mode: host
    volumes:
      - logs_data:/keycloak/logs
      - ./promtail-config.yml:/etc/promtail/promtail-config.yml
    command: -config.file=/etc/promtail/promtail-config.yml

volumes:
  postgres_data:
    driver: local
  logs_data:
    driver: local
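As a quick smoke test that the stack is up, a minimal sketch (it assumes the compose file above is running; the port and path come from `ports` and `KC_HTTP_RELATIVE_PATH`):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Keycloak is published on host port 9080 and served under /auth.
	resp, err := http.Get("http://localhost:9080/auth")
	if err != nil {
		fmt.Println("keycloak not reachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("keycloak responded:", resp.Status)
}
```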
43 changes: 43 additions & 0 deletions aggregator/podman/promtail-config.yml
@@ -0,0 +1,43 @@
server:
  http_listen_port: 9081
  log_level: debug

positions:
  filename: /var/log/promtail_positions.yaml

clients:
  - url: http://localhost:8080/api/promtail/push
    # Can increase batch interval below to make log batches easier to test
    # batchwait: 10s

scrape_configs:
  - job_name: local
    static_configs:
      - targets:
          - localhost
        labels:
          job: keycloak
          environment: dev
          __path__: /keycloak/logs/*.log
    pipeline_stages:
      - match:
          selector: '{job="keycloak"}'
          stages:
            - json:
                expressions:
                  timestamp: '"@timestamp"'
                  level: level
            - labels:
                level:
            - timestamp:
                format: RFC3339Nano
                source: timestamp
      - match:
          selector: '{job="keycloak"}'
          stages:
            - regex:
                expression: type=(?P<type>\S+),.*realmId=(?P<realm_id>\S+),.*clientId=(?P<client_id>\S+),
            - labels:
                event_type: type
                realm_id:
                client_id:
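To see what the regex stage extracts, a small sketch running the same expression through Go's regexp (the sample log line is illustrative of Keycloak's event format, not taken from this repo):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	re := regexp.MustCompile(`type=(?P<type>\S+),.*realmId=(?P<realm_id>\S+),.*clientId=(?P<client_id>\S+),`)
	line := "type=LOGIN, realmId=master, clientId=account, userId=1234, ipAddress=127.0.0.1"
	m := re.FindStringSubmatch(line)
	if m == nil {
		fmt.Println("no match")
		return
	}
	for i, name := range re.SubexpNames() {
		if name != "" {
			fmt.Printf("%s=%s\n", name, m[i]) // type=LOGIN, realm_id=master, client_id=account
		}
	}
}
```

The extracted `type`, `realm_id`, and `client_id` fields then become the `event_type`, `realm_id`, and `client_id` labels via the `labels` stage.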
13 changes: 9 additions & 4 deletions aggregator/promtail/index.go
@@ -1,7 +1,6 @@
 package promtail
 
 import (
-	"log"
 	"net/http"
 	"time"
 
@@ -27,6 +26,12 @@ func PromtailPushHandler(w http.ResponseWriter, r *http.Request) {
 	loc := config.LoadTimeLocation()
 
 	var lastErr error
+
+	/*
+		Each unique label combination will have a dedicated stream. The logs associated with that label combination will be in stream.Entries.
+		e.g. if 4 LOGIN events for the same client/realm/env are sent, there will be 1 stream with 4 entries.
+		If multiple label combinations are sent in the same batch they will be in different streams.
+	*/
 	for _, stream := range req.Streams {
 		ls, err := promql_parser.ParseMetric(stream.Labels)
 		if err != nil {
@@ -55,18 +60,18 @@
 			}
 		}
 
-		// only collect event logs but system logs
+		// only collect event logs, skip the system logs
 		if eventType == "" {
 			continue
 		}
 
+		// For the aggregated count, the timestamp is flattened to the date and the count of entries in this batch is added to the total.
 		for _, entry := range stream.Entries {
 			year, month, day := entry.Timestamp.In(loc).Date()
 			date = time.Date(year, month, day, 0, 0, 0, 0, loc)
 		}
 
-		log.Println(environment, realmId, clientId, eventType, date)
-		go model.UpsertClientEvent(environment, realmId, clientId, eventType, date)
+		go model.UpsertClientEvent(environment, realmId, clientId, eventType, date, len(stream.Entries))
 	}
 
 	if lastErr != nil {
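To make the fix concrete, a self-contained sketch of the flattening loop and batch count above (the timestamps are made up):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loc, err := time.LoadLocation("America/Vancouver")
	if err != nil {
		panic(err)
	}
	// Four entries from one stream, i.e. one label combination.
	entries := []time.Time{
		time.Date(2024, 9, 6, 10, 1, 2, 0, time.UTC),
		time.Date(2024, 9, 6, 10, 1, 3, 0, time.UTC),
		time.Date(2024, 9, 6, 10, 1, 4, 0, time.UTC),
		time.Date(2024, 9, 6, 10, 1, 5, 0, time.UTC),
	}
	var date time.Time
	for _, ts := range entries {
		year, month, day := ts.In(loc).Date()
		date = time.Date(year, month, day, 0, 0, 0, 0, loc)
	}
	// Before this commit each batch incremented count by 1 regardless of
	// size; now the whole batch is recorded as +len(entries).
	fmt.Println(date.Format("2006-01-02"), "count +=", len(entries))
}
```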
