-
Notifications
You must be signed in to change notification settings - Fork 622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Proto v5 support was added #1773
Conversation
7dba62a
to
31b4ac8
Compare
31b4ac8
to
bb416bb
Compare
bb416bb
to
148bca6
Compare
@@ -660,6 +673,16 @@ func (c *Conn) heartBeat(ctx context.Context) { | |||
} | |||
|
|||
func (c *Conn) recv(ctx context.Context) error { | |||
// If native proto v5+ is used and conn is set up, then we should | |||
// unwrap payload body from v5 compressed/uncompressed frame | |||
if c.version > protoVersion4 && c.connReady { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe revert the condition. If connection has not been initialised, we should not relay on protocol version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly you meant to change the order. Could you please add some details here?
if c.startupCompleted && c.version > protoVersion4 {
...
}
@@ -215,6 +216,14 @@ type Conn struct { | |||
host *HostInfo | |||
isSchemaV2 bool | |||
|
|||
// Only for proto v5+. | |||
// Indicates if Conn is ready to use Native Protocol V5. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This flag indicates that STARTUP
message has been completed, not that we can use V5. Please fix the comment, because it can be misleading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I changed the comment and renamed the flag to startupCompleted
.
// Only for proto v5+.
// Indicates if STARTUP has been completed.
// github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec
// 2.3.1 Initial Handshake
// In order to support both v5 and earlier formats, the v5 framing format is not
// applied to message exchanges before an initial handshake is completed.
startupCompleted bool
params: params, | ||
customPayload: qry.customPayload, | ||
preparedID: info.id, | ||
preparedMetadataID: info.metadataID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename the field to result_metadata_id
. preparedMetadataID
can be misleading.
customPayload: qry.customPayload, | ||
preparedID: info.id, | ||
preparedMetadataID: info.metadataID, | ||
params: params, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultRowsFrame
does not handle new_metadata_id
and does not propagate it to next iterator calls.
<new_metadata_id> is [short bytes] representing the new, changed resultset
metadata. The new metadata ID must also be used in subsequent executions of
the corresponding prepared statement, if any.
This is needed to solve CASSANDRA-10786.
request preparedMetadata | ||
response resultMetadata | ||
id []byte | ||
metadataID []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change the name to result_metadata_id
as in the protocol not to create confusion.
return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen) | ||
} | ||
|
||
header := make([]byte, headerSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we here not create header
array and then copy it over to frame
, but instead just write to frame
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! I moved the frame buffer initializing to allow the header to be written directly into the frame.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been addressed in #1822.
// 2.2 | ||
// An uncompressed length of 0 signals that the compressed payload | ||
// should be used as-is and not decompressed. | ||
compressedPayload = uncompressedPayload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TestLargeSizeQuery
will generate random text, which LZ4 compressor most likely will not encode to smaller representation. Create a test to make sure that LZ4 compression is applied and we do not fall under this condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I added a test that covers this case as well.
return buf.Bytes(), nil | ||
} | ||
|
||
func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are missing tests to simulate various frame errors here, e.g. invalid length encoded in the frame, invalid CRC. Tests could use our frame building logic for a simple EXECUTE or QUERY, change payload and then make sure decoding fails with certain error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree and added tests for readCompressedFrame
and readUncompressedFrame
.
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { | |||
params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) | |||
|
|||
frame = &writeExecuteFrame{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For BATCH message we are missing keyspace field.
0x0080: With keyspace. If set, <keyspace> must be present. <keyspace> is a
[string] indicating the keyspace that the query should be executed in.
It supercedes the keyspace that the connection is bound to, if any.
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { | |||
params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) | |||
|
|||
frame = &writeExecuteFrame{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Field <now_in_seconds>
has not been added as per below quote and flags bitmap not updated.
Added now_in_seconds field in QUERY, EXECUTE, and BATCH messages (Sections 4.1.4, 4.1.6, and 4.1.7).
Thanks a lot for the feedback! Most of the issues mentioned in this PR have been covered in my other PRs related to proto 5 support. What do you think about continuing the discussion in PR #1822 which contains changes from all others? I tried to segregate different features on different PRs for easy review. However, it doesn't seem to be as easy as I thought. @lukasz-antoniak What are your thoughts on this? |
Can this be closed in favor of #1822 ? |
The pull request involves adding support for Protocol Version 5 (Proto v5) and addresses several key issues related to compatibility and the implementation of new features introduced in Cassandra's native protocol.
Key Issues and Changes in the PR:
New Frame Format Support. The PR implements changes to support the new frame format introduced in Proto v5. This format includes modifications that are not backward-compatible with previous versions, requiring updates to the driver's handling of message frames to interpret and process the protocol's data streams correctly. Also, this PR added a logic to process the CRC calculations that ensures reliable communication between the driver and Cassandra nodes using Proto v5.
Backward Compatibility. Ensuring backward compatibility with older protocol versions while implementing Proto v5 was a significant focus. These changes ensure that the driver works correctly with the latest Cassandra versions, particularly those utilizing the newer features and optimizations of Proto v5.