From c3e56febf166c41c6055d40b8c77bef4c76e3690 Mon Sep 17 00:00:00 2001 From: Ben Sully Date: Wed, 30 Aug 2023 15:51:33 +0100 Subject: [PATCH] Add vector service concept, and resource endpoint to handle vector search This cannibalizes some of #23 but focusses on the read path: searching a configured vector store using the configured embedding engine. --- docker-compose.yaml | 5 +- pkg/main.go | 1 + pkg/plugin/app.go | 37 ++++----- pkg/plugin/resources.go | 29 +++++++ pkg/plugin/settings.go | 32 ++++++++ pkg/plugin/vector/embed/embedder.go | 28 +++++++ pkg/plugin/vector/embed/openai.go | 93 +++++++++++++++++++++++ pkg/plugin/vector/service.go | 70 +++++++++++++++++ pkg/plugin/vector/store/store.go | 51 +++++++++++++ pkg/plugin/vector/store/vectorapi.go | 89 ++++++++++++++++++++++ provisioning/plugins/grafana-llm-app.yaml | 17 +++++ src/plugin.json | 18 +++++ 12 files changed, 446 insertions(+), 24 deletions(-) create mode 100644 pkg/plugin/settings.go create mode 100644 pkg/plugin/vector/embed/embedder.go create mode 100644 pkg/plugin/vector/embed/openai.go create mode 100644 pkg/plugin/vector/service.go create mode 100644 pkg/plugin/vector/store/store.go create mode 100644 pkg/plugin/vector/store/vectorapi.go create mode 100644 provisioning/plugins/grafana-llm-app.yaml diff --git a/docker-compose.yaml b/docker-compose.yaml index f78524e9..804e10f5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,7 +6,10 @@ services: build: context: ./.config args: - grafana_version: ${GRAFANA_VERSION:-9.5.2} + grafana_version: ${GRAFANA_VERSION:-10.1.0} + environment: + GF_FEATURE_TOGGLES_ENABLE: 'externalServiceAuth' + OPENAI_API_KEY: $OPENAI_API_KEY ports: - 3000:3000/tcp volumes: diff --git a/pkg/main.go b/pkg/main.go index f739ebb9..8298d071 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -9,6 +9,7 @@ import ( ) func main() { + log.DefaultLogger.Info("Starting plugin process") // Start listening to requests sent from Grafana. This call is blocking so // it won't finish until Grafana shuts down the process or the plugin choose // to exit by itself using os.Exit. Manage automatically manages life cycle diff --git a/pkg/plugin/app.go b/pkg/plugin/app.go index 4bc9baf0..1a2c977d 100644 --- a/pkg/plugin/app.go +++ b/pkg/plugin/app.go @@ -2,12 +2,13 @@ package plugin import ( "context" - "encoding/json" "net/http" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter" + "github.com/grafana/llm/pkg/plugin/vector" ) // Make sure App implements required interfaces. This is important to do @@ -21,32 +22,16 @@ var ( _ backend.StreamHandler = (*App)(nil) ) -const openAIKey = "openAIKey" - -type Settings struct { - OpenAIURL string `json:"openAIUrl"` - OpenAIOrganizationID string `json:"openAIOrganizationId"` - - openAIKey string -} - -func loadSettings(appSettings backend.AppInstanceSettings) Settings { - settings := Settings{ - OpenAIURL: "https://api.openai.com", - } - _ = json.Unmarshal(appSettings.JSONData, &settings) - - settings.openAIKey = appSettings.DecryptedSecureJSONData[openAIKey] - return settings -} - // App is an example app backend plugin which can respond to data queries. type App struct { backend.CallResourceHandler + + vectorService vector.Service } // NewApp creates a new example *App instance. func NewApp(appSettings backend.AppInstanceSettings) (instancemgmt.Instance, error) { + log.DefaultLogger.Info("Creating new app instance") var app App // Use a httpadapter (provided by the SDK) for resource calls. This allows us @@ -56,17 +41,23 @@ func NewApp(appSettings backend.AppInstanceSettings) (instancemgmt.Instance, err app.registerRoutes(mux) app.CallResourceHandler = httpadapter.New(mux) + settings := loadSettings(appSettings) + var err error + app.vectorService, err = vector.NewService(settings.EmbeddingSettings, settings.VectorStoreSettings) + if err != nil { + return nil, err + } + return &app, nil } // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. -func (a *App) Dispose() { - // cleanup -} +func (a *App) Dispose() {} // CheckHealth handles health checks sent from Grafana to the plugin. func (a *App) CheckHealth(_ context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + log.DefaultLogger.Info("check health") return &backend.CheckHealthResult{ Status: backend.HealthStatusOk, Message: "ok", diff --git a/pkg/plugin/resources.go b/pkg/plugin/resources.go index 81bd285b..116b25bd 100644 --- a/pkg/plugin/resources.go +++ b/pkg/plugin/resources.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter" + "github.com/grafana/llm/pkg/plugin/vector/store" ) // /api/plugins/app-with-backend/resources/ping @@ -62,9 +63,37 @@ func newOpenAIProxy() http.Handler { } } +type vectorSearchRequest struct { + Text string `json:"text"` + Collection string `json:"collection"` +} + +type vectorSearchResponse struct { + Results []store.SearchResult `json:"results"` +} + +func (app *App) handleVectorSearch(w http.ResponseWriter, req *http.Request) { + body := vectorSearchRequest{} + if err := json.NewDecoder(req.Body).Decode(&body); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + results, err := app.vectorService.Search(req.Context(), body.Collection, body.Text) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + resp := vectorSearchResponse{Results: results} + bodyJSON, err := json.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + w.Write(bodyJSON) +} + // registerRoutes takes a *http.ServeMux and registers some HTTP handlers. func (a *App) registerRoutes(mux *http.ServeMux) { mux.HandleFunc("/ping", a.handlePing) mux.HandleFunc("/echo", a.handleEcho) mux.Handle("/openai/", newOpenAIProxy()) + mux.HandleFunc("/vector/search", a.handleVectorSearch) } diff --git a/pkg/plugin/settings.go b/pkg/plugin/settings.go new file mode 100644 index 00000000..ab3ca00f --- /dev/null +++ b/pkg/plugin/settings.go @@ -0,0 +1,32 @@ +package plugin + +import ( + "encoding/json" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/llm/pkg/plugin/vector/embed" + "github.com/grafana/llm/pkg/plugin/vector/store" +) + +const openAIKey = "openAIKey" + +type Settings struct { + OpenAIURL string `json:"openAIUrl"` + OpenAIOrganizationID string `json:"openAIOrganizationId"` + + openAIKey string + + EmbeddingSettings embed.Settings `json:"embeddings"` + VectorStoreSettings store.Settings `json:"vectorStore"` +} + +func loadSettings(appSettings backend.AppInstanceSettings) Settings { + settings := Settings{ + OpenAIURL: "https://api.openai.com", + } + _ = json.Unmarshal(appSettings.JSONData, &settings) + + settings.openAIKey = appSettings.DecryptedSecureJSONData[openAIKey] + settings.EmbeddingSettings.OpenAI.APIKey = settings.openAIKey + return settings +} diff --git a/pkg/plugin/vector/embed/embedder.go b/pkg/plugin/vector/embed/embedder.go new file mode 100644 index 00000000..4e8fa159 --- /dev/null +++ b/pkg/plugin/vector/embed/embedder.go @@ -0,0 +1,28 @@ +package embed + +import "context" + +type EmbedderType string + +const ( + EmbedderOpenAI EmbedderType = "openai" +) + +type Embedder interface { + Embed(ctx context.Context, model string, text string) ([]float32, error) +} + +type Settings struct { + Type string `json:"type"` + + OpenAI openAISettings `json:"openai"` +} + +// NewEmbedder creates a new embedder. +func NewEmbedder(s Settings) (Embedder, error) { + switch EmbedderType(s.Type) { + case EmbedderOpenAI: + return newOpenAIEmbedder(s.OpenAI), nil + } + return nil, nil +} diff --git a/pkg/plugin/vector/embed/openai.go b/pkg/plugin/vector/embed/openai.go new file mode 100644 index 00000000..c0c4344d --- /dev/null +++ b/pkg/plugin/vector/embed/openai.go @@ -0,0 +1,93 @@ +package embed + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/grafana/grafana-plugin-sdk-go/backend/log" +) + +type openAISettings struct { + URL string `json:"url"` + APIKey string `json:"apiKey"` +} + +type openAILLMClient struct { + client *http.Client + url string + apiKey string +} + +type openAIEmbeddingsRequest struct { + Model string `json:"model"` + Input string `json:"input"` +} + +type openAIEmbeddingsResponse struct { + Data []openAIEmbeddingData `json:"data"` +} + +type openAIEmbeddingData struct { + Embedding []float32 `json:"embedding"` +} + +func (o *openAILLMClient) Embed(ctx context.Context, model string, payload string) ([]float32, error) { + // TODO: ensure payload is under 8191 tokens, somehow. + url := o.url + if url == "" { + url = "https://api.openai.com" + } + url = strings.TrimSuffix(url, "/") + url = url + "/v1/embeddings" + reqBody := openAIEmbeddingsRequest{ + Model: model, + Input: payload, + } + bodyJSON, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyJSON)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+o.apiKey) + resp, err := o.client.Do(req) + if err != nil { + return nil, fmt.Errorf("make request: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.DefaultLogger.Warn("failed to close response body", "err", err) + } + }() + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("got non-2xx status from OpenAI: %s", resp.Status) + } + respBody, err := io.ReadAll(io.LimitReader(resp.Body, 1024*1024*2)) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + var body openAIEmbeddingsResponse + err = json.Unmarshal(respBody, &body) + if err != nil { + return nil, fmt.Errorf("unmarshal response body: %w", err) + } + return body.Data[0].Embedding, nil +} + +// newOpenAIEmbedder creates a new Embedder using OpenAI's API. +func newOpenAIEmbedder(settings openAISettings) Embedder { + impl := openAILLMClient{ + client: &http.Client{}, + url: settings.URL, + apiKey: settings.APIKey, + } + return &impl +} diff --git a/pkg/plugin/vector/service.go b/pkg/plugin/vector/service.go new file mode 100644 index 00000000..ac8b1d4e --- /dev/null +++ b/pkg/plugin/vector/service.go @@ -0,0 +1,70 @@ +// package vector provides a service for searching vector embeddings. +// It combines the embedding engine and the vector store. +package vector + +import ( + "context" + "fmt" + + "github.com/grafana/llm/pkg/plugin/vector/embed" + "github.com/grafana/llm/pkg/plugin/vector/store" +) + +type Collection struct { + Name string `json:"name"` + Dimension int `json:"dimension"` + Model string `json:"model"` +} + +type Service interface { + Search(ctx context.Context, collection string, query string) ([]store.SearchResult, error) +} + +type vectorService struct { + embedder embed.Embedder + store store.ReadVectorStore + collectionConfig map[string]Collection +} + +func NewService(embedSettings embed.Settings, storeSettings store.Settings) (Service, error) { + em, err := embed.NewEmbedder(embedSettings) + if err != nil { + return nil, fmt.Errorf("new embedder: %w", err) + } + if em == nil { + return nil, nil + } + st, err := store.NewReadVectorStore(storeSettings) + if err != nil { + return nil, fmt.Errorf("new vector store: %w", err) + } + if st == nil { + return nil, nil + } + return &vectorService{ + embedder: em, + store: st, + }, nil +} + +func (g vectorService) Search(ctx context.Context, collection string, query string) ([]store.SearchResult, error) { + // Determine which model was used to embed this collection. + c := g.collectionConfig[collection] + if c.Name == "" { + return nil, fmt.Errorf("unknown collection %s", collection) + } + + // Get the embedding for the search query. + e, err := g.embedder.Embed(ctx, c.Model, query) + if err != nil { + return nil, fmt.Errorf("embed query: %w", err) + } + + // Search the vector store for similar vectors. + results, err := g.store.Search(ctx, collection, e, 10) + if err != nil { + return nil, fmt.Errorf("vector store search: %w", err) + } + + return results, nil +} diff --git a/pkg/plugin/vector/store/store.go b/pkg/plugin/vector/store/store.go new file mode 100644 index 00000000..4847a657 --- /dev/null +++ b/pkg/plugin/vector/store/store.go @@ -0,0 +1,51 @@ +package store + +import "context" + +type VectorStoreType string + +const ( + VectorStoreTypeGrafanaVectorAPI VectorStoreType = "grafana/vectorapi" +) + +type SearchResult struct { + Payload map[string]any `json:"payload"` + Score float64 `json:"score"` +} + +type ReadVectorStore interface { + Collections(ctx context.Context) ([]string, error) + Search(ctx context.Context, collection string, vector []float32, limit uint64) ([]SearchResult, error) +} + +type WriteVectorStore interface { + Collections(ctx context.Context) ([]string, error) + CollectionExists(ctx context.Context, collection string) (bool, error) + CreateCollection(ctx context.Context, collection string, size uint64) error + PointExists(ctx context.Context, collection string, id uint64) (bool, error) + UpsertColumnar(ctx context.Context, collection string, ids []uint64, embeddings [][]float32, payloadJSONs []string) error +} + +type VectorStore interface { + ReadVectorStore + WriteVectorStore +} + +type Settings struct { + Type string `json:"type"` + + GrafanaVectorAPI grafanaVectorAPISettings `json:"grafanaVectorAPI"` +} + +func NewReadVectorStore(s Settings) (ReadVectorStore, error) { + switch VectorStoreType(s.Type) { + case VectorStoreTypeGrafanaVectorAPI: + return newGrafanaVectorAPI(s.GrafanaVectorAPI), nil + } + return nil, nil +} + +func NewVectorStore(s Settings) (VectorStore, error) { + // TODO: Implement write vector store. + return nil, nil +} diff --git a/pkg/plugin/vector/store/vectorapi.go b/pkg/plugin/vector/store/vectorapi.go new file mode 100644 index 00000000..91a5dd2e --- /dev/null +++ b/pkg/plugin/vector/store/vectorapi.go @@ -0,0 +1,89 @@ +package store + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +type grafanaVectorAPISettings struct { + URL string `json:"url"` +} + +type grafanaVectorAPI struct { + client *http.Client + url string +} + +func (g *grafanaVectorAPI) Collections(ctx context.Context) ([]string, error) { + resp, err := g.client.Get(g.url + "/collections") + if err != nil { + return nil, fmt.Errorf("get collections: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("get collections: %s", resp.Status) + } + + type collectionResponse struct { + Name string `json:"name"` + Dimension int `json:"dimension"` + } + collections := []collectionResponse{} + if err := json.NewDecoder(resp.Body).Decode(&collections); err != nil { + return nil, fmt.Errorf("decode collections: %w", err) + } + names := make([]string, 0, len(collections)) + for _, c := range collections { + names = append(names, c.Name) + } + return names, nil +} + +func (g *grafanaVectorAPI) Search(ctx context.Context, collection string, vector []float32, limit uint64) ([]SearchResult, error) { + type queryPointsRequest struct { + Query []float32 `json:"query"` + TopK uint64 `json:"top_k"` + } + reqBody := queryPointsRequest{ + Query: vector, + TopK: limit, + } + reqJSON, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + resp, err := g.client.Post(g.url+"/collections/"+collection+"/query", "application/json", bytes.NewReader(reqJSON)) + if err != nil { + return nil, fmt.Errorf("post collections: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("post collections: %s", resp.Status) + } + type queryPointResult struct { + ID string `json:"id"` + Embedding []float32 `json:"embedding"` + Metadata map[string]any `json:"metadata"` + Score float64 `json:"score"` + } + queryResult := []queryPointResult{} + if err := json.NewDecoder(resp.Body).Decode(&queryResult); err != nil { + return nil, fmt.Errorf("decode collections: %w", err) + } + results := make([]SearchResult, 0, len(queryResult)) + for _, r := range queryResult { + results = append(results, SearchResult{ + Payload: r.Metadata, + Score: r.Score, + }) + } + return results, nil +} + +func newGrafanaVectorAPI(s grafanaVectorAPISettings) ReadVectorStore { + return &grafanaVectorAPI{ + client: &http.Client{}, + url: s.URL, + } +} diff --git a/provisioning/plugins/grafana-llm-app.yaml b/provisioning/plugins/grafana-llm-app.yaml new file mode 100644 index 00000000..ced739ec --- /dev/null +++ b/provisioning/plugins/grafana-llm-app.yaml @@ -0,0 +1,17 @@ +apiVersion: 1 + +apps: + - type: grafana-llm-app + jsonData: + openAIUrl: https://api.openai.com + embeddings: + type: openai + openai: + url: http://localhost:8889 + vectorStore: + type: grafana/vectorapi + grafanaVectorAPI: + url: http://localhost:8889 + + secureJsonData: + openAIKey: $OPENAI_API_KEY diff --git a/src/plugin.json b/src/plugin.json index 39996b1e..1a9f5991 100644 --- a/src/plugin.json +++ b/src/plugin.json @@ -32,5 +32,23 @@ "dependencies": { "grafanaDependency": ">=9.5.2", "plugins": [] + }, + "externalServiceRegistration": { + "self": { + "permissions": [ + { + "action": "dashboards:read", + "scope": "dashboards:*" + }, + { + "action": "dashboards:read", + "scope": "folders:*" + }, + { + "action": "folders:read", + "scope": "folders:*" + } + ] + } } }