Skip to content

Commit

Permalink
[MERGE] 비동기 처리 리팩토링 - #307
Browse files Browse the repository at this point in the history
[REFACTOR] 비동기 처리 리팩토링 -  #307
  • Loading branch information
rlarlgnszx authored Oct 20, 2024
2 parents e95b524 + e64b170 commit 4fdef4f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package org.dateroad.course.service;

import static java.lang.Thread.startVirtualThread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -17,6 +15,8 @@
import org.dateroad.exception.DateRoadException;
import org.dateroad.image.domain.Image;
import org.dateroad.image.service.ImageService;
import org.dateroad.point.event.MessageDto.FreeMessageDTO;
import org.dateroad.point.event.MessageDto.PointMessageDTO;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
Expand All @@ -32,7 +32,6 @@ public class AsyncService {
private final CourseTagService courseTagService;
private final ImageService imageService;
private final RedisTemplate<String, String> redistemplateForCluster;

public List<Image> createImage(final List<MultipartFile> images, final Course course) {
return imageService.saveImages(images, course);
}
Expand All @@ -46,13 +45,9 @@ public void createCoursePlace(final List<CoursePlaceGetReq> places, final Course
}

public void publishEvenUserPoint(final Long userId, PointUseReq pointUseReq) {
Map<String, String> fieldMap = new HashMap<>();
try {
fieldMap.put("userId", userId.toString());
fieldMap.put("point", String.valueOf(pointUseReq.getPoint()));
fieldMap.put("type", pointUseReq.getType().name());
fieldMap.put("description", pointUseReq.getDescription());
redistemplateForCluster.opsForStream().add("coursePoint", fieldMap);
PointMessageDTO pointMessage = PointMessageDTO.of(userId, pointUseReq);
redistemplateForCluster.opsForStream().add("coursePoint", pointMessage.toMap());
} catch (QueryTimeoutException e) {
log.error("Redis command timed out for userId: {} - Retrying...", userId, e);
throw new DateRoadException(FailureCode.REDIS_CONNECTION_ERROR);
Expand All @@ -63,10 +58,9 @@ public void publishEvenUserPoint(final Long userId, PointUseReq pointUseReq) {
}

public void publishEventUserFree(final Long userId) {
Map<String, String> fieldMap = new HashMap<>();
try {
fieldMap.put("userId", userId.toString());
redistemplateForCluster.opsForStream().add("courseFree", fieldMap);
FreeMessageDTO freeMessage = FreeMessageDTO.of(userId);
redistemplateForCluster.opsForStream().add("courseFree", freeMessage.toMap());
} catch (QueryTimeoutException e) {
log.error("Redis command timed out for userId: {} - Retrying...", userId, e);
throw new DateRoadException(FailureCode.REDIS_CONNECTION_ERROR);
Expand All @@ -77,22 +71,11 @@ public void publishEventUserFree(final Long userId) {
}

@Transactional
public void runAsyncTasks(List<CoursePlaceGetReq> places, List<TagCreateReq> tags,
Course saveCourse)
throws InterruptedException {
List<Thread> threads = new ArrayList<>();
final boolean[] hasError = {false}; // 에러 발생 여부 확인
threads.add(runAsyncTaskWithExceptionHandling(() -> {
createCoursePlace(places, saveCourse);
}, hasError));
threads.add(runAsyncTaskWithExceptionHandling(() -> {
createCourseTags(tags, saveCourse);
}, hasError));
for (Thread thread : threads) {
thread.join();
}
if (hasError[0]) {
throw new RuntimeException("코스 생성중 오류 발생"); // 예외 발생 시 전체 작업 실패 처리
public void runAsyncTasks(List<CoursePlaceGetReq> places, List<TagCreateReq> tags, Course saveCourse) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<Void> placeFuture = CompletableFuture.runAsync(() -> createCoursePlace(places, saveCourse), executor);
CompletableFuture<Void> tagFuture = CompletableFuture.runAsync(() -> createCourseTags(tags, saveCourse), executor);
CompletableFuture.allOf(placeFuture, tagFuture).join();
}
}

Expand All @@ -101,15 +84,4 @@ public String createCourseImages(List<MultipartFile> images, Course course) {
List<Image> imageList = createImage(images, course);
return imageList.getFirst().getImageUrl();
}

public Thread runAsyncTaskWithExceptionHandling(Runnable task, boolean[] hasError) {
return startVirtualThread(() -> {
try {
task.run();
} catch (Exception e) {
hasError[0] = true;
throw new DateRoadException(FailureCode.COURSE_CREATE_ERROR);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
import org.dateroad.course.dto.request.CoursePlaceGetReq;
import org.dateroad.course.dto.request.PointUseReq;
import org.dateroad.course.dto.request.TagCreateReq;
import org.dateroad.course.dto.response.*;
import org.dateroad.course.dto.response.CourseAccessGetAllRes;
import org.dateroad.course.dto.response.CourseCreateRes;
import org.dateroad.course.dto.response.CourseDtoGetRes;
import org.dateroad.course.dto.response.CourseGetAllRes;
import org.dateroad.course.dto.response.DateAccessCreateRes;
import org.dateroad.date.domain.Course;
import org.dateroad.date.dto.response.CourseGetDetailRes;
import org.dateroad.date.repository.CourseRepository;
import org.dateroad.dateAccess.domain.DateAccess;
import org.dateroad.dateAccess.repository.DateAccessRepository;
import org.dateroad.exception.ConflictException;
import org.dateroad.exception.DateRoadException;
import org.dateroad.exception.EntityNotFoundException;
import org.dateroad.exception.ForbiddenException;
import org.dateroad.image.domain.Image;
Expand All @@ -40,13 +43,13 @@
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.web.multipart.MultipartFile;

@Service
Expand Down Expand Up @@ -207,16 +210,12 @@ public CourseCreateRes createCourse(final Long userId, final CourseCreateReq cou
return CourseCreateRes.of(newcourse.getId(), user.getTotalPoint() + Constants.COURSE_CREATE_POINT, userCourseCount);
}

@TransactionalEventListener
@EventListener
public void handleCourseCreatedEvent(CourseCreateEvent event) {
Course course = event.getCourse();
List<CoursePlaceGetReq> places = event.getPlaces();
List<TagCreateReq> tags = event.getTags();
try {
asyncService.runAsyncTasks(places, tags, course);
} catch (Exception e) {
throw new DateRoadException(FailureCode.COURSE_CREATE_ERROR);
}
asyncService.runAsyncTasks(places, tags, course);
}

@Transactional
Expand All @@ -234,7 +233,6 @@ public DateAccessCreateRes openCourse(final Long userId, final Long courseId, fi
private DateAccessCreateRes calculateUserInfo(CoursePaymentType coursePaymentType, int userTotalPoint, int userFree, Long userPurchaseCount) {
if (coursePaymentType == CoursePaymentType.FREE) {
return DateAccessCreateRes.of(userTotalPoint, userFree-1, userPurchaseCount);

} else if (coursePaymentType == CoursePaymentType.POINT) {
return DateAccessCreateRes.of(userTotalPoint - Constants.COURSE_OPEN_POINT, userFree, userPurchaseCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -39,33 +38,30 @@ public class ImageService {

@Transactional
public List<Image> saveImages(final List<MultipartFile> images, final Course course) {
List<Image> savedImages = new ArrayList<>();
// Use thread-safe Queue to collect saved images
Queue<Image> savedImages = new ConcurrentLinkedQueue<>();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<CompletableFuture<Void>> futures = IntStream.range(0, images.size())
.mapToObj(index -> CompletableFuture.runAsync(() -> {
try {
String imagePath = s3Service.uploadImage(path, images.get(index));
Image newImage = Image.create(
course,
cachePath + imagePath,
index + 1
);
Image newImage = Image.create(course, cachePath + imagePath, index + 1);
savedImages.add(newImage);
} catch (IOException e) {
throw new BadRequestException(FailureCode.BAD_REQUEST);
throw new CompletionException(new BadRequestException(FailureCode.BAD_REQUEST));
}
}, executor))
.toList();
// Wait for all tasks to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
savedImages.sort(Comparator.comparing(Image::getSequence));
imageRepository.saveAll(savedImages);
executor.shutdown(); // Shutdown the ExecutorService
List<Image> sortedImages = new ArrayList<>(savedImages);
sortedImages.sort(Comparator.comparing(Image::getSequence));
imageRepository.saveAll(sortedImages);
return sortedImages;
}
return savedImages;
}



public String getImageUrl(final MultipartFile image) {
if (image == null || image.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.dateroad.point.event;

import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.dateroad.course.dto.request.PointUseReq;

@Getter
@AllArgsConstructor
public class MessageDto {
@Builder
public static class PointMessageDTO {
private final String userId;
private final String point;
private final String type;
private final String description;

public static PointMessageDTO of(Long userId, PointUseReq pointUseReq) {
return PointMessageDTO.builder()
.userId(userId.toString())
.point(String.valueOf(pointUseReq.getPoint()))
.description(pointUseReq.getDescription())
.type(pointUseReq.getType().name()).build();
}

public Map<String, String> toMap() {
Map<String, String> fieldMap = new HashMap<>();
fieldMap.put("userId", userId);
fieldMap.put("point", point);
fieldMap.put("type", type);
fieldMap.put("description", description);
return fieldMap;
}
}

@Builder
@AllArgsConstructor
public static class FreeMessageDTO {
private final String userId;
public static FreeMessageDTO of(Long userId) {
return FreeMessageDTO.builder().userId(userId.toString()).build();
}

public Map<String, String> toMap() {
Map<String, String> fieldMap = new HashMap<>();
fieldMap.put("userId", userId);
return fieldMap;
}
}
}

0 comments on commit 4fdef4f

Please sign in to comment.