diff --git a/aggregator/README.md b/aggregator/README.md index 190560d..d4aa0ab 100644 --- a/aggregator/README.md +++ b/aggregator/README.md @@ -59,6 +59,8 @@ A lightweight Go server running scheduled jobs. There are two cronjobs it contro - the entrypoint for `compactor` is `compactor.go`. +1. When running the aggregator server, if you need a version of keycloak and promtail to test the log ingestion, there is a containerized environment for them. From the podman directory, run `podman-compose up`. This will start our build of keycloak (see the image tag for specific build), postgres and promtail. Promtail is configured to connect to the local development setup of the aggregator, i.e. you can run the aggregator as above with `make dev`. See podman/promtail-config.yml if you want to test out different configurations, for example increasing the batch time or size. For the configuration used in our deployments, see [the values file's](../helm/promtail-aggregator/values.yaml) ssoScrapeConfigs section. + ## Database migration It uses `Alembic`, which is a lightweight database migration tool, to migrate database schema. 
The step to create a migration script is: diff --git a/aggregator/model/client_events.go b/aggregator/model/client_events.go index eec323c..79b9ce8 100644 --- a/aggregator/model/client_events.go +++ b/aggregator/model/client_events.go @@ -18,9 +18,9 @@ type ClientEvent struct { Date string `pg:",notnull"` } -func UpsertClientEvent(environment string, realmID string, clientID string, eventType string, date time.Time) error { - query := "INSERT INTO client_events (environment, realm_id, client_id, event_type, date, count) VALUES(?,?,?,?,?,1) ON CONFLICT (environment, realm_id, client_id, event_type, date) DO UPDATE SET count = client_events.count + 1" - _, err := pgdb.Query(nil, query, environment, realmID, clientID, eventType, date) +func UpsertClientEvent(environment string, realmID string, clientID string, eventType string, date time.Time, length int) error { + query := "INSERT INTO client_events (environment, realm_id, client_id, event_type, date, count) VALUES(?,?,?,?,?,?5) ON CONFLICT (environment, realm_id, client_id, event_type, date) DO UPDATE SET count = client_events.count + ?5;" + _, err := pgdb.Query(nil, query, environment, realmID, clientID, eventType, date, length) if err != nil { log.Println(err) return err diff --git a/aggregator/podman/podman-compose.yaml b/aggregator/podman/podman-compose.yaml new file mode 100644 index 0000000..e1566b5 --- /dev/null +++ b/aggregator/podman/podman-compose.yaml @@ -0,0 +1,62 @@ +version: '3.8' + +services: + keycloak: + container_name: keycloak + image: ghcr.io/bcgov/sso:24.0.6-build.2 + depends_on: + - postgres + ports: + - 9080:9080 + command: start-dev --http-port=9080 + environment: + DB_POSTGRESQL_SERVICE_HOST: postgres + DB_POSTGRESQL_SERVICE_PORT: 5433 + KC_DB: postgres + KC_DB_URL_DATABASE: keycloak + KC_DB_URL: jdbc:postgresql://postgres:5433/keycloak + KC_DB_USERNAME: keycloak + KC_DB_PASSWORD: keycloak + KEYCLOAK_ADMIN: admin + KEYCLOAK_ADMIN_PASSWORD: admin + KC_HOSTNAME_STRICT_HTTPS: 'false' + 
DB_MIN_POOL_SIZE: 1 + DB_MAX_POOL_SIZE: 5 + JGROUPS_CLUSTER_PASSWORD: password + JAVA_OPTS_APPEND: -Dkeycloak.profile.feature.impersonation=disabled -Djboss.socket.binding.port-offset=1000 -Djboss.persistent.log.dir=/var/log/eap -Djgroups.dns.query=sso-keycloak-ping.c6af30-test.svc.cluster.local + KC_HTTP_RELATIVE_PATH: /auth + DB_JNDI: java:jboss/datasources/KeycloakDS + DB_SERVICE_PREFIX_MAPPING: db-postgresql=DB + TX_DATABASE_PREFIX_MAPPING: db-postgresql=DB + TZ: America/Vancouver + volumes: + - logs_data:/var/log/eap + + postgres: + image: docker.io/postgres:13 + container_name: postgres + environment: + POSTGRES_DB: keycloak + POSTGRES_USER: keycloak + POSTGRES_PASSWORD: keycloak + tty: true + ports: + - 5433:5433 + command: -p 5433 + volumes: + - postgres_data:/var/lib/postgresql/data + + promtail: + image: docker.io/grafana/promtail + depends_on: keycloak + network_mode: host + volumes: + - logs_data:/keycloak/logs + - ./promtail-config.yml:/etc/promtail/promtail-config.yml + command: -config.file=/etc/promtail/promtail-config.yml + +volumes: + postgres_data: + driver: local + logs_data: + driver: local diff --git a/aggregator/podman/promtail-config.yml b/aggregator/podman/promtail-config.yml new file mode 100644 index 0000000..5d6ad84 --- /dev/null +++ b/aggregator/podman/promtail-config.yml @@ -0,0 +1,43 @@ +server: + http_listen_port: 9081 + log_level: debug + +positions: + filename: /var/log/promtail_positions.yaml + +clients: +- url: http://localhost:8080/api/promtail/push + # Can increase batch interval below to make log batches easier to test + # batchwait: 10s + +scrape_configs: +- job_name: local + static_configs: + - targets: + - localhost + labels: + job: keycloak + environment: dev + __path__: /keycloak/logs/*.log + pipeline_stages: + - match: + selector: '{job="keycloak"}' + stages: + - json: + expressions: + timestamp: '"@timestamp"' + level: level + - labels: + level: + - timestamp: + format: RFC3339Nano + source: timestamp + - match: + 
selector: '{job="keycloak"}' + stages: + - regex: + expression: type=(?P<type>\S+),.*realmId=(?P<realm_id>\S+),.*clientId=(?P<client_id>\S+), + labels: + event_type: type + realm_id: + client_id: diff --git a/aggregator/promtail/index.go b/aggregator/promtail/index.go index 11a2adf..c1482fa 100644 --- a/aggregator/promtail/index.go +++ b/aggregator/promtail/index.go @@ -1,7 +1,6 @@ package promtail import ( - "log" "net/http" "time" @@ -27,6 +26,12 @@ func PromtailPushHandler(w http.ResponseWriter, r *http.Request) { loc := config.LoadTimeLocation() var lastErr error + + /* + Each unique label combination will have a dedicated stream. The logs associated to that label combination will be in stream.Entries. + e.g. if 4 LOGIN events for the same client/realm/env are sent, there will be 1 stream with 4 entries. + If multiple label combinations are sent in the same batch they will be in different streams. + */ for _, stream := range req.Streams { ls, err := promql_parser.ParseMetric(stream.Labels) if err != nil { @@ -55,18 +60,18 @@ func PromtailPushHandler(w http.ResponseWriter, r *http.Request) { } } - // only collect event logs but system logs + // only collect event logs, skip the system logs if eventType == "" { continue } + // For the aggregated count, timestamp is flattened to the date and the count of entries in this batch will be added to the total. for _, entry := range stream.Entries { year, month, day := entry.Timestamp.In(loc).Date() date = time.Date(year, month, day, 0, 0, 0, 0, loc) } - log.Println(environment, realmId, clientId, eventType, date) - go model.UpsertClientEvent(environment, realmId, clientId, eventType, date) + go model.UpsertClientEvent(environment, realmId, clientId, eventType, date, len(stream.Entries)) } if lastErr != nil {