diff --git a/quesma/backend_connectors/basic_sql_backend_connector.go b/quesma/backend_connectors/basic_sql_backend_connector.go new file mode 100644 index 000000000..d1bfe6e2f --- /dev/null +++ b/quesma/backend_connectors/basic_sql_backend_connector.go @@ -0,0 +1,83 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package backend_connectors + +import ( + "context" + "database/sql" + quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" +) + +type BasicSqlBackendConnector struct { + connection *sql.DB +} + +type SqlRows struct { + rows *sql.Rows +} + +func (p *SqlRows) Next() bool { + return p.rows.Next() +} + +func (p *SqlRows) Scan(dest ...interface{}) error { + return p.rows.Scan(dest...) +} + +func (p *SqlRows) Close() error { + return p.rows.Close() +} + +func (p *SqlRows) Err() error { + return p.rows.Err() +} + +func (p *BasicSqlBackendConnector) Open() error { + conn, err := initDBConnection() + if err != nil { + return err + } + p.connection = conn + return nil +} + +func (p *BasicSqlBackendConnector) Close() error { + if p.connection == nil { + return nil + } + return p.connection.Close() +} + +func (p *BasicSqlBackendConnector) Ping() error { + return p.connection.Ping() +} + +func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + rows, err := p.connection.QueryContext(ctx, query, args...) + if err != nil { + return nil, err + } + return &SqlRows{rows: rows}, nil +} + +func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row { + return p.connection.QueryRowContext(ctx, query, args...) +} + +func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + if len(args) == 0 { + _, err := p.connection.ExecContext(ctx, query) + return err + } + _, err := p.connection.ExecContext(ctx, query, args...) + return err +} + +func (p *BasicSqlBackendConnector) Stats() quesma_api.DBStats { + stats := p.connection.Stats() + return quesma_api.DBStats{ + MaxOpenConnections: stats.MaxOpenConnections, + OpenConnections: stats.OpenConnections, + } +} diff --git a/quesma/backend_connectors/clickhouse_backend_connector.go b/quesma/backend_connectors/clickhouse_backend_connector.go index e6824fd6c..c6ee3106a 100644 --- a/quesma/backend_connectors/clickhouse_backend_connector.go +++ b/quesma/backend_connectors/clickhouse_backend_connector.go @@ -4,7 +4,6 @@ package backend_connectors import ( - "context" "database/sql" "github.com/ClickHouse/clickhouse-go/v2" @@ -12,38 +11,8 @@ import ( ) type ClickHouseBackendConnector struct { - Endpoint string - connection *sql.DB -} - -type ClickHouseRows struct { - rows *sql.Rows -} -type ClickHouseRow struct { - row *sql.Row -} - -func (p *ClickHouseRow) Scan(dest ...interface{}) error { - if p.row == nil { - return sql.ErrNoRows - } - return p.row.Scan(dest...) -} - -func (p *ClickHouseRows) Next() bool { - return p.rows.Next() -} - -func (p *ClickHouseRows) Scan(dest ...interface{}) error { - return p.rows.Scan(dest...) -} - -func (p *ClickHouseRows) Close() error { - return p.rows.Close() -} - -func (p *ClickHouseRows) Err() error { - return p.rows.Err() + BasicSqlBackendConnector + Endpoint string } func (p *ClickHouseBackendConnector) GetId() quesma_api.BackendConnectorType { @@ -59,46 +28,6 @@ func (p *ClickHouseBackendConnector) Open() error { return nil } -func (p *ClickHouseBackendConnector) Close() error { - if p.connection == nil { - return nil - } - return p.connection.Close() -} - -func (p *ClickHouseBackendConnector) Ping() error { - return p.connection.Ping() -} - -func (p *ClickHouseBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { - rows, err := p.connection.QueryContext(ctx, query, args...) - if err != nil { - return nil, err - } - return &ClickHouseRows{rows: rows}, nil -} - -func (p *ClickHouseBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row { - return p.connection.QueryRowContext(ctx, query, args...) -} - -func (p *ClickHouseBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { - if len(args) == 0 { - _, err := p.connection.ExecContext(ctx, query) - return err - } - _, err := p.connection.ExecContext(ctx, query, args...) - return err -} - -func (p *ClickHouseBackendConnector) Stats() quesma_api.DBStats { - stats := p.connection.Stats() - return quesma_api.DBStats{ - MaxOpenConnections: stats.MaxOpenConnections, - OpenConnections: stats.OpenConnections, - } -} - // func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB { func initDBConnection() (*sql.DB, error) { options := clickhouse.Options{Addr: []string{"localhost:9000"}} @@ -124,8 +53,10 @@ func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector // so that it is can be used in pre-v2 code. Should be removed when moving forwards. func NewClickHouseBackendConnectorWithConnection(endpoint string, conn *sql.DB) *ClickHouseBackendConnector { return &ClickHouseBackendConnector{ - Endpoint: endpoint, - connection: conn, + BasicSqlBackendConnector: BasicSqlBackendConnector{ + connection: conn, + }, + Endpoint: endpoint, } } diff --git a/quesma/backend_connectors/mysql_backend_connector.go b/quesma/backend_connectors/mysql_backend_connector.go index 69a63f0a3..f3cc0c980 100644 --- a/quesma/backend_connectors/mysql_backend_connector.go +++ b/quesma/backend_connectors/mysql_backend_connector.go @@ -4,35 +4,14 @@ package backend_connectors import ( - "context" "database/sql" quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" _ "github.com/go-sql-driver/mysql" ) -type MySqlRows struct { - rows *sql.Rows -} - -func (p *MySqlRows) Next() bool { - return p.rows.Next() -} - -func (p *MySqlRows) Scan(dest ...interface{}) error { - return p.rows.Scan(dest...) -} - -func (p *MySqlRows) Close() error { - return p.rows.Close() -} - -func (p *MySqlRows) Err() error { - return p.rows.Err() -} - type MySqlBackendConnector struct { - Endpoint string - connection *sql.DB + BasicSqlBackendConnector + Endpoint string } func (p *MySqlBackendConnector) InstanceName() string { @@ -55,43 +34,3 @@ func (p *MySqlBackendConnector) Open() error { p.connection = conn return nil } - -func (p *MySqlBackendConnector) Close() error { - if p.connection == nil { - return nil - } - return p.connection.Close() -} - -func (p *MySqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { - rows, err := p.connection.QueryContext(context.Background(), query, args...) - if err != nil { - return nil, err - } - return &MySqlRows{rows: rows}, nil -} - -func (p *MySqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row { - return p.connection.QueryRowContext(ctx, query, args...) -} - -func (p *MySqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { - if len(args) == 0 { - _, err := p.connection.ExecContext(context.Background(), query) - return err - } - _, err := p.connection.ExecContext(context.Background(), query, args...) - return err -} - -func (p *MySqlBackendConnector) Stats() quesma_api.DBStats { - stats := p.connection.Stats() - return quesma_api.DBStats{ - MaxOpenConnections: stats.MaxOpenConnections, - OpenConnections: stats.OpenConnections, - } -} - -func (p *MySqlBackendConnector) Ping() error { - return nil -} diff --git a/quesma/backend_connectors/postgres_backend_connector.go b/quesma/backend_connectors/postgres_backend_connector.go index 7ce13242f..72f86067b 100644 --- a/quesma/backend_connectors/postgres_backend_connector.go +++ b/quesma/backend_connectors/postgres_backend_connector.go @@ -4,14 +4,19 @@ package backend_connectors import ( - "context" + "database/sql" quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" - "github.com/jackc/pgx/v4" + + _ "github.com/jackc/pgx/v5/stdlib" ) type PostgresBackendConnector struct { - Endpoint string - connection *pgx.Conn + BasicSqlBackendConnector + Endpoint string +} + +func (p *PostgresBackendConnector) InstanceName() string { + return "postgresql" } func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType { @@ -19,63 +24,16 @@ func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType { } func (p *PostgresBackendConnector) Open() error { - conn, err := pgx.Connect(context.Background(), p.Endpoint) + // Note: pgx library also has its own custom interface (pgx.Connect), which is not compatible + // with the standard sql.DB interface, but has more features and is more efficient. + conn, err := sql.Open("pgx", p.Endpoint) if err != nil { return err } - p.connection = conn - return nil -} - -func (p *PostgresBackendConnector) Close() error { - if p.connection == nil { - return nil - } - return p.connection.Close(context.Background()) -} - -func (p *PostgresBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { - pgRows, err := p.connection.Query(ctx, query, args...) + err = conn.Ping() if err != nil { - return nil, err - } - return &PgRows{rows: pgRows}, nil -} - -func (p *PostgresBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row { - return p.connection.QueryRow(ctx, query, args...) -} - -func (p *PostgresBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { - if len(args) == 0 { - _, err := p.connection.Exec(ctx, query) return err } - _, err := p.connection.Exec(ctx, query, args...) - return err -} - -func (p *PostgresBackendConnector) Stats() quesma_api.DBStats { - return quesma_api.DBStats{} -} - -type PgRows struct { - rows pgx.Rows -} - -func (p *PgRows) Next() bool { - return p.rows.Next() -} - -func (p *PgRows) Scan(dest ...interface{}) error { - return p.rows.Scan(dest...) -} - -func (p *PgRows) Close() error { - p.rows.Close() + p.connection = conn return nil } - -func (p *PgRows) Err() error { - return p.rows.Err() -} diff --git a/quesma/backend_connectors/tcp_backend_connector.go b/quesma/backend_connectors/tcp_backend_connector.go new file mode 100644 index 000000000..527965bce --- /dev/null +++ b/quesma/backend_connectors/tcp_backend_connector.go @@ -0,0 +1,118 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package backend_connectors + +import ( + "bytes" + "context" + "errors" + "fmt" + quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" + "io" + "net" + "os" + "time" +) + +type TcpBackendConnector struct { + conn net.Conn + addr string +} + +func NewTcpBackendConnector(addr string) (*TcpBackendConnector, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to connect to %s: %w", addr, err) + } + + return &TcpBackendConnector{ + conn: conn, + addr: addr, + }, nil +} + +func (t TcpBackendConnector) InstanceName() string { + return "TcpBackendConnector" +} + +func (t TcpBackendConnector) GetId() quesma_api.BackendConnectorType { + return quesma_api.TcpBackend +} + +func (t TcpBackendConnector) Open() error { + return nil +} + +func (t TcpBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + //TODO implement me + panic("implement me") +} + +func (t TcpBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row { + //TODO implement me + panic("implement me") +} + +func (t TcpBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + //TODO implement me + panic("implement me") +} + +func (t TcpBackendConnector) Stats() quesma_api.DBStats { + //TODO implement me + panic("implement me") +} + +func (t TcpBackendConnector) Close() error { + return nil +} + +func (t TcpBackendConnector) Ping() error { + //TODO implement me + panic("implement me") +} + +func (t TcpBackendConnector) Write(data []byte) error { + if t.conn == nil { + return fmt.Errorf("connection is nil") + } + n, err := t.conn.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d bytes but expected to write %d bytes", n, len(data)) + } + return nil +} + +func (t TcpBackendConnector) Read(n int) ([]byte, error) { + if t.conn == nil { + return nil, fmt.Errorf("connection is nil") + } + + // Create buffer to store all read data + var buffer bytes.Buffer + + // Create temporary buffer for reading chunks + tmp := make([]byte, n) + + // Set read deadline to avoid blocking forever + err := t.conn.SetReadDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + return buffer.Bytes(), err + } + + n, err = t.conn.Read(tmp) + if err != nil { + if err == io.EOF || errors.Is(err, os.ErrDeadlineExceeded) { + return buffer.Bytes(), nil + } + return buffer.Bytes(), err + } + + buffer.Write(tmp[:n]) + + return buffer.Bytes(), nil +} diff --git a/quesma/frontend_connectors/basic_tcp_connector.go b/quesma/frontend_connectors/basic_tcp_connector.go index 9910f5067..8824cbf48 100644 --- a/quesma/frontend_connectors/basic_tcp_connector.go +++ b/quesma/frontend_connectors/basic_tcp_connector.go @@ -58,6 +58,10 @@ func (t *TCPListener) GetEndpoint() string { return t.Endpoint } +func (t *TCPListener) InstanceName() string { + return "TCPListener" +} + func (t *TCPListener) AddConnectionHandler(handler quesma_api.TCPConnectionHandler) { t.handler = handler } diff --git a/quesma/frontend_connectors/tcp_postgres_connection_handler.go b/quesma/frontend_connectors/tcp_postgres_connection_handler.go index 20e97a2f3..3d8134975 100644 --- a/quesma/frontend_connectors/tcp_postgres_connection_handler.go +++ b/quesma/frontend_connectors/tcp_postgres_connection_handler.go @@ -18,19 +18,31 @@ func (p *TcpPostgresConnectionHandler) HandleConnection(conn net.Conn) error { backend := pgproto3.NewBackend(conn, conn) defer p.close(conn) - err := p.handleStartup(conn, backend) - if err != nil { - return err - } + //err := p.handleStartup(conn, backend) + //if err != nil { + // return err + //} + + receivedStartupMessage := false dispatcher := quesma_api.Dispatcher{} for { - msg, err := backend.Receive() + var resp any + var err error + + if receivedStartupMessage { + resp, err = backend.Receive() + } else { + resp, err = backend.ReceiveStartupMessage() + if _, isStartup := resp.(*pgproto3.StartupMessage); isStartup { + receivedStartupMessage = true + } + } if err != nil { return fmt.Errorf("error receiving message: %w", err) } - var resp any = msg + metadata := make(map[string]interface{}) _, resp = dispatcher.Dispatch(p.processors, metadata, resp) if resp != nil { diff --git a/quesma/go.mod b/quesma/go.mod index 9400f859c..d0c19cdfc 100644 --- a/quesma/go.mod +++ b/quesma/go.mod @@ -48,6 +48,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgtype v1.14.4 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -57,6 +58,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect golang.org/x/crypto v0.32.0 // indirect + golang.org/x/sync v0.10.0 // indirect golang.org/x/text v0.21.0 // indirect ) diff --git a/quesma/go.sum b/quesma/go.sum index 2e779a0e8..6df570e71 100644 --- a/quesma/go.sum +++ b/quesma/go.sum @@ -140,7 +140,10 @@ github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsb github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= @@ -365,6 +368,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/quesma/main.go b/quesma/main.go index eb730c84a..7beee1fcb 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -7,16 +7,19 @@ import ( "fmt" "github.com/QuesmaOrg/quesma/quesma/ab_testing" "github.com/QuesmaOrg/quesma/quesma/ab_testing/sender" + "github.com/QuesmaOrg/quesma/quesma/backend_connectors" "github.com/QuesmaOrg/quesma/quesma/buildinfo" "github.com/QuesmaOrg/quesma/quesma/clickhouse" "github.com/QuesmaOrg/quesma/quesma/common_table" "github.com/QuesmaOrg/quesma/quesma/connectors" "github.com/QuesmaOrg/quesma/quesma/elasticsearch" "github.com/QuesmaOrg/quesma/quesma/feature" + "github.com/QuesmaOrg/quesma/quesma/frontend_connectors" "github.com/QuesmaOrg/quesma/quesma/ingest" "github.com/QuesmaOrg/quesma/quesma/licensing" "github.com/QuesmaOrg/quesma/quesma/logger" "github.com/QuesmaOrg/quesma/quesma/persistence" + "github.com/QuesmaOrg/quesma/quesma/processors" "github.com/QuesmaOrg/quesma/quesma/quesma" "github.com/QuesmaOrg/quesma/quesma/quesma/async_search_storage" "github.com/QuesmaOrg/quesma/quesma/quesma/config" @@ -25,6 +28,7 @@ import ( "github.com/QuesmaOrg/quesma/quesma/table_resolver" "github.com/QuesmaOrg/quesma/quesma/telemetry" "github.com/QuesmaOrg/quesma/quesma/tracing" + quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" "log" "os" "os/signal" @@ -55,6 +59,15 @@ const EnableConcurrencyProfiling = false //} func main() { + if false { + launchPostgressPassthrough() + return + } + if true { + launchTcpPassthrough() + return + } + if EnableConcurrencyProfiling { runtime.SetBlockProfileRate(1) runtime.SetMutexProfileFraction(1) @@ -155,6 +168,52 @@ func main() { } +func launchTcpPassthrough() { + var frontendConn = frontend_connectors.NewTCPConnector(":15432") + var tcpProcessor quesma_api.Processor = processors.NewTcpPassthroughProcessor() + var tcpPostgressHandler = frontend_connectors.TcpPostgresConnectionHandler{} + frontendConn.AddConnectionHandler(&tcpPostgressHandler) + var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + postgressPipeline.AddProcessor(tcpProcessor) + postgressPipeline.AddFrontendConnector(frontendConn) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) + backendConn, err := backend_connectors.NewTcpBackendConnector("localhost:5432") + if err != nil { + panic(err) + } + postgressPipeline.AddBackendConnector(backendConn) + quesmaBuilder.AddPipeline(postgressPipeline) + qb, err := quesmaBuilder.Build() + if err != nil { + panic(err) + } + qb.Start() + stop := make(chan os.Signal, 1) + <-stop + qb.Stop(context.Background()) +} + +func launchPostgressPassthrough() { + var frontendConn = frontend_connectors.NewTCPConnector(":15432") + var tcpProcessor quesma_api.Processor = processors.NewPostgresPassthroughProcessor() + var tcpPostgressHandler = frontend_connectors.TcpPostgresConnectionHandler{} + frontendConn.AddConnectionHandler(&tcpPostgressHandler) + var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + postgressPipeline.AddProcessor(tcpProcessor) + postgressPipeline.AddFrontendConnector(frontendConn) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies()) + postgressPipeline.AddBackendConnector(&backend_connectors.PostgresBackendConnector{Endpoint: "postgresql://localhost:5432/postgres"}) + quesmaBuilder.AddPipeline(postgressPipeline) + qb, err := quesmaBuilder.Build() + if err != nil { + panic(err) + } + qb.Start() + stop := make(chan os.Signal, 1) + <-stop + qb.Stop(context.Background()) +} + func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *quesma.Quesma { if cfg.TransparentProxy { return quesma.NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false) diff --git a/quesma/processors/postgres_passthrough_processor.go b/quesma/processors/postgres_passthrough_processor.go new file mode 100644 index 000000000..5d422b1a6 --- /dev/null +++ b/quesma/processors/postgres_passthrough_processor.go @@ -0,0 +1,135 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package processors + +import ( + "context" + "fmt" + quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" + "github.com/jackc/pgx/v5/pgproto3" +) + +type PostgresPassthroughProcessor struct { + BaseProcessor +} + +func NewPostgresPassthroughProcessor() *PostgresPassthroughProcessor { + return &PostgresPassthroughProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *PostgresPassthroughProcessor) InstanceName() string { + return "PostgresPassthroughProcessor" +} + +func (p *PostgresPassthroughProcessor) GetId() string { + return "postgrespassthrough_processor" +} + +func (p *PostgresPassthroughProcessor) respond(query string) ([]byte, error) { + backendConn := p.GetBackendConnector(quesma_api.PgSQLBackend) + if backendConn == nil { + return nil, fmt.Errorf("no backend connector found") + } + fmt.Println("Backend connector found") + err := backendConn.Open() + if err != nil { + return nil, fmt.Errorf("error opening connection: %v", err) + } + defer backendConn.Close() + + // Execute the query + rows, err := backendConn.Query(context.Background(), query) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %v", err) + } + defer rows.Close() + + // backendConn interface doesn't have a method to get column names, so we create dummy column names + columnNames := []string{"col_1"} + + // Create field descriptions for each column + fields := make([]pgproto3.FieldDescription, len(columnNames)) + for i, name := range columnNames { + fields[i] = pgproto3.FieldDescription{ + Name: []byte(name), + TableOID: 0, + TableAttributeNumber: 0, + DataTypeOID: 25, // Default to text type + DataTypeSize: -1, + TypeModifier: -1, + Format: 0, + } + } + + // Create row description + rowDesc := &pgproto3.RowDescription{Fields: fields} + buf := mustEncode(rowDesc.Encode(nil)) + + // Prepare scannable destination slice + values := make([]interface{}, len(columnNames)) + valuePtrs := make([]interface{}, len(columnNames)) + for i := range values { + valuePtrs[i] = &values[i] + } + + // Iterate through rows + for rows.Next() { + err = rows.Scan(valuePtrs...) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %v", err) + } + + // Convert values to strings + rowValues := make([][]byte, len(columnNames)) + for i, val := range values { + if val == nil { + rowValues[i] = nil + } else { + rowValues[i] = []byte(fmt.Sprintf("%v", val)) + } + } + + // Encode each row + dataRow := &pgproto3.DataRow{Values: rowValues} + buf = mustEncode(dataRow.Encode(buf)) + } + + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("row iteration error: %v", err) + } + + // Add command complete and ready for query messages + buf = mustEncode((&pgproto3.CommandComplete{CommandTag: []byte("SELECT")}).Encode(buf)) + buf = mustEncode((&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)) + + return buf, nil +} + +func (p *PostgresPassthroughProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + fmt.Println("PostgresPassthrough processor ") + for _, m := range message { + msg := m.(pgproto3.FrontendMessage) + switch msg := msg.(type) { + case *pgproto3.Query: + response, err := p.respond(msg.String) + if err != nil { + return metadata, nil, fmt.Errorf("error generating query response: %w", err) + } + return metadata, response, nil + case *pgproto3.Terminate: + return metadata, nil, nil + + default: + fmt.Println("Received other than query") + return metadata, nil, fmt.Errorf("received message other than Query from client: %#v", msg) + } + } + return metadata, nil, nil +} + +func (p *PostgresPassthroughProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.PgSQLBackend} +} diff --git a/quesma/processors/tcp_passthrough_processor.go b/quesma/processors/tcp_passthrough_processor.go new file mode 100644 index 000000000..bcd03cdd6 --- /dev/null +++ b/quesma/processors/tcp_passthrough_processor.go @@ -0,0 +1,133 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package processors + +import ( + "fmt" + "github.com/QuesmaOrg/quesma/quesma/backend_connectors" + quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" + "github.com/jackc/pgx/v5/pgproto3" +) + +type TcpPassthroughProcessor struct { + BaseProcessor + alreadySentHandshake bool + alreadyReceivedReady bool +} + +func (t TcpPassthroughProcessor) InstanceName() string { + return "TcpPassthroughProcessor" +} + +func (t TcpPassthroughProcessor) GetId() string { + return "TcpPassthroughProcessor" +} + +func (t *TcpPassthroughProcessor) Handle(metadata map[string]interface{}, messages ...any) (map[string]interface{}, any, error) { + backendConnector := t.GetBackendConnector(quesma_api.TcpBackend).(*backend_connectors.TcpBackendConnector) + // + //if !t.alreadySentHandshake { + // t.alreadySentHandshake = true + // err := backendConnector.Write([]byte{0x0, 0x0, 0x0, 0x5a, 0x0, 0x3, 0x0, 0x0, 0x75, 0x73, 0x65, 0x72, 0x0, 0x70, 0x69, 0x6f, 0x74, 0x72, 0x67, 0x72, 0x61, 0x62, 0x6f, 0x77, 0x73, 0x6b, 0x69, 0x0, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x0, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x0, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x0, 0x70, 0x73, 0x71, 0x6c, 0x0, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x0, 0x55, 0x54, 0x46, 0x38, 0x0, 0x0}) + // if err != nil { + // panic(err) + // } + //} + + var fullMsg []byte + + for _, m := range messages { + msg := m.(pgproto3.FrontendMessage) + encoded := make([]byte, 0) + encoded, err := msg.Encode(encoded) + if err != nil { + panic(err) + } + + err = backendConnector.Write(encoded) + if err != nil { + panic(err) + } + + if _, isSslRequest := msg.(*pgproto3.SSLRequest); isSslRequest { + sslResponse, err := backendConnector.Read(1) + if err != nil { + panic(err) + } + fmt.Printf("Got SSL response: %v\n", sslResponse) + fullMsg = append(fullMsg, sslResponse...) + } + + fmt.Printf("%v %T %v\n", msg, msg, encoded) + } + + //if !t.alreadyReceivedReady { + // t.alreadyReceivedReady = true + // for { + // // Read 4-byte length prefix + // lengthBytes, err := backendConnector.Read(5) + // if err != nil { + // panic(err) + // } + // //fmt.Printf("Got length (startup): %v\n", lengthBytes) + // + // // Convert 4 bytes to big endian integer + // msgLength := int(lengthBytes[1])<<24 | int(lengthBytes[2])<<16 | int(lengthBytes[3])<<8 | int(lengthBytes[4]) - 4 + // + // // Read the rest of the message based on length + // _, err = backendConnector.Read(msgLength) + // if err != nil { + // panic(err) + // } + // + // // Combine length prefix and message body + // //fullMsg = append(fullMsg, lengthBytes...) + // //fullMsg = append(fullMsg, msgBody...) + // //fmt.Printf("Got response (startup): %v\n", fullMsg) + // + // if lengthBytes[0] == 90 /* Z - ready */ { + // break + // } + // } + //} + + // Read 4-byte length prefix + for { + lengthBytes, err := backendConnector.Read(5) + if err != nil { + panic(err) + } + //fmt.Printf("Got length: %v\n", lengthBytes) + if len(lengthBytes) == 0 { + break + } + + // Convert 4 bytes to big endian integer + msgLength := int(lengthBytes[1])<<24 | int(lengthBytes[2])<<16 | int(lengthBytes[3])<<8 | int(lengthBytes[4]) - 4 + + // Read the rest of the message based on length + msgBody, err := backendConnector.Read(msgLength) + if err != nil { + panic(err) + } + + // Combine length prefix and message body + fullMsg = append(fullMsg, lengthBytes...) + fullMsg = append(fullMsg, msgBody...) + + if lengthBytes[0] == 90 /* Z - ready */ { + break + } + } + + //fmt.Printf("Got response: %v\n", fullMsg) + //fmt.Printf("Will write: %v\n", fullMsg) + return metadata, fullMsg, nil +} + +func NewTcpPassthroughProcessor() *TcpPassthroughProcessor { + return &TcpPassthroughProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} diff --git a/quesma/v2/core/backend_connectors.go b/quesma/v2/core/backend_connectors.go index a0c81e351..e11169576 100644 --- a/quesma/v2/core/backend_connectors.go +++ b/quesma/v2/core/backend_connectors.go @@ -12,6 +12,7 @@ const ( PgSQLBackend ClickHouseSQLBackend ElasticsearchBackend + TcpBackend ) func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string {