Skip to content

Commit

Permalink
Fix ignored nodetool stop ANTICOMPACTION
Browse files Browse the repository at this point in the history
(cherry picked from commit fad831e)
  • Loading branch information
adelapena authored and djatnieks committed Mar 29, 2024
1 parent 410c0ec commit da035fa
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
30 changes: 17 additions & 13 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1833,27 +1833,25 @@ else if (transChecker.test(token))
pendingRepair);
return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
}
catch (Throwable e)
catch (CompactionInterruptedException e)
{
if (e instanceof CompactionInterruptedException)
if (isCancelled.getAsBoolean())
{
if (isCancelled.getAsBoolean())
{
logger.info("Anticompaction has been canceled for session {}", pendingRepair);
logger.trace(e.getMessage(), e);
}
else
{
logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair);
}
logger.info("Anticompaction has been canceled for session {}", pendingRepair);
logger.trace(e.getMessage(), e);
}
else
{
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair);
}
throw e;
}
catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
throw e;
}
}

@VisibleForTesting
Expand Down Expand Up @@ -1885,6 +1883,12 @@ public OperationProgress getProgress()
return compaction.getProgress();
}

@Override
public void stop()
{
compaction.stop();
}

@Override
public boolean isStopRequested()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,40 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.assertj.core.api.Assertions;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;

import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -97,4 +111,58 @@ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collec

return builder.build();
}

@BMRules(rules = { @BMRule(name = "Abort anti-compaction after first call to onOperationStart",
targetClass = "CompactionManager",
targetMethod = "antiCompactGroup",
condition = "not flagged(\"done\")",
targetLocation = "AFTER INVOKE compactionRateLimiterAcquire",
action = "org.apache.cassandra.db.compaction.CompactionManager.instance.stopCompaction(\"ANTICOMPACTION\");") } )
@Test
public void testStopAntiCompaction()
{
Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass());
cfs.disableAutoCompaction();

// create 2 sstables, one that will be split, and another that will be moved
for (int i = 0; i < 10; i++)
{
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
}
cfs.forceBlockingFlush(UNIT_TESTS);
for (int i = 10; i < 20; i++)
{
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
}
cfs.forceBlockingFlush(UNIT_TESTS);

assertEquals(2, cfs.getLiveSSTables().size());
assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count());

Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(5));
Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(15));
List<Range<Token>> ranges = Collections.singletonList(new Range<>(left, right));
List<ColumnFamilyStore> tables = Collections.singletonList(cfs);

// create a repair session so the anti-compaction job can find it
TimeUUID sessionID = TimeUUID.Generator.nextTimeUUID();
ActiveRepairService.instance().registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE);

ExecutorService executor = Executors.newSingleThreadExecutor();
try
{
PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor, () -> false);
Future<?> future = pac.run();
Assertions.assertThatThrownBy(future::get)
.hasCauseInstanceOf(CompactionInterruptedException.class)
.hasMessageContaining("Compaction interrupted");
}
finally
{
executor.shutdown();
}

assertEquals(2, cfs.getLiveSSTables().size());
assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count());
}
}

0 comments on commit da035fa

Please sign in to comment.