카테고리 없음

[Spring boot] 비관락, 낙관락, 분산락 구현

ckm7907 2024. 8. 8. 23:46

일반적으로 단일 스레드 상황의 성공 조회수는 다음과 같다.

       @Test
       @Transactional
       void 성공_조회수_추가() {
           //given
           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           //when
           for (int i = 0; i < 100; i++) {
               commentService.createComment(memberDto, board.getId(), commentRequestDto);
           }

           //then
           assertThat(board.getHit()).isEqualTo(100);
       }

 

하지만 일반적으로 스프링은 스레드풀을 가지고 있어, 한번에 여러 요청이 동시에 이루어 질수 있다.

예를 들어 여러명이 동시에 조회수를 추가하는 경우의 테스트를 진행해보자

 

20개의 스레드가 동시에 조회수를 늘린다면 어떻게 될까?

       @Test
       void 에러_기본_코드는_동시성_테스트시_성공하지_않는_트랜잭션이_발생한다() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createComment(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isNotEqualTo(100);
       }

comment를 추가하면서 동시성 이슈로 에러 처리가 된 부분이 생긴다.

또한 게시판의 조회수는 100이 아니도록 출력이된다.

 

그래서 첫번째로 비관적  락으로 구현을 해본다.

비관적락이란, 현재 스레드가 사용하고 있는 DB에 대한 접근을 막는 방법이다.

그렇게 되면 다른 스레드는 해당 DB를 접근하기 위해 기다리게 되어 동시성 이슈를 해결한다.

       @Test
       void 성공_동시성_테스트_비관적_락_사용() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithPessimisticLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }

hitCount 는 100을 달성하고, 테스트는 성공한다.

하지만 이 방법을 사용하면 스레드가 DB 접근권한을 획득 한 후 다른 스레드가 접근을 못하는 동안의 시간이 발생해 느리다는 단점이 있다.

 

다음은 낙관적 락이다.

낙관적 락은 각 DB 접근 시 마다 version을 매긴다.

그 후 다음 스레드가 DB 접근 시 version을 매기고 write를 할 때 현재 DB에 있는 version이 지금 전달할 version 보다 1만큼 작지 않으면 다시 시작한다.

그래서 aop로 retry 문을 구현해야 한다.

       @Test
       void 성공_동시성_테스트_낙관적_락_사용() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithOptimisticLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }

이 방법은 기존의 알려진 락 방식과는 사뭇 다른 느낌이다.

왜냐하면 version을 매겨서 잘못된 version update 라면 재시도하는 로직이기 때문이다.

이미 실패할 걸 생각하고 구현하는 느낌이다.

 

aop retry는 다음과 같다.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Retry {
    int value() default 100;
}
@Order(Ordered.LOWEST_PRECEDENCE - 1)
@Aspect
@Component
public class RetryAspect {
    private static final Logger log = LoggerFactory.getLogger(RetryAspect.class);

    @Around("@annotation(retry)")
    public Object doRetry(ProceedingJoinPoint joinPoint, Retry retry) throws Throwable {
        int maxRetry = retry.value();
        Exception exceptionHolder = null;
        for (int retryCount = 1; retryCount <= maxRetry; retryCount++) {
            try {
                return joinPoint.proceed();
            } catch (Exception e) {
//                log.warn("[retry] try count = {}/{}", retryCount, maxRetry);
                exceptionHolder = e;
            }
        }
        throw exceptionHolder;
    }
}

 

여기서 @Order를 설정한 이유는 transaction aop 보다 바깥에서 retry가 사용되도록 구현하기 위해서다.

 

분산락 (redisson)

인메모리 데이터 베이스인 redis의 redisson을 활용해서 구현한다.

redis에서 db에 동시에 접근을 막기위해 한 스레드가 접근하고 있는 value를 저장하는 방식이다.

pub/sub를 사용해서 lettuce의 스핀락 방식보다 부하가 적다.

락이 해제될때 마다 subcribe 중인 클라이언트에게 "락을 시도해도 된다" 라는 알림을 준다.

그래서 락 획득이 실패했을 때, 락 획득 요청을 보내지 않고 기다린다. 

       @Test
       void 성공_동시성_테스트_분산_락_사용_redisson() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithDistributedLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }

 

분산락을 위한 aop는 다음과 같다.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
   //락의 이름
   String key();

   //락의 시간 단위
   TimeUnit timeUnit() default TimeUnit.SECONDS;

   //락을 기다리는 시간(default - 5s)
   //락 획들을 위해 기다림
   long waitTime() default 5L;

   //락 임대시간(default - 3s)
   //락 획득후 lease time이 지나면 락을 해제한다.
   long leaseTime() default 3L;
}
@lombok.extern.slf4j.Slf4j
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
@Order(Ordered.LOWEST_PRECEDENCE - 1)
public class DistributedLockAop {
   private static final String REDISSON_LOCK_PREFIX = "LOCK:";
   private final RedissonClient redissonClient;
   private final AopForTransaction aopForTransaction;

   @Around("@annotation(com.ssafy.home.global.aop.distributed.DistributedLock)")
   public Object lock(final ProceedingJoinPoint joinPoint) throws Throwable {
       MethodSignature signature = (MethodSignature) joinPoint.getSignature();
       Method method = signature.getMethod();
       DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);

       String key = REDISSON_LOCK_PREFIX + CustomSpringELParser.getDynamicValue(signature.getParameterNames(),
               joinPoint.getArgs(), distributedLock.key());
       RLock rLock = redissonClient.getLock(key);

       try {
           boolean available = rLock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(),
                   distributedLock.timeUnit());
           if (!available) {
               return false;
           }
           log.info("락 획득");
           return aopForTransaction.proceed(joinPoint);
       } catch (InterruptedException e) {
           throw new InterruptedException();
       } finally {
           try {
               rLock.unlock();
               log.info("락 해제");
           } catch (IllegalMonitorStateException e) {
           }
       }
   }
}

 

전체 코드는 다음과 같다.

@Nested
   @DisplayName("댓글 동시성 처리 관련 테스트")
   class CommentCouncurrency {
       @Test
       @Transactional
       void 성공_조회수_추가() {
           //given
           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           //when
           for (int i = 0; i < 100; i++) {
               commentService.createComment(memberDto, board.getId(), commentRequestDto);
           }

           //then
           assertThat(board.getHit()).isEqualTo(100);
       }

       @Test
       void 에러_기본_코드는_동시성_테스트시_성공하지_않는_트랜잭션이_발생한다() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createComment(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isNotEqualTo(100);
       }

       @Test
       void 성공_동시성_테스트_비관적_락_사용() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithPessimisticLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }

       @Test
       void 성공_동시성_테스트_낙관적_락_사용() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithOptimisticLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }

       @Test
       void 성공_동시성_테스트_분산_락_사용_redisson() throws InterruptedException {
           int threadCount = 100;

           CommentRequestDto commentRequestDto = CommentRequestDto.builder().content("test").build();

           ExecutorService executorService = Executors.newFixedThreadPool(20);
           CountDownLatch latch = new CountDownLatch(threadCount);

           AtomicInteger hitCount = new AtomicInteger(0);
           AtomicInteger missCount = new AtomicInteger(0);

           for (int i = 0; i < threadCount; i++) {
               executorService.execute(() -> {
                   try {
                       commentService.createCommentWithDistributedLock(memberDto, board.getId(), commentRequestDto);
                       hitCount.incrementAndGet();
                   } catch (Exception e) {
                       System.out.println(e.getMessage());
                       missCount.incrementAndGet();
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await();

           System.out.println("hitCount = " + hitCount);
           System.out.println("missCount = " + missCount);

           Board newBoard = boardRepository.findById(board.getId()).orElseThrow();

           assertThat(newBoard.getHit()).isEqualTo(100);
       }
   }