Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] 테스트 구현 및 Cluster 구성 변경 - #275 #276

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dateroad-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
runtimeOnly 'org.postgresql:postgresql'
//swagger
implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0'
testImplementation 'io.github.autoparams:autoparams:8.3.0'
}

jar.enabled = false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.dateroad.common;

import io.lettuce.core.RedisConnectionException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.dateroad.code.FailureCode;
Expand Down Expand Up @@ -62,4 +63,13 @@ protected ResponseEntity<FailureResponse> handleException(final Exception e) {
final FailureResponse response = FailureResponse.of(FailureCode.INTERNAL_SERVER_ERROR, errors);
return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR);
}

@ExceptionHandler(RedisConnectionException.class)
protected ResponseEntity<FailureResponse> handleRedisConnectionException(final RedisConnectionException e) {
log.error(">>> handle: RedisConnectionException ", e);
String errorMessage = "Redis connection error: " + e.getMessage();
List<FailureResponse.FieldError> errors = FailureResponse.FieldError.of("RedisConnection", "", errorMessage);
final FailureResponse response = FailureResponse.of(FailureCode.REDIS_CONNECTION_ERROR, errors);
return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.dateroad.config;

import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode.NodeFlag;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
Expand All @@ -25,24 +23,20 @@
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@Slf4j
public class RedisClusterConfig {

@Value("${aws.ip}")
private String host;

@Value("${spring.data.redis.cluster.password}")
private String password;

private RedisConnectionFactory redisConnectionFactory;
@Bean
@Primary
public RedisConnectionFactory redisConnectionFactoryForCluster() {
if (this.redisConnectionFactory != null) {
return this.redisConnectionFactory;
}

RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration()
.clusterNode(host, 7001)
.clusterNode(host, 7002)
Expand All @@ -52,7 +46,7 @@ public RedisConnectionFactory redisConnectionFactoryForCluster() {
.clusterNode(host, 7006);
clusterConfig.setPassword(RedisPassword.of(password));
SocketOptions socketOptions = SocketOptions.builder()
.connectTimeout(Duration.ofSeconds(3L))
.connectTimeout(Duration.ofSeconds(5L))
.tcpNoDelay(true)
.keepAlive(true)
.build();
Expand All @@ -61,26 +55,28 @@ public RedisConnectionFactory redisConnectionFactoryForCluster() {
.builder()
.dynamicRefreshSources(true)
.enableAllAdaptiveRefreshTriggers()
.enablePeriodicRefresh(Duration.ofHours(1L))
.enablePeriodicRefresh() // 60초마다 refresh
.refreshTriggersReconnectAttempts(3) // 재연결 시도 후 갱신
.build();

ClusterClientOptions clusterClientOptions = ClusterClientOptions
.builder()
.pingBeforeActivateConnection(true)
.socketOptions(socketOptions)
.pingBeforeActivateConnection(true) // 연결 활성화 전에 ping
.autoReconnect(true)
.topologyRefreshOptions(clusterTopologyRefreshOptions)
.nodeFilter(it ->
!(it.is(NodeFlag.EVENTUAL_FAIL)
|| it.is(NodeFlag.FAIL)
|| it.is(NodeFlag.NOADDR)
|| it.is(NodeFlag.HANDSHAKE)))
.validateClusterNodeMembership(false)
.maxRedirects(5).build();
.nodeFilter(it ->
! (it.is(RedisClusterNode.NodeFlag.FAIL)
|| it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
|| it.is(RedisClusterNode.NodeFlag.HANDSHAKE)
|| it.is(RedisClusterNode.NodeFlag.NOADDR)))
.maxRedirects(3).build();

final LettuceClientConfiguration clientConfig = LettuceClientConfiguration
.builder()
.readFrom(ReadFrom.REPLICA_PREFERRED)
.commandTimeout(Duration.ofSeconds(10L))
.commandTimeout(Duration.ofSeconds(5L)) // 명령 타임아웃 5초로 설정
.clientOptions(clusterClientOptions)
.build();

Expand All @@ -89,10 +85,10 @@ public RedisConnectionFactory redisConnectionFactoryForCluster() {
factory.setValidateConnection(false);
factory.setShareNativeConnection(true);

this.redisConnectionFactory = factory; // 재사용을 위해 저장
return factory;
}


@Bean
public RedisTemplate<String, String> redistemplateForCluster() {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
Expand All @@ -101,7 +97,6 @@ public RedisTemplate<String, String> redistemplateForCluster() {
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
// redisTemplate.setEnableTransactionSupport(true);
return redisTemplate;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package org.dateroad.config;


import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dateroad.point.event.FreeEventListener;
import org.dateroad.point.event.PointEventListener;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
@EnableScheduling
public class RedisStreamSubscriber {

private final PointEventListener pointEventListener;
private final FreeEventListener freeEventListener;
private final RedisTemplate<String, String> redistemplateForCluster;
private final RedisConnectionFactory redisConnectionFactoryForCluster;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private StreamMessageListenerContainer<String, MapRecord<String, String, String>> pointListenerContainer;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> freeListenerContainer;

@PostConstruct
public void createConsumer() {
createStreamConsumerGroup("coursePoint", "coursePointGroup");
createStreamConsumerGroup("courseFree", "courseFreeGroup");
}

public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
boolean streamExists = Boolean.TRUE.equals(redistemplateForCluster.hasKey(streamKey));
if (!streamExists) {
redistemplateForCluster.execute((RedisCallback<Void>) connection -> {
byte[] streamKeyBytes = streamKey.getBytes();
byte[] consumerGroupNameBytes = consumerGroupName.getBytes();
connection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes,
"0".getBytes(), "MKSTREAM".getBytes());
return null;
});
} else if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redistemplateForCluster.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}

public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
return redistemplateForCluster
.opsForStream().groups(streamKey).stream()
.anyMatch(group -> group.groupName().equals(consumerGroupName));
}

@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> startPointListener() {
pointListenerContainer = createStreamSubscription(
"coursePoint", "coursePointGroup", "instance-1", pointEventListener
);
return pointListenerContainer;
}

@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> startFreeListener() {
freeListenerContainer = createStreamSubscription(
"courseFree", "courseFreeGroup", "instance-2", freeEventListener
);
return freeListenerContainer;
}

private StreamMessageListenerContainer<String, MapRecord<String, String, String>> createStreamSubscription(
String streamKey, String consumerGroup, String consumerName,
StreamListener<String, MapRecord<String, String, String>> eventListener) {

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1L))
.errorHandler(e -> {
log.error("Error in listener: {}", e.getMessage());
restartSubscription(streamKey, consumerGroup, consumerName, eventListener);
}).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactoryForCluster, containerOptions);

container.register(
StreamMessageListenerContainer.StreamReadRequest.builder(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
.cancelOnError(t -> true) // 오류 발생 시 구독 취소
.consumer(Consumer.from(consumerGroup, consumerName))
.autoAcknowledge(true)
.build(), eventListener);

container.start();
log.info("Listener container started for stream: {}", streamKey);
return container;
}

private void restartSubscription(String streamKey, String consumerGroup, String consumerName,
StreamListener<String, MapRecord<String, String, String>> eventListener) {
scheduler.schedule(() -> {
log.info("Restarting subscription for stream: {}", streamKey);
stopContainer(streamKey);
createStreamSubscription(streamKey, consumerGroup, consumerName, eventListener).start();
}, 5, TimeUnit.SECONDS); // 일정 시간 후 재시작
}

private void stopContainer(String streamKey) {
if ("coursePoint".equals(streamKey) && pointListenerContainer != null && pointListenerContainer.isRunning()) {
pointListenerContainer.stop();
log.info("Stopped point listener container");
}
if ("courseFree".equals(streamKey) && freeListenerContainer != null && freeListenerContainer.isRunning()) {
freeListenerContainer.stop();
log.info("Stopped free listener container");
}
}

@PreDestroy
public void onDestroy() {
stopContainer("coursePoint");
stopContainer("courseFree");
scheduler.shutdown();
log.info("All listener containers stopped and scheduler shutdown.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@Getter
@Builder(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor
public class CourseCreateReq {
@Size(min = 5)
private String title;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@Getter
@Builder(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor
public class CoursePlaceGetReq {
private String title;
private float duration;
Expand Down
Loading
Loading