Skip to content

Commit

Permalink
Merge pull request #2749 from rockwotj/pgvector
Browse files Browse the repository at this point in the history
Support pgvector for postgres outputs
  • Loading branch information
Jeffail authored Aug 1, 2024
2 parents aa8c4c5 + 54cb8b3 commit b5bcb36
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file.

- Field `content_md5` added to the `aws_s3` output. (@dom-lee-naimuri)
- Field `send_ack` added to the `nats` input. (@plejd-sebman)
- New Bloblang method `vector`. (@rockwotj)
- New experimental `ockam_kafka` input and output. (@mrinalwadhwa, @davide-baldo)

## 4.32.1 - 2024-07-24

Expand Down
1 change: 1 addition & 0 deletions cmd/tools/docs_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func doBloblangMethods(dir string) {
"Object & Array Manipulation",
"Parsing",
"Encoding and Encryption",
"SQL",
"JSON Web Tokens",
"GeoIP",
"Deprecated",
Expand Down
30 changes: 30 additions & 0 deletions docs/modules/guides/pages/bloblang/methods.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,36 @@ root.h2 = this.value.hash(algorithm: "crc32", polynomial: "Koopman").encode("hex
# Out: {"h1":"c99465aa","h2":"df373d3c"}
```
== SQL
=== `vector`
[CAUTION]
====
This method is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with it is found.
====
Creates a vector from a given array of floating point numbers.
This vector can be inserted into various SQL databases if they have support for embeddings vectors (for example `pgvector`).
Introduced in version 4.33.0.
==== Examples
Create a vector from an array literal
```coffeescript
root.embeddings = [1.2, 0.6, 0.9].vector()
```
Create a vector from an array
```coffeescript
root.embedding_vector = this.embedding_array.vector()
```
== JSON Web Tokens
=== `parse_jwt_es256`
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ require (
github.com/opencontainers/runc v1.1.12 // indirect
github.com/oschwald/maxminddb-golang v1.13.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pgvector/pgvector-go v0.2.1
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
Expand Down
34 changes: 34 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ cuelang.org/go v0.9.1/go.mod h1:qpAYsLOf7gTM1YdEg6cxh553uZ4q9ZDWlPbtZr9q1Wk=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
entgo.io/ent v0.13.1 h1:uD8QwN1h6SNphdCCzmkMN3feSUzNnVvV/WIkHKMbzOE=
entgo.io/ent v0.13.1/go.mod h1:qCEmo+biw3ccBn9OyL4ZK5dfpwg++l1Gxwac5B1206A=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
Expand Down Expand Up @@ -477,6 +479,10 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-pg/pg/v10 v10.11.0 h1:CMKJqLgTrfpE/aOVeLdybezR2om071Vh38OLZjsyMI0=
github.com/go-pg/pg/v10 v10.11.0/go.mod h1:4BpHRoxE61y4Onpof3x1a2SQvi9c+q1dJnrNdMjsroA=
github.com/go-pg/zerochecker v0.2.0 h1:pp7f72c3DobMWOb2ErtZsnrPaSvHd2W4o9//8HtF4mU=
github.com/go-pg/zerochecker v0.2.0/go.mod h1:NJZ4wKL0NmTtz0GKCoJ8kym6Xn/EQzXRl2OnAe7MmDo=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
Expand Down Expand Up @@ -710,6 +716,8 @@ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgS
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
Expand All @@ -732,11 +740,17 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg=
github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -920,6 +934,8 @@ github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKf
github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
github.com/pgvector/pgvector-go v0.2.1 h1:tsrw4Y8A2EoeLr2ONc8W5/sOMNYWkyK5Pqq8NXgPBNM=
github.com/pgvector/pgvector-go v0.2.1/go.mod h1:tUykcz5NRZU9XTZiVEbRu8y8dnQFD5Mpu/0jsnlSmt8=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
Expand Down Expand Up @@ -1087,6 +1103,8 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tmc/langchaingo v0.1.5 h1:PNPFu54wn5uVPRt9GS/quRwdFZW4omSab9/dcFAsGmU=
github.com/tmc/langchaingo v0.1.5/go.mod h1:RLtnUED/hH2v765vdjS9Z6gonErZAXURuJHph0BttqM=
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo=
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs=
github.com/trinodb/trino-go-client v0.315.0 h1:9mU+42VGw9Hnp9R1hkhWlIrQp9o+V01Gx1KlHjTkM1c=
github.com/trinodb/trino-go-client v0.315.0/go.mod h1:ND1s5JuAHWUXnllV3dvt/pYKhlrc0G51l6LvVFD2bJ4=
github.com/trivago/grok v1.0.0 h1:oV2ljyZT63tgXkmgEHg2U0jMqiKKuL0hkn49s6aRavQ=
Expand All @@ -1098,11 +1116,21 @@ github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0=
github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk=
github.com/uptrace/bun/dialect/pgdialect v1.1.12/go.mod h1:Ij6WIxQILxLlL2frUBxUBOZJtLElD2QQNDcu/PWDHTc=
github.com/uptrace/bun/driver/pgdriver v1.1.12 h1:3rRWB1GK0psTJrHwxzNfEij2MLibggiLdTqjTtfHc1w=
github.com/uptrace/bun/driver/pgdriver v1.1.12/go.mod h1:ssYUP+qwSEgeDDS1xm2XBip9el1y9Mi5mTAvLoiADLM=
github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI=
github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM=
github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94=
github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc=
github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wI2L/jsondiff v0.4.0 h1:iP56F9tK83eiLttg3YdmEENtZnwlYd3ezEpNNnfZVyM=
Expand Down Expand Up @@ -1617,6 +1645,10 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand All @@ -1626,6 +1658,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
inet.af/peercred v0.0.0-20210906144145-0893ea02156a/go.mod h1:FjawnflS/udxX+SvpsMgZfdqx2aykOlkISeAsADi5IU=
mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo=
mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw=
modernc.org/cc/v4 v4.21.2 h1:dycHFB/jDc3IyacKipCNSDrjIC0Lm1hyoWOZTRR20Lk=
modernc.org/cc/v4 v4.21.2/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
modernc.org/ccgo/v4 v4.17.10 h1:6wrtRozgrhCxieCeJh85QsxkX/2FFrT9hdaWPlbn4Zo=
Expand Down
60 changes: 60 additions & 0 deletions internal/impl/sql/bloblang.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
"fmt"

"github.com/redpanda-data/benthos/v4/public/bloblang"
)

// vector is the value returned by the `vector` Bloblang method registered in
// this file: the elements of the input array, each converted to float32.
type vector struct {
// value holds the converted elements in the same order as the source array.
value []float32
}

// init registers the `vector` Bloblang method, which turns an array of
// numbers into a vector value. Registration failure panics.
func init() {
	spec := bloblang.NewPluginSpec().
		Beta().
		Category("SQL").
		Description(`Creates a vector from a given array of floating point numbers.
This vector can be inserted into various SQL databases if they have support for embeddings vectors (for example `+"`pgvector`).").
		Version("4.33.0").
		Example("Create a vector from an array literal",
			`root.embeddings = [1.2, 0.6, 0.9].vector()`,
		).
		Example("Create a vector from an array",
			`root.embedding_vector = this.embedding_array.vector()`,
		)

	// The method takes no parameters, so the parsed params are ignored.
	ctor := func(_ *bloblang.ParsedParams) (bloblang.Method, error) {
		toVector := func(elems []any) (any, error) {
			floats := make([]float32, len(elems))
			for idx := range elems {
				converted, err := bloblang.ValueAsFloat32(elems[idx])
				if err != nil {
					// Report the offending position so the bad element can be located.
					return nil, fmt.Errorf("could not convert value at index %d to float32: %w", idx, err)
				}
				floats[idx] = converted
			}
			return vector{value: floats}, nil
		}
		return bloblang.ArrayMethod(toVector), nil
	}

	if err := bloblang.RegisterMethodV2("vector", spec, ctor); err != nil {
		panic(err)
	}
}
111 changes: 111 additions & 0 deletions internal/impl/sql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,117 @@ func TestIntegrationPostgres(t *testing.T) {
testSuite(t, "postgres", dsn, createTable)
}

// TestIntegrationPostgresVector starts a pgvector-enabled Postgres container,
// inserts rows whose embedding column is populated through the `vector`
// Bloblang method, and then checks that a nearest-neighbour query returns the
// expected row.
func TestIntegrationPostgresVector(t *testing.T) {
	integration.CheckSkip(t)
	t.Parallel()

	pool, err := dockertest.NewPool("")
	if err != nil {
		t.Skipf("Could not connect to docker: %s", err)
	}
	pool.MaxWait = 3 * time.Minute

	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
		Repository:   "pgvector/pgvector",
		Tag:          "pg16",
		ExposedPorts: []string{"5432/tcp"},
		Env: []string{
			"POSTGRES_USER=testuser",
			"POSTGRES_PASSWORD=testpass",
			"POSTGRES_DB=testdb",
		},
	})
	require.NoError(t, err)

	var db *sql.DB
	t.Cleanup(func() {
		// Close the client first so it is not left talking to a container
		// that has already been removed.
		if db != nil {
			db.Close()
		}
		if err := pool.Purge(resource); err != nil {
			t.Logf("Failed to clean up docker resource: %s", err)
		}
	})

	dsn := fmt.Sprintf("postgres://testuser:testpass@localhost:%s/testdb?sslmode=disable", resource.GetPort("5432/tcp"))
	require.NoError(t, pool.Retry(func() error {
		// Each attempt works on its own handle; it is only published to db
		// once every setup step has succeeded, and closed otherwise so that
		// failed attempts do not leak connection pools.
		conn, err := sql.Open("postgres", dsn)
		if err != nil {
			return err
		}
		if err = conn.Ping(); err == nil {
			_, err = conn.Exec(`CREATE EXTENSION IF NOT EXISTS vector`)
		}
		if err == nil {
			// IF NOT EXISTS keeps the statement safe to re-run on a retry.
			_, err = conn.Exec(`CREATE TABLE IF NOT EXISTS items (
  foo text PRIMARY KEY,
  embedding vector(3)
)`)
		}
		if err != nil {
			conn.Close()
			return err
		}
		db = conn
		return nil
	}))

	env := service.NewEnvironment()

	insertConfig, err := isql.InsertProcessorConfig().ParseYAML(fmt.Sprintf(`
driver: postgres
dsn: %s
table: items
columns: ["foo", "embedding"]
args_mapping: 'root = [ this.foo, this.embedding.vector() ]'
`, dsn), env)
	require.NoError(t, err)
	insertProc, err := isql.NewSQLInsertProcessorFromConfig(insertConfig, service.MockResources())
	require.NoError(t, err)
	t.Cleanup(func() { insertProc.Close(context.Background()) })

	insertBatch := service.MessageBatch{
		service.NewMessage([]byte(`{"foo": "blob","embedding": [4,5,6]}`)),
		service.NewMessage([]byte(`{"foo": "fish","embedding": [1,2,3]}`)),
	}

	resBatches, err := insertProc.ProcessBatch(context.Background(), insertBatch)
	require.NoError(t, err)
	require.Len(t, resBatches, 1)
	require.Len(t, resBatches[0], len(insertBatch))
	for _, v := range resBatches[0] {
		require.NoError(t, v.GetError())
	}

	// Of the two inserted rows, [1,2,3] is the closest to [3,1,2] under the
	// <-> distance operator, so "fish" is the expected single result.
	queryConf := fmt.Sprintf(`
driver: postgres
dsn: %s
table: items
columns: [ "foo" ]
suffix: ORDER BY embedding <-> '[3,1,2]' LIMIT 1
`, dsn)

	selectConfig, err := isql.SelectProcessorConfig().ParseYAML(queryConf, env)
	require.NoError(t, err)

	selectProc, err := isql.NewSQLSelectProcessorFromConfig(selectConfig, service.MockResources())
	require.NoError(t, err)
	t.Cleanup(func() { selectProc.Close(context.Background()) })

	queryBatch := service.MessageBatch{service.NewMessage([]byte(`{}`))}
	resBatches, err = selectProc.ProcessBatch(context.Background(), queryBatch)
	require.NoError(t, err)
	require.Len(t, resBatches, 1)
	require.Len(t, resBatches[0], 1)
	m := resBatches[0][0]
	require.NoError(t, m.GetError())
	actBytes, err := m.AsBytes()
	require.NoError(t, err)
	assert.JSONEq(t, `[{"foo":"fish"}]`, string(actBytes))
}

func TestIntegrationMySQL(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/sql/output_sql_deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ func newSQLDeprecatedOutputFromConfig(conf *service.ParsedConfig, mgr *service.R
return nil, err
}
}
argsConverter := func(v []any) []any { return v }

connSettings, err := connSettingsFromParsed(conf, mgr)
if err != nil {
return nil, err
}
return newSQLRawOutput(mgr.Logger(), driverStr, dsnStr, queryStatic, nil, argsMapping, connSettings), nil
return newSQLRawOutput(mgr.Logger(), driverStr, dsnStr, queryStatic, nil, argsMapping, argsConverter, connSettings), nil
}
12 changes: 10 additions & 2 deletions internal/impl/sql/output_sql_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ type sqlInsertOutput struct {
builder squirrel.InsertBuilder
dbMut sync.RWMutex

useTxStmt bool
argsMapping *bloblang.Executor
useTxStmt bool
argsMapping *bloblang.Executor
argsConverter argsConverter

connSettings *connSettings

Expand Down Expand Up @@ -167,6 +168,12 @@ func newSQLInsertOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resou
s.builder = s.builder.PlaceholderFormat(squirrel.Colon)
}

if s.driver == "postgres" {
s.argsConverter = bloblValuesToPgSQLValues
} else {
s.argsConverter = func(v []any) []any { return v }
}

if s.useTxStmt {
values := make([]any, 0, len(columns))
for _, c := range columns {
Expand Down Expand Up @@ -268,6 +275,7 @@ func (s *sqlInsertOutput) WriteBatch(ctx context.Context, batch service.MessageB
if args, ok = iargs.([]any); !ok {
return fmt.Errorf("mapping returned non-array result: %T", iargs)
}
args = s.argsConverter(args)
}

if tx == nil {
Expand Down
Loading

0 comments on commit b5bcb36

Please sign in to comment.