Skip to content

Commit

Permalink
Initial release
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht committed Feb 6, 2023
1 parent 5d4f89f commit 3ed1e22
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 726 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
vendor/
extensions/
*.env
*.zip
*.zip
metadata.json
862 changes: 201 additions & 661 deletions LICENSE

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ clean:
@rm -rf extensions layer*.zip

build: clean fmt vet
@CGO_ENABLED=0 go build -ldflags "-s -w" -o extensions/parseable-extension cmd/parseable-extension/main.go
@CGO_ENABLED=0 GOOS="linux" GOARCH=$(or $(GOARCH), "amd64") go build -ldflags "-s -w" -o extensions/parseable-extension cmd/parseable-extension/main.go

compress:
@upx -9 -q extensions/parseable-extension

package: build compress
package: build
@zip -9 -q -r layer-$(or $(GOARCH), "x86_64").zip extensions
@rm -rf extensions

Expand All @@ -24,10 +24,10 @@ image:

publish: package
@aws lambda publish-layer-version \
--layer-name parseable-extension \
--compatible-runtimes python3.6 python3.7 python3.8 \
--layer-name parseable-lambda-extension-$(or $(GOARCH), "x86_64")-v1-0 \
--compatible-runtimes provided provided.al2 nodejs16.x nodejs18.x ruby2.7 java11 java8 go1.x java8.al2 python3.7 python3.8 python3.9 \
--compatible-architectures $(or $(GOARCH), "x86_64") \
--description "Lambda function extension for logging to parseable" \
--description "Lambda function extension for logging to Parseable" \
--license-info "Apache-2.0" \
--zip-file fileb://layer-$(or $(GOARCH), "x86_64").zip \
--output json | tee metadata.json
Expand Down
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Parseable AWS Lambda extension

[![goreportcard](https://goreportcard.com/badge/github.com/parseablehq/aws-lambda-extension)](https://goreportcard.com/report/github.com/parseablehq/aws-lambda-extension)
[![godoc](https://img.shields.io/badge/godoc-reference-brightgreen.svg?style=flat)](https://godoc.org/github.com/parseablehq/aws-lambda-extension)
[![license](https://img.shields.io/github/license/parseablehq/aws-lambda-extension.svg)](https://raw.githubusercontent.com/parseablehq/aws-lambda-extension/master/LICENSE)

[Parseable](https://parseable.io/) is a lightweight, cloud native log observability engine. It can use either a local drive or S3 (and compatible stores) for backend data storage. Parseable is written in Rust and uses Apache Arrow and Parquet as underlying data structures. Parseable consumes up to ~80% lower memory and ~50% lower CPU than Elastic for similar ingestion throughput.

You can deploy Parseable on AWS, GCP, Azure, and on-premises. Refer the [documentation](https://parseable.io/docs) for more details.

## Usage

To use the parseable-lambda-extension with a lambda function, it must be configured as a layer. There are two variants of the extension available: one for `x86_64` architecture and one for `arm64` architecture.

You can add the extension as a layer with the AWS CLI tool:

```sh
$ aws lambda update-code-configuration \
--function-name MyAwesomeFunction
--layers "<layer version ARN>"
```

The extension's layer version ARN follows the pattern below.

```sh
# Layer Version ARN Pattern
arn:aws:lambda:<AWS_REGION>:724973952305:layer:parseable-lambda-extension-<ARCH>-<VERSION>:1
```

<AWS_REGION> - This must match the region of the Lambda function to which you are adding the extension.
<ARCH> - x86_64 or arm64.
<VERSION> - The version of the extension you want to use. Current version is v1.0. For current latest release `v1.0`, use the value `v1-0`.

### Configuration

The extension is configurable via environment variables set for your lambda function.

* **PARSEABLE_LOG_URL** - Parseable endpoint URL. It should be set to `https://<parseable-url>/api/v1/ingest`. Change `<parseable-url>` to your Parseable instance URL. (required)
* **PARSEABLE_USERNAME** - Username set for your Parseable instance. (required)
* **PARSEABLE_PASSWORD** - Password set for your Parseable instance. (required)
* **PARSEABLE_LOG_STREAM** - Parseable stream name where you want to ingest logs. (default: ``Lambda Function Name``).

Refer Parseable [installation documentation](https://www.parseable.io/docs/category/installation) for more details.

## Container image lambda

In case if you deploy your lambda as container image, to inject extension as part of your function just copy it to your image:

```Dockerfile
FROM parseable/aws-lambda-extension:latest AS parseable-extension
FROM public.ecr.aws/lambda/python:3.8
# Layer code
WORKDIR /opt
COPY --from=parseable-extension /opt/ .
# Function code
WORKDIR /var/task
COPY app.py .
CMD ["app.lambda_handler"]
```

More details you can find [here](https://aws.amazon.com/blogs/compute/working-with-lambda-layers-and-extensions-in-container-images/).
19 changes: 17 additions & 2 deletions cmd/parseable-extension/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Cloudnatively Pvt. Ltd. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
Expand All @@ -12,6 +26,7 @@ import (
"time"

"github.com/parseablehq/aws-lambda-extension/pkg/extensionsclient"
"github.com/parseablehq/aws-lambda-extension/pkg/logsclient"
"github.com/parseablehq/aws-lambda-extension/pkg/parseableclient"
)

Expand Down Expand Up @@ -44,13 +59,13 @@ func main() {
"types": []string{"platform", "function"},
"buffering": map[string]uint{
"timeoutMs": 1000,
"maxBytes": 10485760,
"maxBytes": 1048576,
"maxItems": 10000,
},
})

for {
extensionsapiclient.Next(agentID.(string))
extensionsclient.Next(agentID.(string))
parseableclient.Send(functionName.(string), (<-queue).([]interface{}))
}
case <-time.After(9 * time.Second):
Expand Down
14 changes: 14 additions & 0 deletions pkg/extensionsclient/api.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Cloudnatively Pvt. Ltd. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package extensionsclient

import (
Expand Down
14 changes: 14 additions & 0 deletions pkg/logsclient/subscribe.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Cloudnatively Pvt. Ltd. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logsclient

import (
Expand Down
124 changes: 67 additions & 57 deletions pkg/parseableclient/client.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
// Copyright 2023 Cloudnatively Pvt. Ltd. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parseableclient

import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
)

// Send builds logs batches and send to Parseable
func Send(functionName string, records []interface{}) {
url := GetEnv("PARSEABLE_LOG_URL", "")
username := GetEnv("PARSEABLE_USERNAME", "")
password := GetEnv("PARSEABLE_PASSWORD", "")
url := getEnv("PARSEABLE_LOG_URL", "")
stream := getEnv("PARSEABLE_LOG_STREAM", functionName)
username := getEnv("PARSEABLE_USERNAME", "")
password := getEnv("PARSEABLE_PASSWORD", "")

applicationName := GetEnv("PARSEABLE_APP_NAME", functionName)
subsystemName := GetEnv("PARSEABLE_SUB_SYSTEM", "logs")
applicationName := getEnv("PARSEABLE_APP_NAME", functionName)
logEntries := []map[string]interface{}{}

if url == "" {
Expand All @@ -32,78 +45,75 @@ func Send(functionName string, records []interface{}) {

if len(records) > 0 {
for _, record := range records {
var text string
record := record.(map[string]interface{})
timestamp, _ := time.Parse("2006-01-02T15:04:05.000Z", record["time"].(string))

switch v := record["record"].(type) {
case string:
text = string(v)
default:
jsonText, _ := json.Marshal(v)
text = string(jsonText)
}

logEntries = append(logEntries, map[string]interface{}{
"timestamp": timestamp.UnixNano() / 1000000,
"severity": GetSeverityLevel(text),
"text": text,
"category": record["type"],
})
logEntries = append(logEntries, record)
}
bulkLogs, err := json.Marshal(logEntries)
if err != nil {
log.Println("Cannot marshal log entry:", err)
}

logsBulk, _ := json.Marshal(map[string]interface{}{
"applicationName": applicationName,
"subsystemName": subsystemName,
"logEntries": logEntries,
})

client := &http.Client{}
request, _ := http.NewRequest("POST", url, bytes.NewBuffer(logsBulk))
request, _ := http.NewRequest("POST", url, bytes.NewBuffer(bulkLogs))
request.SetBasicAuth(username, password)
request.Close = true
request.Header.Set("Content-Type", "application/json")
request.Header.Set("x-p-meta-application", applicationName)
request.Header.Set("x-p-stream", stream)
request.Close = true

response, err := client.Do(request)
if err != nil {
log.Println("Cannot send logs to Parseable:", err)
} else {
defer response.Body.Close()
if response.StatusCode != 200 {
log.Println("Parseable API failed with code:", response.StatusCode)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Println("Failed to read Parseable API response:", err)
}
log.Printf("Parseable API failed with code: %d and message: %s", response.StatusCode, string(body))
}
}
}
}

// GetEnv extract environment variable or default value
func GetEnv(key string, fallback string) string {
// getEnv extract environment variable or default value
func getEnv(key string, fallback string) string {
value, exists := os.LookupEnv(key)
if !exists {
return fallback
}
return value
}

// GetSeverityLevel extract serverity from log message
func GetSeverityLevel(message string) int {
var severity int

message = strings.ToLower(message)

switch {
case strings.Contains(message, "debug"):
severity = 1
case strings.Contains(message, "verbose"), strings.Contains(message, "trace"):
severity = 2
case strings.Contains(message, "warning"), strings.Contains(message, "warn"):
severity = 4
case strings.Contains(message, "error"), strings.Contains(message, "exception"):
severity = 5
case strings.Contains(message, "fatal"), strings.Contains(message, "critical"):
severity = 6
default:
severity = 3
}
// getSeverityLevel extract severity from log message
// func getSeverityLevel(record map[string]interface{}) int {
// var message string
// switch v := record["record"].(type) {
// case string:
// message = string(v)
// default:
// jsonText, _ := json.Marshal(v)
// message = string(jsonText)
// }

return severity
}
// var severity int
// message = strings.ToLower(message)

// switch {
// case strings.Contains(message, "debug"):
// severity = 1
// case strings.Contains(message, "verbose"), strings.Contains(message, "trace"):
// severity = 2
// case strings.Contains(message, "warning"), strings.Contains(message, "warn"):
// severity = 4
// case strings.Contains(message, "error"), strings.Contains(message, "exception"):
// severity = 5
// case strings.Contains(message, "fatal"), strings.Contains(message, "critical"):
// severity = 6
// default:
// severity = 3
// }

// return severity
// }

0 comments on commit 3ed1e22

Please sign in to comment.