From da035fabd3217d9386edd5067bb8698eb3f12a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Thu, 18 Jan 2024 15:15:14 +0000 Subject: [PATCH] Fix ignored nodetool stop ANTICOMPACTION (cherry picked from commit fad831ec2e531dd92a6903e447773374f9116897) --- .../db/compaction/CompactionManager.java | 30 ++++---- .../PendingAntiCompactionBytemanTest.java | 68 +++++++++++++++++++ 2 files changed, 85 insertions(+), 13 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index fbd67d3e7ed5..22630819f219 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1833,27 +1833,25 @@ else if (transChecker.test(token)) pendingRepair); return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size(); } - catch (Throwable e) + catch (CompactionInterruptedException e) { - if (e instanceof CompactionInterruptedException) + if (isCancelled.getAsBoolean()) { - if (isCancelled.getAsBoolean()) - { - logger.info("Anticompaction has been canceled for session {}", pendingRepair); - logger.trace(e.getMessage(), e); - } - else - { - logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair); - } + logger.info("Anticompaction has been canceled for session {}", pendingRepair); + logger.trace(e.getMessage(), e); } else { - JVMStabilityInspector.inspectThrowable(e); - logger.error("Error anticompacting " + txn + " for " + pendingRepair, e); + logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair); } throw e; } + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.error("Error anticompacting " + txn + " for " + pendingRepair, e); + throw e; + } } @VisibleForTesting @@ -1885,6 +1883,12 @@ public OperationProgress getProgress() return compaction.getProgress(); } + @Override + public void stop() + { + compaction.stop(); + } + @Override public boolean isStopRequested() { diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java index 795c7fe82c47..47470090fda0 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java @@ -20,26 +20,40 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.assertj.core.api.Assertions; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,4 +111,58 @@ private static RangesAtEndpoint atEndpoint(Collection> full, Collec return builder.build(); } + + @BMRules(rules = { @BMRule(name = "Abort anti-compaction after first call to onOperationStart", + targetClass = "CompactionManager", + targetMethod = "antiCompactGroup", + condition = "not flagged(\"done\")", + targetLocation = "AFTER INVOKE compactionRateLimiterAcquire", + action = "org.apache.cassandra.db.compaction.CompactionManager.instance.stopCompaction(\"ANTICOMPACTION\");") } ) + @Test + public void testStopAntiCompaction() + { + Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass()); + cfs.disableAutoCompaction(); + + // create 2 sstables, one that will be split, and another that will be moved + for (int i = 0; i < 10; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(UNIT_TESTS); + for (int i = 10; i < 20; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(UNIT_TESTS); + + assertEquals(2, cfs.getLiveSSTables().size()); + assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count()); + + Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(5)); + Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(15)); + List> ranges = Collections.singletonList(new Range<>(left, right)); + List tables = Collections.singletonList(cfs); + + // create a repair session so the anti-compaction job can find it + TimeUUID sessionID = TimeUUID.Generator.nextTimeUUID(); + ActiveRepairService.instance().registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try + { + PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor, () -> false); + Future future = pac.run(); + Assertions.assertThatThrownBy(future::get) + .hasCauseInstanceOf(CompactionInterruptedException.class) + .hasMessageContaining("Compaction interrupted"); + } + finally + { + executor.shutdown(); + } + + assertEquals(2, cfs.getLiveSSTables().size()); + assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count()); + } }