Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-1 CASSGO-30 Native Protocol 5 Support #1822

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fbdaa2b
Support for Native Protocol 5 release version
worryg0d Jul 18, 2024
93fb4ad
Added 'IF NOT EXISTS' to table creation queries in integration tests
worryg0d Oct 16, 2024
d926108
Clarified comment on startup completed
worryg0d Oct 16, 2024
a1baea2
Returned panics on setting keyspace and now_in_seconds field when pro…
worryg0d Oct 16, 2024
b77aa18
Added description for TestLargeSizeQuery and TestQueryCompressionNotW…
worryg0d Oct 17, 2024
465332b
Removed startupCompleted from Conn obj
worryg0d Oct 17, 2024
f613eaa
Creating new inflight instance when newMetadataID is received
worryg0d Oct 17, 2024
380d660
Moved panics on flags computing stage for now_in_seconds and keyspace…
worryg0d Oct 17, 2024
8b7e64b
Methods renames and comments corrections
worryg0d Oct 18, 2024
2eb3fbe
Improved reading/writing segments code
worryg0d Oct 21, 2024
7a210f2
Removed replace lz4 from go.mod
worryg0d Oct 21, 2024
d8ab2fa
Improved frame segmentation before writing it to conn
worryg0d Oct 21, 2024
34db5ed
Improved error messages
worryg0d Oct 21, 2024
df5adc0
Additional check in recvPartialFrames
worryg0d Oct 21, 2024
a79c985
removed net.Conn and *bufio.Reader from Conn
worryg0d Oct 23, 2024
86130c0
Compressor append-like API
worryg0d Oct 24, 2024
1343003
Fix reading <new_metadata_id> in parseResultMetadata
worryg0d Oct 25, 2024
98e37c1
Updating prepared stmt metadata result set and result_metadata_id whe…
worryg0d Oct 25, 2024
31d10f9
Avoiding deadlock after updating the lru cache entry
worryg0d Oct 25, 2024
2a5056d
Integration test for Metadata_changed mechanism
worryg0d Oct 28, 2024
bf0d7fa
1. Updating info to ensure the code looking at the updated prepared stmt
worryg0d Oct 30, 2024
0592a90
1. Updated TestPrepareExecuteMetadataChangedFlag to validate Metadata…
worryg0d Oct 30, 2024
0298a00
1. Updated the way how the driver constructs stmt cache keys. The cur…
worryg0d Oct 30, 2024
a1a613c
Added skip for TestStmtCacheUsesOverriddenKeyspace and TestRoutingKey…
worryg0d Oct 31, 2024
e779e73
Go bump to 1.19
worryg0d Nov 7, 2024
5ca5382
Changed CI to run integration tests over proto 5 + lz4 compressor
worryg0d Nov 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package gocql

import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
Expand Down Expand Up @@ -84,3 +85,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
t.Errorf("got ts %d, expected %d", storedTs, micros)
}
}

func TestBatch_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Batch now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS batch_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

b := session.NewBatch(LoggedBatch)
b.WithNowInSeconds(0)
b.Query("INSERT INTO batch_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val")
if err := session.ExecuteBatch(b); err != nil {
t.Fatal(err)
}

var remainingTTL int
err := session.Query(`SELECT TTL(val) FROM batch_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestBatch_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for BATCH message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_keyspace_override_test.batch_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

ids := []int{1, 2}
texts := []string{"val1", "val2"}

b := session.NewBatch(LoggedBatch).SetKeyspace("gocql_keyspace_override_test")
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[0], texts[0])
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[1], texts[1])
err = session.ExecuteBatch(b)
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

iter := session.Query("SELECT * FROM gocql_keyspace_override_test.batch_keyspace").Iter()
defer iter.Close()

for i := 0; iter.Scan(&id, &text); i++ {
require.Equal(t, id, ids[i])
require.Equal(t, text, texts[i])
}
}
252 changes: 252 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -3288,3 +3289,254 @@ func TestQuery_NamedValues(t *testing.T) {
t.Fatal(err)
}
}

func TestQuery_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Query now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS query_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO query_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val").
WithNowInSeconds(int(0)).
Exec()
if err != nil {
t.Fatal(err)
}

var remainingTTL int
err = session.Query(`SELECT TTL(val) FROM query_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestQuery_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for QUERY message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_query_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_query_keyspace_override_test.query_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

expectedID := 1
expectedText := "text"

// Testing PREPARE message
err = session.Query("INSERT INTO gocql_query_keyspace_override_test.query_keyspace (id, value) VALUES (?, ?)", expectedID, expectedText).Exec()
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)

// Testing QUERY message
id = 0
text = ""

q = session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
q.skipPrepare = true
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)
}

func TestLargeSizeQuery(t *testing.T) {
// TestLargeSizeQuery runs a query bigger than the max allowed size of the payload of a frame,
// so it should be sent as 2 different frames where each contains a self-contained bit set to zero.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

longString := strings.Repeat("a", 500_000)

err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, longString, result)
}

func TestQueryCompressionNotWorthIt(t *testing.T) {
// TestQueryCompressionNotWorthIt runs a query that is not likely to be compressed efficiently
// (uncompressed payload size > compressed payload size).
// So, it should send a Compressed Frame where:
// 1. Compressed length is set to the length of the uncompressed payload;
// 2. Uncompressed length is set to zero;
// 3. Payload is the uncompressed payload.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.compression_now_worth_it(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, str, result)
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
}

func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
// This test ensures that the whole Metadata_changed flow
// is handled properly.
//
// To trigger C* to return Metadata_changed we should do:
// 1. Create a table
// 2. Prepare stmt which uses the created table
// 3. Change the table schema in order to affect prepared stmt (e.g. add a column)
// 4. Execute prepared stmt. As a result C* should return RESULT/ROWS response with
// Metadata_changed flag, new metadata id and updated metadata resultset.
//
// The driver should handle this by updating its prepared statement inside the cache
// when it receives RESULT/ROWS with Metadata_changed flag
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Metadata_changed mechanism is only available in proto > 4")
}

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.metadata_changed(id int, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO gocql_test.metadata_changed (id) VALUES (?)", 1).Exec()
if err != nil {
t.Fatal(err)
}

// We have to specify conn for all queries to ensure that
// all queries are running on the same node
conn := session.getConn()

const selectStmt = "SELECT * FROM gocql_test.metadata_changed"
queryBeforeTableAltering := session.Query(selectStmt)
queryBeforeTableAltering.conn = conn
row := make(map[string]interface{})
err = queryBeforeTableAltering.MapScan(row)
if err != nil {
t.Fatal(err)
}

require.Len(t, row, 1, "Expected to retrieve a single column")
stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt)
inflight, _ := session.stmtsLRU.get(stmtCacheKey)
preparedStatementBeforeTableAltering := inflight.preparedStatment

// Changing table schema in order to cause C* to return RESULT/ROWS Metadata_changed
alteringTableQuery := session.Query("ALTER TABLE gocql_test.metadata_changed ADD new_col int")
alteringTableQuery.conn = conn
err = alteringTableQuery.Exec()
if err != nil {
t.Fatal(err)
}

// Expecting C* will return RESULT/ROWS Metadata_changed
// and it will be properly handled
queryAfterTableAltering := session.Query(selectStmt)
queryAfterTableAltering.conn = conn
row = make(map[string]interface{})
err = queryAfterTableAltering.MapScan(row)
if err != nil {
t.Fatal(err)
}

// Ensuring if cache contains updated prepared statement
require.Len(t, row, 2, "Expected to retrieve both columns")
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
preparedStatementAfterTableAltering := inflight.preparedStatment
require.NotEqual(t, preparedStatementBeforeTableAltering.resultMetadataID, preparedStatementAfterTableAltering.resultMetadataID)
require.NotEqual(t, preparedStatementBeforeTableAltering.response, preparedStatementAfterTableAltering.response)

// Executing prepared stmt and expecting that C* won't return
// Metadata_changed because the table is not being changed.
// Running query with timeout to ensure there is no deadlocks.
// However, it doesn't 100% proves that there is a deadlock...
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30000)
defer cancel()

queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx)
queryAfterTableAltering2.conn = conn
row = make(map[string]interface{})
err = queryAfterTableAltering2.MapScan(row)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
t.Fatal("It is likely failed due deadlock")
}
t.Fatal(err)
}

// Ensuring metadata of prepared stmt is not changed
require.Len(t, row, 2, "Expected to retrieve both columns")
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
preparedStatementAfterTableAltering2 := inflight.preparedStatment
require.Equal(t, preparedStatementAfterTableAltering.resultMetadataID, preparedStatementAfterTableAltering2.resultMetadataID)
require.Equal(t, preparedStatementAfterTableAltering.response, preparedStatementAfterTableAltering2.response)
}
Loading