Skip to content

Commit

Permalink
STAR-1903: ConsistencyLevel.THREE means ALL_BUT_ONE (#943)
Browse files Browse the repository at this point in the history
Add a system property (dse.consistency_level.three_means_all_but_one)
that changes the behavior of ConsistencyLevel.THREE to mean
ALL_BUT_ONE, i.e. that all replicas except for at most one in the
cluster (across all DCs) must accept the write for it to be successful.

(cherry picked from commit 9875081)
  • Loading branch information
szymon-miezal authored and djatnieks committed Mar 29, 2024
1 parent da035fa commit c1c1fe6
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,9 @@ public enum CassandraRelevantProperties
TEST_UTIL_ALLOW_TOOL_REINIT_FOR_TEST("org.apache.cassandra.tools.UtilALLOW_TOOL_REINIT_FOR_TEST"),
/** Activate write survey mode. The node does not become an active ring member; you must use JMX StorageService->joinRing() to finalize joining the ring. */
TEST_WRITE_SURVEY("cassandra.write_survey"),
// Changes the semantics of the "THREE" consistency level to mean "all but one",
// i.e., all replicas except for at most one in the cluster (across all DCs) must accept the write for it to be successful.
THREE_MEANS_ALL_BUT_ONE("dse.consistency_level.three_means_all_but_one", "false"),
TOLERATE_SSTABLE_SIZE("cassandra.tolerate_sstable_size"),
TRIGGERS_DIR("cassandra.triggers_dir"),
TRUNCATE_BALLOT_METADATA("cassandra.truncate_ballot_metadata"),
Expand Down
23 changes: 23 additions & 0 deletions src/java/org/apache/cassandra/db/ConsistencyLevel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InOurDc;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -51,6 +52,8 @@ public enum ConsistencyLevel
LOCAL_ONE (10, true),
NODE_LOCAL (11, true);

/**
 * Whether the {@code THREE} consistency level is reinterpreted as "all but one".
 * <p>
 * The value is taken from {@link CassandraRelevantProperties#THREE_MEANS_ALL_BUT_ONE}
 * (system property {@code dse.consistency_level.three_means_all_but_one}) and, being a
 * static final field, is read only once when this class is initialized.
 */
public static final boolean THREE_MEANS_ALL_BUT_ONE = CassandraRelevantProperties.THREE_MEANS_ALL_BUT_ONE.getBoolean();

// Used by the binary protocol
public final int code;
private final boolean isDCLocal;
Expand Down Expand Up @@ -87,6 +90,16 @@ public static ConsistencyLevel fromCode(int code)
return codeIdx[code];
}

/**
 * Returns the display name of this consistency level.
 * <p>
 * When {@link #THREE_MEANS_ALL_BUT_ONE} is enabled, {@code THREE} is rendered as
 * {@code "THREE (ALL_BUT_ONE)"} so the altered meaning is visible wherever the level is printed.
 * Every other case falls back to the default enum rendering.
 * <p>
 * NOTE(review): the decorated label is not a valid input for {@link #fromString(String)},
 * which delegates to {@code valueOf}; confirm no caller round-trips {@code toString()} output.
 */
@Override
public String toString()
{
    boolean relabelled = THREE_MEANS_ALL_BUT_ONE && this == THREE;
    return relabelled ? "THREE (ALL_BUT_ONE)" : super.toString();
}

public static ConsistencyLevel fromString(String str)
{
return valueOf(str.toUpperCase(Locale.US));
Expand All @@ -97,6 +110,12 @@ public static int quorumFor(AbstractReplicationStrategy replicationStrategy)
return (replicationStrategy.getReplicationFactor().allReplicas / 2) + 1;
}

/**
 * Computes how many replicas must respond under "all but one" semantics.
 * <p>
 * The result is one less than the number of full replicas of the given strategy, except that
 * a count of one or lower is returned unchanged.
 * <p>
 * NOTE(review): this reads {@code fullReplicas} whereas {@code quorumFor} reads
 * {@code allReplicas}; confirm the difference is intended.
 *
 * @param replicationStrategy the strategy whose replication factor is consulted
 * @return the number of replicas to block for
 */
static int allButOneFor(AbstractReplicationStrategy replicationStrategy)
{
    final int fullReplicas = replicationStrategy.getReplicationFactor().fullReplicas;
    if (fullReplicas <= 1)
        return fullReplicas;
    return fullReplicas - 1;
}

public static int localQuorumFor(AbstractReplicationStrategy replicationStrategy, String dc)
{
return (replicationStrategy instanceof NetworkTopologyStrategy)
Expand Down Expand Up @@ -146,6 +165,10 @@ public int blockFor(AbstractReplicationStrategy replicationStrategy)
case TWO:
return 2;
case THREE:
if (THREE_MEANS_ALL_BUT_ONE)
{
return allButOneFor(replicationStrategy);
}
return 3;
case QUORUM:
case SERIAL:
Expand Down
180 changes: 180 additions & 0 deletions test/unit/org/apache/cassandra/db/ConsistencyLevelTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db;

import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EverywhereStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.utils.Pair;

import static org.junit.Assert.*;

/**
 * Unit tests for {@link ConsistencyLevel#allButOneFor(AbstractReplicationStrategy)}.
 * <p>
 * Each test builds a replication strategy against a test-local snitch and checks the number of
 * replicas that "all but one" would block for.
 */
public class ConsistencyLevelTest
{
    private static final String KS = "test";

    // Per-address overrides consulted by SNITCH; populated by dc(...) and cleared after every test.
    private static final Map<String, String> RACK = new HashMap<>();
    private static final Map<String, String> DATACENTER = new HashMap<>();

    // Snitch that resolves rack/datacenter from the maps above, defaulting to "RC1" / "DC1".
    private static final IEndpointSnitch SNITCH = new AbstractNetworkTopologySnitch()
    {
        @Override
        public String getRack(InetAddressAndPort endpoint)
        {
            return RACK.getOrDefault(endpoint.getHostAddress(false), "RC1");
        }

        @Override
        public String getDatacenter(InetAddressAndPort endpoint)
        {
            return DATACENTER.getOrDefault(endpoint.getHostAddress(false), "DC1");
        }
    };

    // Snitch that was installed before this class replaced it; restored in resetSnitch().
    private static IEndpointSnitch previousSnitch;

    @BeforeClass
    public static void setSnitch()
    {
        DatabaseDescriptor.daemonInitialization();
        previousSnitch = DatabaseDescriptor.getEndpointSnitch();
        DatabaseDescriptor.setEndpointSnitch(SNITCH);
    }

    @AfterClass
    public static void resetSnitch()
    {
        // Put back whatever was configured instead of leaving a null snitch in global state,
        // which could break other tests running in the same JVM.
        DatabaseDescriptor.setEndpointSnitch(previousSnitch);
    }

    @After
    public void resetSnitchState()
    {
        RACK.clear();
        DATACENTER.clear();
    }

    @Test
    public void allButOne_shouldBe_2_forReplicationFactor_3()
    {
        testAllButOne(simpleStrategy(3), 2);
    }

    @Test
    public void allButOne_shouldBe_1_forReplicationFactor_2()
    {
        testAllButOne(simpleStrategy(2), 1);
    }

    @Test
    public void allButOne_shouldBe_1_forReplicationFactor_1()
    {
        testAllButOne(simpleStrategy(1), 1);
    }

    @Test
    public void allButOne_shouldBe_1_forLocalStrategy()
    {
        testAllButOne(localStrategy(), 1);
    }

    @Test
    public void allButOne_shouldBe_8_forReplicationFactor_3_3_3()
    {
        testAllButOne(networkTopologyStrategy(3, 3, 3), 8);
    }

    @Test
    public void allButOne_shouldBe_11_forEverywhereStrategyOnClusterOf_12() throws Exception
    {
        // Two datacenters with six endpoints each, i.e. twelve endpoints in total.
        testAllButOne(everywhereStrategy(
                      dc(1, Pair.create("192.168.0.1", "A"), Pair.create("192.168.0.2", "E"), Pair.create("192.168.0.3", "H"),
                         Pair.create("192.168.0.4", "C"), Pair.create("192.168.0.5", "I"), Pair.create("192.168.0.6", "J")),
                      dc(2, Pair.create("192.168.1.1", "B"), Pair.create("192.168.1.2", "G"), Pair.create("192.168.1.3", "L"),
                         Pair.create("192.168.1.4", "D"), Pair.create("192.168.1.5", "F"), Pair.create("192.168.1.6", "K"))),
                      11);
    }

    /**
     * Asserts that {@link ConsistencyLevel#allButOneFor(AbstractReplicationStrategy)} yields
     * {@code expected} for the given strategy.
     */
    private void testAllButOne(AbstractReplicationStrategy replicationStrategy, int expected)
    {
        // when
        int blockFor = ConsistencyLevel.allButOneFor(replicationStrategy);

        // then
        assertEquals("number of nodes to block for", expected, blockFor);
    }

    /**
     * Builds a {@link NetworkTopologyStrategy} whose i-th datacenter ("DC0", "DC1", ...) is
     * configured with the i-th replication factor supplied.
     */
    private static NetworkTopologyStrategy networkTopologyStrategy(int... dc)
    {
        Map<String, String> config = new HashMap<>();
        for (int i = 0; i < dc.length; i++)
        {
            config.put("DC" + i, Integer.toString(dc[i]));
        }
        return new NetworkTopologyStrategy(KS, new TokenMetadata(), SNITCH, config);
    }

    /**
     * Builds a {@link SimpleStrategy} with the given {@code replication_factor}.
     */
    private static AbstractReplicationStrategy simpleStrategy(int replicationFactor)
    {
        Map<String, String> config = Collections.singletonMap("replication_factor", Integer.toString(replicationFactor));
        return new SimpleStrategy(KS, new TokenMetadata(), SNITCH, config);
    }

    /**
     * Builds an {@link EverywhereStrategy} over token metadata containing every endpoint of the
     * supplied datacenters.
     */
    @SafeVarargs
    private static AbstractReplicationStrategy everywhereStrategy(Multimap<InetAddressAndPort, Token>... dcs)
    {
        TokenMetadata metadata = new TokenMetadata();
        for (Multimap<InetAddressAndPort, Token> dc : dcs)
        {
            metadata.updateNormalTokens(dc);
        }
        return new EverywhereStrategy(KS, metadata, SNITCH, Collections.emptyMap());
    }

    /**
     * Builds a {@link LocalStrategy} with an empty configuration.
     */
    private static AbstractReplicationStrategy localStrategy()
    {
        return new LocalStrategy(KS, new TokenMetadata(), SNITCH, Collections.emptyMap());
    }

    /**
     * Creates the endpoint-to-token mapping for one datacenter.
     * <p>
     * As a side effect, each address is registered in {@link #DATACENTER} under {@code "DC" + id}
     * so that {@link #SNITCH} reports it as belonging to that datacenter.
     *
     * @param id           numeric suffix of the datacenter name
     * @param addressToken pairs of (host address, token string); the array is only read
     * @return a multimap from each endpoint to its single token
     * @throws UnknownHostException if an address cannot be resolved
     */
    @SafeVarargs
    private static Multimap<InetAddressAndPort, Token> dc(int id, Pair<String, String>... addressToken) throws UnknownHostException
    {
        Multimap<InetAddressAndPort, Token> dc = HashMultimap.create();
        for (Pair<String, String> pair : addressToken)
        {
            DATACENTER.put(pair.left, "DC" + id);
            dc.put(InetAddressAndPort.getByName(pair.left), new OrderPreservingPartitioner.StringToken(pair.right));
        }
        return dc;
    }
}

0 comments on commit c1c1fe6

Please sign in to comment.