From be2bc58cc4cb6f3fb314e96171043edb0f993dc9 Mon Sep 17 00:00:00 2001 From: "mykyta.oleksiienko" Date: Mon, 28 Oct 2024 17:15:18 +0200 Subject: [PATCH] consistency serial was added --- cluster.go | 2 +- frame.go | 58 +++++++++++++++++++----------------------------------- session.go | 18 +++++++++++++---- 3 files changed, 35 insertions(+), 43 deletions(-) diff --git a/cluster.go b/cluster.go index 13e62f3b0..74a0c26f1 100644 --- a/cluster.go +++ b/cluster.go @@ -150,7 +150,7 @@ type ClusterConfig struct { // Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL. // Default: unset - SerialConsistency SerialConsistency + SerialConsistency Consistency // SslOpts configures TLS use when HostDialer is not set. // SslOpts is ignored if HostDialer is set. diff --git a/frame.go b/frame.go index d374ae574..d2f6ed646 100644 --- a/frame.go +++ b/frame.go @@ -192,6 +192,9 @@ const ( type Consistency uint16 +// SerialConsistency is deprecated. Use Consistency instead. +type SerialConsistency = Consistency + const ( Any Consistency = 0x00 One Consistency = 0x01 @@ -202,6 +205,8 @@ const ( LocalQuorum Consistency = 0x06 EachQuorum Consistency = 0x07 LocalOne Consistency = 0x0A + Serial Consistency = 0x08 + LocalSerial Consistency = 0x09 ) func (c Consistency) String() string { @@ -224,6 +229,10 @@ func (c Consistency) String() string { return "EACH_QUORUM" case LocalOne: return "LOCAL_ONE" + case Serial: + return "SERIAL" + case LocalSerial: + return "LOCAL_SERIAL" default: return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c)) } @@ -253,6 +262,10 @@ func (c *Consistency) UnmarshalText(text []byte) error { *c = EachQuorum case "LOCAL_ONE": *c = LocalOne + case "SERIAL": + *c = Serial + case "LOCAL_SERIAL": + *c = LocalSerial default: return fmt.Errorf("invalid consistency %q", string(text)) } @@ -260,6 +273,10 @@ func (c *Consistency) UnmarshalText(text []byte) error { return nil } +func (c Consistency) IsSerial() bool { + return c == Serial || c == LocalSerial + +} func ParseConsistency(s string) Consistency { var c Consistency if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { @@ -286,41 +303,6 @@ func MustParseConsistency(s string) (Consistency, error) { return c, nil } -type SerialConsistency uint16 - -const ( - Serial SerialConsistency = 0x08 - LocalSerial SerialConsistency = 0x09 -) - -func (s SerialConsistency) String() string { - switch s { - case Serial: - return "SERIAL" - case LocalSerial: - return "LOCAL_SERIAL" - default: - return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s)) - } -} - -func (s SerialConsistency) MarshalText() (text []byte, err error) { - return []byte(s.String()), nil -} - -func (s *SerialConsistency) UnmarshalText(text []byte) error { - switch string(text) { - case "SERIAL": - *s = Serial - case "LOCAL_SERIAL": - *s = LocalSerial - default: - return fmt.Errorf("invalid consistency %q", string(text)) - } - - return nil -} - const ( apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal." ) @@ -1452,7 +1434,7 @@ type queryParams struct { values []queryValues pageSize int pagingState []byte - serialConsistency SerialConsistency + serialConsistency Consistency // v3+ defaultTimestamp bool defaultTimestampValue int64 @@ -1541,7 +1523,7 @@ func (f *framer) writeQueryParams(opts *queryParams) { } if opts.serialConsistency > 0 { - f.writeConsistency(Consistency(opts.serialConsistency)) + f.writeConsistency(opts.serialConsistency) } if f.proto > protoVersion2 && opts.defaultTimestamp { @@ -1653,7 +1635,7 @@ type writeBatchFrame struct { consistency Consistency // v3+ - serialConsistency SerialConsistency + serialConsistency Consistency defaultTimestamp bool defaultTimestampValue int64 diff --git a/session.go b/session.go index a600b95f3..b46ab9434 100644 --- a/session.go +++ b/session.go @@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) { return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") } + if cfg.SerialConsistency > 0 && !cfg.SerialConsistency.IsSerial() { + return nil, fmt.Errorf("the default SerialConsistency level is not allowed to be anything else but SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.SerialConsistency) + } + // TODO: we should take a context in here at some point ctx, cancel := context.WithCancel(context.TODO()) @@ -915,7 +919,7 @@ type Query struct { rt RetryPolicy spec SpeculativeExecutionPolicy binding func(q *QueryInfo) ([]interface{}, error) - serialCons SerialConsistency + serialCons Consistency defaultTimestamp bool defaultTimestampValue int64 disableSkipMetadata bool @@ -1264,7 +1268,10 @@ func (q *Query) Bind(v ...interface{}) *Query { // either SERIAL or LOCAL_SERIAL and if not present, it defaults to // SERIAL. This option will be ignored for anything else that a // conditional update/insert. -func (q *Query) SerialConsistency(cons SerialConsistency) *Query { +func (q *Query) SerialConsistency(cons Consistency) *Query { + if !cons.IsSerial() { + panic("Serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String()) + } q.serialCons = cons return q } @@ -1735,7 +1742,7 @@ type Batch struct { trace Tracer observer BatchObserver session *Session - serialCons SerialConsistency + serialCons Consistency defaultTimestamp bool defaultTimestampValue int64 context context.Context @@ -1914,7 +1921,10 @@ func (b *Batch) Size() int { // conditional update/insert. // // Only available for protocol 3 and above -func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch { +func (b *Batch) SerialConsistency(cons Consistency) *Batch { + if !cons.IsSerial() { + panic("Serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String()) + } b.serialCons = cons return b }