Skip to content

Commit

Permalink
consistency serial was added
Browse files Browse the repository at this point in the history
  • Loading branch information
OleksiienkoMykyta committed Nov 8, 2024
1 parent 953e0df commit 3662be4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 35 deletions.
51 changes: 16 additions & 35 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ const (

type Consistency uint16

type SerialConsistency = Consistency

const (
Any Consistency = 0x00
One Consistency = 0x01
Expand All @@ -202,6 +204,8 @@ const (
LocalQuorum Consistency = 0x06
EachQuorum Consistency = 0x07
LocalOne Consistency = 0x0A
Serial Consistency = 0x08
LocalSerial Consistency = 0x09
)

func (c Consistency) String() string {
Expand All @@ -224,6 +228,10 @@ func (c Consistency) String() string {
return "EACH_QUORUM"
case LocalOne:
return "LOCAL_ONE"
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
}
Expand Down Expand Up @@ -253,13 +261,21 @@ func (c *Consistency) UnmarshalText(text []byte) error {
*c = EachQuorum
case "LOCAL_ONE":
*c = LocalOne
case "SERIAL":
*c = Serial
case "LOCAL_SERIAL":
*c = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

func (c Consistency) IsSerial() bool {
return c == Serial || c == LocalSerial

}
func ParseConsistency(s string) Consistency {
var c Consistency
if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
Expand All @@ -286,41 +302,6 @@ func MustParseConsistency(s string) (Consistency, error) {
return c, nil
}

type SerialConsistency uint16

const (
Serial SerialConsistency = 0x08
LocalSerial SerialConsistency = 0x09
)

func (s SerialConsistency) String() string {
switch s {
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
}
}

func (s SerialConsistency) MarshalText() (text []byte, err error) {
return []byte(s.String()), nil
}

func (s *SerialConsistency) UnmarshalText(text []byte) error {
switch string(text) {
case "SERIAL":
*s = Serial
case "LOCAL_SERIAL":
*s = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

const (
apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
)
Expand Down
10 changes: 10 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

if cfg.Consistency.IsSerial() {
return nil, fmt.Errorf("the default consistency level is not allowed to be SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.Consistency)
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down Expand Up @@ -1265,6 +1269,9 @@ func (q *Query) Bind(v ...interface{}) *Query {
// SERIAL. This option will be ignored for anything else that a
// conditional update/insert.
func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
if !cons.IsSerial() {
panic("Serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
q.serialCons = cons
return q
}
Expand Down Expand Up @@ -1915,6 +1922,9 @@ func (b *Batch) Size() int {
//
// Only available for protocol 3 and above
func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
if !cons.IsSerial() {
panic("Serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
b.serialCons = cons
return b
}
Expand Down

0 comments on commit 3662be4

Please sign in to comment.