글을 작성하게 된 계기
판매자 등급 산정 을 하기 위한 배치를 개발하고 있는데, 대형 판매자의 매출 집계 작업 이 오래 걸려 전체 배치가 지연 되는 문제가 발생했습니다. 이를 해결하기 위해 비동기 처리를 도입했는데, 이 과정에서 알게된 내용을 정리하기 위해 글을 작성하게 되었습니다.
1. 문제 상황
판매자 등급 산정 배치는 각 판매자들의 일/월 매출을 집계하는 배치 입니다. 즉, 전체 판매자 목록을 순회하며 각 판매자의 판매 기록을 집계해야 하는데요, 이는 기본적으로 동기로 처리하고 있었습니다. 대부분의 판매자 매출은 금방 집계되지만, 판매량이 많은 대형 판매자 는 데이터가 많아서 처리 시간이 길어졌습니다.
1
2
3
4
5
6
7
8
9
10
11
12
@Service
class SellerBatchService {
fun calculateSellersGmv(
date: LocalDate,
sellers: Sellers
) {
for (seller in sellers) {
processBatch(date, seller)
}
}
}
결국 대형 판매자 매출을 계산할 때, 전체 배치가 지연 되는 문제가 발생하게 되는데요, 이를 해결하기 위해 판매량이 많은 판매자는 비동기로 데이터를 처리하기로 했습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
class SellerBatchService {
fun calculateSellersGmv(
date: LocalDate,
sellers: Sellers
) {
for (seller in sellers) {
if (seller.isBigSeller()) {
processBatchAsync(date, seller)
continue
}
processBatchSync(date, seller)
}
}
}
비동기를 사용하면 작업이 많을 때, 바로 실행하지 못하는 경우도 종종 발생합니다. 쓰레드풀의 쓰레드가 모두 바쁘고 큐가 찼을 때, 새로 들어온 작업이 즉시 실행되지 않고 대기하게 되는 경우입니다. 이때 쓰레드풀은 실행 중인 쓰레드 외에 아직 처리되지 않은 작업을 담는 큐를 두어, 나중에 이를 처리할 수 있습니다. 그런데 대형 판매자가 판매한 상품이 많아 큐가 가득 차면서, 큐 내부에 쌓인 작업이 에러를 내는 문제 가 발생했습니다. 매출 집계 배치는 유실되면 안 되는 작업 인데 난감했죠.
1
2
3
4
5
6
7
8
9
10
11
12
Worker #1 (dequeue)
|
v
+--------------------------------------------+
| T1 | T2 | T3 | T4 | T5 | 대기 Queue
+--------------------------------------------+
|
| dequeue() → T1을 대기 Queue에서 꺼내 작업 Queue로 이동
v
+---------------------------------------------+
| | 작업 Queue
+---------------------------------------------+
2. 비동기 쓰레드 풀 정책
어떤 정책을 사용할지 정하기 전, 먼저 ThreadPoolExecutor가 제공하는 AbortPolicy, DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy 네 가지 RejectedExecutionHandler 정책을 간단히 살펴보겠습니다.
- AbortPolicy
- DiscardPolicy
- DiscardOldestPolicy
- CallerRunsPolicy
2-1. AbortPolicy
AbortPolicy는 쓰레드풀의 쓰레드와 큐가 모두 가득 차 더 이상 작업을 처리할 수 없을 때, 새로운 작업을 즉시 거부하고 예외를 발생시키는 정책입니다. 작업을 절대 유실하면 안 되고, 실패를 반드시 감지해야 하는 상황에서 사용됩니다.
A handler for rejected tasks that throws a RejectedExecutionException.
2-2. DiscardPolicy
DiscardPolicy는 쓰레드풀의 큐가 가득 찼을 때, 새로 들어온 작업을 예외 없이 조용히 버리는 정책입니다. 작업 유실이 허용되거나 중요도가 낮은 작업에서 사용됩니다.
A handler for rejected tasks that silently discards the rejected task.
2-3. DiscardOldestPolicy
DiscardOldestPolicy는 큐가 가득 찼을 때, 큐에서 가장 오래된 작업을 제거하고 새 작업을 대신 넣는 정책입니다. 이전 작업보다 최신 작업이 더 중요할 때 사용되며, 작업 순서 보장이 필요한 환경에서는 적합하지 않습니다.
A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.
2-4. CallerRunsPolicy
CallerRunsPolicy는 큐와 쓰레드가 모두 가득 찼을 때, 새 작업을 제출한 호출 쓰레드가 직접 작업을 실행하게 하는 정책입니다. 작업 유실 없이 처리할 수 있지만, 호출 쓰레드가 바빠져 전체 처리 속도가 자연스럽게 줄어드는 백프레셔(Back Pressure) 효과가 생깁니다.
A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.
3. 어떤 정책을 사용할까?
저한테 필요한 요구사항은 판매량이 많은 판매자의 작업만 별도로 비동기 로 처리하며, 이 과정에서 작업 유실이 없어야 하고, 시간이 오래걸리더라도 반드시 처리 돼야 하는 것입니다. 또한 예외가 발생하면 바로 인지할 수 있어야 하고, 언제든 재시도도 할 수 있어야 하죠. 따라서 AbortPolicy 가 가장 적합하다고 판단했는데요, 여기에 몇 가지 옵션을 더 추가해 다음과 같이 사용하게 되었습니다.
1
2
3
4
5
6
7
8
9
10
@Bean
fun taskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = 8
maxPoolSize = 16
queueCapacity = Int.MAX_VALUE
setRejectedExecutionHandler(AbortPolicy())
initialize()
}
}
큐의 용량을 저렇게 늘리면 메모리 이슈가 발생하지 않을지 걱정할 수 있는데, 내부에 담고 있는 데이터가 주문 ID로 크기가 작기 때문에 큰 문제가 없었습니다. 만약 작업당 메모리 사용량이 크다면, 큐 용량을 적절히 조절하는 것이 필요합니다.
1
2
3
# 10만개의 Set<Long>
- 64bit: 약 6MB (24B×10만 + 32B×10만 + 0.5MB)
- 32bit: 약 4.5MB (16B×10만 + 24B×10만 + 0.5MB)
4. 정리
비동기 작업을 처리할 때, 쓰레드풀의 작업 거부 정책을 적절히 선택하는 것이 중요합니다.큐 용량과 작업 특성에 맞게 설정을 조정해야 하며, 이를 통해 안정적이고 효율적인 비동기 처리를 구현할 수 있습니다.