Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] 테스트 구현 및 Cluster 구성 변경 - #275 #276

Merged
merged 5 commits into from
Sep 18, 2024

Conversation

rlarlgnszx
Copy link
Member

🔥Pull requests

⛳️ 작업한 브랜치

👷 작업한 내용

    1. Test
    1. StreamListener Container가 redis 연결이 끊길경우 재구독 설정
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> createStreamSubscription(
            String streamKey, String consumerGroup, String consumerName,
            StreamListener<String, MapRecord<String, String, String>> eventListener) {

        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1L))
                .errorHandler(e -> {
                    log.error("Error in listener: {}", e.getMessage());
                    restartSubscription(streamKey, consumerGroup, consumerName, eventListener);
                }).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactoryForCluster, containerOptions);

        container.register(
                StreamMessageListenerContainer.StreamReadRequest.builder(
                                StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                        .cancelOnError(t -> true) // 오류 발생 시 구독 취소
                        .consumer(Consumer.from(consumerGroup, consumerName))
                        .autoAcknowledge(true)
                        .build(), eventListener);

        container.start();
        log.info("Listener container started for stream: {}", streamKey);
        return container;
    }

    private void restartSubscription(String streamKey, String consumerGroup, String consumerName,
                                     StreamListener<String, MapRecord<String, String, String>> eventListener) {
        scheduler.schedule(() -> {
            log.info("Restarting subscription for stream: {}", streamKey);
            stopContainer(streamKey);
            createStreamSubscription(streamKey, consumerGroup, consumerName, eventListener).start();
        }, 5, TimeUnit.SECONDS); // 일정 시간 후 재시작
    }

    private void stopContainer(String streamKey) {
        if ("coursePoint".equals(streamKey) && pointListenerContainer != null && pointListenerContainer.isRunning()) {
            pointListenerContainer.stop();
            log.info("Stopped point listener container");
        }
        if ("courseFree".equals(streamKey) && freeListenerContainer != null && freeListenerContainer.isRunning()) {
            freeListenerContainer.stop();
            log.info("Stopped free listener container");
        }
    }

    @PreDestroy
    public void onDestroy() {
        stopContainer("coursePoint");
        stopContainer("courseFree");
        scheduler.shutdown();
        log.info("All listener containers stopped and scheduler shutdown.");
    }
    1. Image Service 로직 Executor를 통한 VirtualThread 명시화
@Transactional
    public List<Image> saveImages(final List<MultipartFile> images, final Course course) {
        List<Image> savedImages = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Void>> futures = IntStream.range(0, images.size())
                    .mapToObj(index -> CompletableFuture.runAsync(() -> {
                        try {
                            String imagePath = s3Service.uploadImage(path, images.get(index));
                            Image newImage = Image.create(
                                    course,
                                    cachePath + imagePath,
                                    index + 1
                            );
                            savedImages.add(newImage);
                        } catch (IOException e) {
                            throw new BadRequestException(FailureCode.BAD_REQUEST);
                        }
                    }, executor))
                    .toList();
            // Wait for all tasks to complete
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            savedImages.sort(Comparator.comparing(Image::getSequence));
            imageRepository.saveAll(savedImages);
            executor.shutdown(); // Shutdown the ExecutorService
        }
        return savedImages;
    }

🚨 참고 사항

📟 관련 이슈

@rlarlgnszx rlarlgnszx merged commit 69ab9c3 into develop Sep 18, 2024
1 check passed
@rlarlgnszx rlarlgnszx self-assigned this Sep 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEAT] 테스트 구현 및 Cluster 구성 변경
1 participant