Skip to content

Commit

Permalink
Check for thread interrupt in subscribe process of PubSub (#3726)
Browse files Browse the repository at this point in the history
- adding thread interrupted check in `JedisShardedPubSubBase::process`
- also in `JedisPubSubBase::process`

Fixes #3725 

---------

Co-authored-by: charles.chang <[email protected]>
Co-authored-by: M Sazzadul Hoque <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2024
1 parent f86e1ba commit 65d431f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/JedisPubSubBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis.clients.jedis;

import junit.framework.TestCase;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;

public class JedisPubSubBaseTest extends TestCase {

public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
// setup
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

@Override
public void onMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);

when(mockConnection.getUnflushedObject()).

thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}
56 changes: 56 additions & 0 deletions src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package redis.clients.jedis;

import junit.framework.TestCase;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE;

public class JedisShardedPubSubBaseTest extends TestCase {

public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
// setup
final JedisShardedPubSubBase<String> pubSub = new JedisShardedPubSubBase<String>() {

@Override
public void onSMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return new String(raw);
}

};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);
when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}

0 comments on commit 65d431f

Please sign in to comment.