diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 17b052f97a2e..77db9edbf319 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -387,7 +387,11 @@ public enum CassandraRelevantProperties CUSTOM_READ_OBSERVER_FACTORY("cassandra.custom_read_observer_factory_class"), // Allows skipping advising the OS to free cached pages associated with commitlog flushing - COMMITLOG_SKIP_FILE_ADVICE("cassandra.commitlog.skip_file_advice"); + COMMITLOG_SKIP_FILE_ADVICE("cassandra.commitlog.skip_file_advice"), + + // Changes the semantic of the "THREE" consistency level to mean "all but one" + // i.e. that all replicas except for at most one in the cluster (across all DCs) must accept the write for it to be successful. + THREE_MEANS_ALL_BUT_ONE("dse.consistency_level.three_means_all_but_one", "false"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 8cbfb47a425c..9b1949ca4f1b 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import com.carrotsearch.hppc.ObjectIntHashMap; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.schema.TableMetadata; @@ -50,6 +51,8 @@ public enum ConsistencyLevel LOCAL_ONE (10, true), NODE_LOCAL (11, true); + public static final boolean THREE_MEANS_ALL_BUT_ONE = CassandraRelevantProperties.THREE_MEANS_ALL_BUT_ONE.getBoolean(); + // Used by the binary protocol public final int code; private final boolean isDCLocal; @@ -79,6 +82,16 @@ private ConsistencyLevel(int code, boolean isDCLocal) this.isDCLocal = isDCLocal; } + @Override + public String toString() + { + if (this == THREE && THREE_MEANS_ALL_BUT_ONE) + { + return "THREE (ALL_BUT_ONE)"; + } + return super.toString(); + } + public static ConsistencyLevel fromString(String str) { return valueOf(str.toUpperCase(Locale.US)); @@ -96,6 +109,12 @@ public static int quorumFor(AbstractReplicationStrategy replicationStrategy) return (replicationStrategy.getReplicationFactor().allReplicas / 2) + 1; } + static int allButOneFor(AbstractReplicationStrategy replicationStrategy) + { + int rf = replicationStrategy.getReplicationFactor().fullReplicas; + return rf <= 1 ? rf : rf - 1; + } + public static int localQuorumFor(AbstractReplicationStrategy replicationStrategy, String dc) { return (replicationStrategy instanceof NetworkTopologyStrategy) @@ -145,6 +164,10 @@ public int blockFor(AbstractReplicationStrategy replicationStrategy) case TWO: return 2; case THREE: + if (THREE_MEANS_ALL_BUT_ONE) + { + return allButOneFor(replicationStrategy); + } return 3; case QUORUM: case SERIAL: diff --git a/test/unit/org/apache/cassandra/db/ConsistencyLevelTest.java b/test/unit/org/apache/cassandra/db/ConsistencyLevelTest.java new file mode 100644 index 000000000000..54626e73d5d6 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/ConsistencyLevelTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.OrderPreservingPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.EverywhereStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.*; + +public class ConsistencyLevelTest +{ + private static final String KS = "test"; + private static final Map RACK = new HashMap<>(), DATACENTER = new HashMap<>(); + private static final IEndpointSnitch SNITCH = new AbstractNetworkTopologySnitch() + { + @Override + public String getRack(InetAddressAndPort endpoint) + { + return RACK.getOrDefault(endpoint.getHostAddress(false), "RC1"); + } + + @Override + public String getDatacenter(InetAddressAndPort endpoint) + { + return DATACENTER.getOrDefault(endpoint.getHostAddress(false), "DC1"); + } + }; + + @BeforeClass + public static void setSnitch() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setEndpointSnitch(SNITCH); + } + + @AfterClass + public static void resetSnitch() + { + DatabaseDescriptor.setEndpointSnitch(null); + } + + @After + public void resetSnitchState() + { + RACK.clear(); + DATACENTER.clear(); + } + + @Test + public void allButOne_shouldBe_2_forReplicationFactor_3() + { + testAllButOne(simpleStrategy(3), 2); + } + + @Test + public void allButOne_shouldBe_1_forReplicationFactor_2() + { + testAllButOne(simpleStrategy(2), 1); + } + + @Test + public void allButOne_shouldBe_1_forReplicationFactor_1() + { + testAllButOne(simpleStrategy(1), 1); + } + + @Test + public void allButOne_shouldBe_1_forLocalStrategy() + { + testAllButOne(localStrategy(), 1); + } + + @Test + public void allButOne_shouldBe_8_forReplicationFactor_3_3_3() + { + testAllButOne(networkTopologyStrategy(3, 3, 3), 8); + } + + @Test + public void allButOne_shouldBe_11_forEverywhereStrategyOnClusterOf_12() throws Exception + { + testAllButOne(everywhereStrategy( + dc(1, Pair.create("192.168.0.1", "A"), Pair.create("192.168.0.2", "E"), Pair.create("192.168.0.3", "H"), + Pair.create("192.168.0.4", "C"), Pair.create("192.168.0.5", "I"), Pair.create("192.168.0.6", "J")), + dc(2, Pair.create("192.168.1.1", "B"), Pair.create("192.168.1.2", "G"), Pair.create("192.168.1.3", "L"), + Pair.create("192.168.1.4", "D"), Pair.create("192.168.1.5", "F"), Pair.create("192.168.1.6", "K"))), + 11); + } + + private void testAllButOne(AbstractReplicationStrategy replicationStrategy, int expected) + { + // when + int blockFor = ConsistencyLevel.allButOneFor(replicationStrategy); + + // then + assertEquals("number of nodes to block for", expected, blockFor); + } + + private static NetworkTopologyStrategy networkTopologyStrategy(int... dc) + { + Map config = new HashMap<>(); + for (int i = 0; i < dc.length; i++) + { + config.put("DC" + i, Integer.toString(dc[i])); + } + return new NetworkTopologyStrategy(KS, new TokenMetadata(), SNITCH, config); + } + + private static AbstractReplicationStrategy simpleStrategy(int replicationFactory) + { + Map config = Collections.singletonMap("replication_factor", Integer.toString(replicationFactory)); + return new SimpleStrategy(KS, new TokenMetadata(), SNITCH, config); + } + + @SafeVarargs + private static AbstractReplicationStrategy everywhereStrategy(Multimap... dcs) + { + TokenMetadata metadata = new TokenMetadata(); + for (Multimap dc : dcs) + { + metadata.updateNormalTokens(dc); + } + return new EverywhereStrategy(KS, metadata, SNITCH, Collections.emptyMap()); + } + + private static AbstractReplicationStrategy localStrategy() + { + return new LocalStrategy(KS, new TokenMetadata(), SNITCH, Collections.emptyMap()); + } + + private static Multimap dc(int id, Pair... addressToken) throws UnknownHostException + { + Multimap dc = HashMultimap.create(); + for (Pair pair : addressToken) + { + DATACENTER.put(pair.left, "DC" + id); + dc.put(InetAddressAndPort.getByName(pair.left), new OrderPreservingPartitioner.StringToken(pair.right)); + } + return dc; + } +}