단일 애플리케이션에서 동시성
코프링 단일 애플리케이션에서 동시성 처리
애플리케이션에서 동시성을 처리하기 위한 학습을 작업하는 중 물론, 정답은 다중 애플리케이션 동시성 처리 -> 분산 서버이자 단일성 처리하는 서버 필요 -> 분산락,비관적 락 같은 요소들이 필요해진다.
겠지만 단일 애플리케이션일때도 동시성을 처리하는 방법에 대해 학습해보고 싶어서 정리한다.
테스트 방법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Test
fun `동시에 여러개의 요청이 오면 하나만 성공한다`() {
val results = ConcurrentTestUtil.concurrent(numberOfRequests = 20) {
lottoPurchaseService.purchase(
lottoPurchaseRequest = LottoPurchaseRequest(
purchaseType = PurchaseType.CARD,
currency = Currency.KRW,
amount = BigDecimal(1000),
paymentKey = "paymentKey",
orderId = "orderId"
),
lottoPublishId = lottoPublishId,
authenticated = AuthenticatedMember("ID", "user@email.com")
)
}
results.successCount() shouldBe 1
results.failureCount(IllegalStateException::class) shouldBe 19
}
fun concurrent(numberOfRequests: Int = 10, wait: Long = 5, block: () -> Unit): ConcurrentResult {
val executor = Executors.newFixedThreadPool(numberOfRequests)
val latch = CountDownLatch(1) // 모든 작업을 동시에 시작시키기 위한 래치
val results = ConcurrentLinkedQueue<Result<Unit>>() // 각 작업의 결과를 저장할 concurrent 컬렉션
// [numberOfRequests] 만큼 Callable 작업 생성
val tasks = (1..numberOfRequests).map {
Callable {
// latch가 해제될 때까지 대기
latch.await()
try {
block()
results.add(Result.success(Unit))
} catch (e: Exception) {
results.add(Result.failure(e))
}
}
}
// 작업들을 executor에 제출
tasks.forEach { executor.submit(it) }
// 모든 스레드가 동시에 실행되도록 latch 해제
latch.countDown()
// 작업 완료 대기
executor.shutdown()
if (!executor.awaitTermination(wait, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
return ConcurrentResult(results)
}
이와같이 성공과 실패를 측정하고, 특정 예외에 대한 실패가 몇번 발생했는지를 측정하는 식으로 테스트를 했다.
기본 로직
1
2
3
4
5
6
7
8
9
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Transaction
@Write
fun complete(publishId: Long): LottoPublish {
val lottoPublish = getLottoPublish(publishId)
lottoPublish.complete()
return lottoPublish
}
// LottoPublish
fun complete() {
if (this.status != LottoPublishStatus.WAITING) {
throw IllegalStateException("결제 대기 상태에서만 완료가 가능합니다")
}
this.status = LottoPublishStatus.COMPLETE
}
이와같은 로직이 있을때 관건은
- 하나의 요청에 대해 결제 요청을 여러번 보내지 않는다.
- DB 전파 및 상태 변화를 한번만 한다. 이다.
동시성 처리 X
동시성 처리를 하지 않고
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Transaction
@Write
fun complete(publishId: Long): LottoPublish {
val lottoPublish = getLottoPublish(publishId)
lottoPublish.complete()
Thread.sleep(500)
return lottoPublish
}
fun purchase(purchaseRequest: PurchaseRequest): Purchase {
val purchaseData = paymentProcessor.purchase(purchaseRequest, PurchaseProvider.TOSS)
val purchase = purchaseWriter.savePurchase(purchaseData)
Thread.sleep(1000)
return purchase
}
이와같이 DB 단에서 0.5초 대기 + 결제 단에서 1초 대기를 한다고 가정해보자.
1
Success count: 2, Failure count: 18
와 같이 20개 요청 중 2개의 요청이 성공하는 경우가 있다.
시간을 3초(Thread.sleep(3000)
) 로 늘려보면?
1
Success count: 4, Failure count: 16
4개까지 증가한다.
사용자가, 답답함을 느끼고 계속 여러번 요청을 누르면? 그야말로, 심각 해질 수 있다.
물론, PG 사가 대부분은 멱등성 및 동시성을 자체 처리를 해준다. ( 두번 결제 방지 ) 하지만, 우리 DB 및 로직 처리상 두번이 되는것도 + 외부 API 에 요청을 두~n번 보내는 것도 문제가 되므로 해결 하는 것이다.
동시성 처리 O
아래 요소들 전부 시간의 차이는 크지 않다. 어차피, 첫 번째 요청이 끝나면 그다음부턴 DB 값이 다르므로 예외가 발생한다. ( Lock Free - CAS 제외 )
Synchronized
2 sec 270 ms
의 시간이 걸렸다.
1
2
3
4
5
6
7
8
9
10
11
12
val lock = locks.computeIfAbsent(lottoPublishId) { Any() }
synchronized(lock) {
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
}
1
2
3
4
5
6
7
8
9
10
11
Thread: ConcurrentTestUtil-thread-2 (ID: 71, State: BLOCKED)
at app//lotto.service.LottoPurchaseService.purchase(LottoPurchaseService.kt:53)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase.동시에_여러개의_요청이_오면_하나만_성공한다$lambda$1(LottoPurchaseServiceConfirmTest.kt:87)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase$$Lambda/0x0000000701dc1118.invoke(Unknown Source)
at app//ConcurrentTestUtil.concurrent$lambda$1$lambda$0(ConcurrentTestUtil.kt:28)
at app//ConcurrentTestUtil$$Lambda/0x0000000701dc1558.call(Unknown Source)
at java.base@21.0.4/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base@21.0.4/java.lang.Thread.runWith(Thread.java:1596)
at java.base@21.0.4/java.lang.Thread.run(Thread.java:1583)
나머지 스레드들은 BLOCKING 상태로 대기한다.
compareAndSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private val publishCompletionStatus = ConcurrentHashMap<Long, AtomicBoolean>()
fun purchase(
lottoPurchaseRequest: LottoPurchaseRequest,
lottoPublishId: Long,
authenticated: Authenticated
): LottoBillData {
val isCompleted = publishCompletionStatus.computeIfAbsent(lottoPublishId) { AtomicBoolean(false) }
if (!isCompleted.compareAndSet(false, true)) {
throw IllegalStateException("이미 결제가 진행되었습니다.")
}
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
}
2 sec 207 ms
의 시간이 걸렸다.
요청을 하기 전, ConcurrentHashMap
에서 false 에서 true 로 변경을 실패한다면? 예외를 던져서 하나의 스레드만 통과해서 수행을 하게 구현되어있다.
수행시간은? 1sec 805ms
가 걸리고 통과를 한다! 매우 빠르다. 이와 같은 동시성 처리 방식을 Lock Free
방식이라고 한다.
Lock Free
여러 개의 스레드에서 동시 호출하더라도, 특정 단위 시간 내 한 개의 호출이 무조건 완료되게 해주는 알고리즘이다.
위와 같은 방식을 CAS(Compare And Set)
이라고 한다. 값을 변경하면 true 를 리턴, 값을 변경하지 못하면 false 를 리턴
1
2
3
1. 기존 값을 읽어 변경할 값을 계산
2. 기존 값이 현재 메모리와 같다면, 변경할 값으로 교체
3. 기존 값이 현재 메모리의 값과 다르다면, 값을 변경하지 않거나 or 1부터 재시도
이때, 경쟁하려는 요소가 많으면 문제가 되는지 궁금해졌다.
1
ConcurrentTestUtil.concurrent(numberOfRequests = 1000, wait = 1000L)
와 같이 1000개의 스레드가 서로 경쟁하게 해보면
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
===== 1초 경과 후 Executor 스레드 덤프 =====
===== EXECUTOR THREAD DUMP =====
Thread: ConcurrentTestUtil-thread-1 (ID: 70, State: TIMED_WAITING)
at java.base@21.0.4/java.lang.Thread.sleep0(Native Method)
at java.base@21.0.4/java.lang.Thread.sleep(Thread.java:509)
at app//purchase.domain.implementation.PurchaseProcessor.purchase(PurchaseProcessor.kt:22)
at app//lotto.service.LottoPurchaseService.purchase(LottoPurchaseService.kt:79)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase.동시에_여러개의_요청이_오면_하나만_성공한다$lambda$1(LottoPurchaseServiceConfirmTest.kt:87)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase$$Lambda/0x0000009801dd5560.invoke(Unknown Source)
at app//ConcurrentTestUtil.concurrent$lambda$1$lambda$0(ConcurrentTestUtil.kt:28)
at app//ConcurrentTestUtil$$Lambda/0x0000009801dd59a0.call(Unknown Source)
at java.base@21.0.4/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base@21.0.4/java.lang.Thread.runWith(Thread.java:1596)
at java.base@21.0.4/java.lang.Thread.run(Thread.java:1583)
===== END EXECUTOR THREAD DUMP =====
====================================
지극히 문제없이, 빠르게 경쟁이 끝나고 스레드들이 다 종료된다.
https://stackoverflow.com/questions/47569600/is-cas-a-loop-like-spin
해당 내용에 자세히 설명이 되어있다.
- CAS 는
cmpxchg
를 통해 비교와 동시에 변경(+단일 스레드 보장)한다. - CAS 는 한 번에 성공 및 실패하는 단일 연산 - 별도 스핀 루프 포함 X
Lock Free 에선 ABA 문제가 발생할 수 있는데( 메모리 값이 변경되었는지 확인하지만, 중간에 변경되었다가 다시 원래 값으로 돌아오는 상황 ) 이에 대해선 10개 가량과 Boolean 변경 사항에서 고려할 요소가 아니라고 생각한다.
즉, 다른 스레드들이 대기할 필요 없이 연산 후 빠르게 처리되므로 시간이 매우 짧고 + 높은 성능을 보인다.
Stamped Lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val stamp = stampedLock.writeLock()
try {
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
} finally {
stampedLock.unlockWrite(stamp)
}
2 sec 349 ms
의 시간이 걸렸다.
Stamped Lock
의 특징으론
- 세 가지 락 모드가 지원된다. -
쓰기 락
,읽기 락
,낙관적 읽기 락
stamp
라는 토큰 기반으로 해제 가능하다.- 락 전환이 용이하다. - 읽기 락에서 쓰기 락으로 전환 가능
읽기 작업이 많고 쓰기 작업이 상대적으로 드문 환경에서 낙관적 읽기 허용하면 높은 동시성과 성능을 얻을 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Thread: ConcurrentTestUtil-thread-6 (ID: 75, State: WAITING)
at java.base@21.0.4/jdk.internal.misc.Unsafe.park(Native Method)
at java.base@21.0.4/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221)
at java.base@21.0.4/java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1251)
at java.base@21.0.4/java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:480)
at app//lotto.service.LottoPurchaseService.purchase(LottoPurchaseService.kt:97)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase.동시에_여러개의_요청이_오면_하나만_성공한다$lambda$1(LottoPurchaseServiceConfirmTest.kt:87)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase$$Lambda/0x0000000601dbf7e8.invoke(Unknown Source)
at app//ConcurrentTestUtil.concurrent$lambda$1$lambda$0(ConcurrentTestUtil.kt:28)
at app//ConcurrentTestUtil$$Lambda/0x0000000601dbfc28.call(Unknown Source)
at java.base@21.0.4/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base@21.0.4/java.lang.Thread.runWith(Thread.java:1596)
at java.base@21.0.4/java.lang.Thread.run(Thread.java:1583)
나머지들은 WAITING 상태로 대기한다.
재 진입이 불가능하다. StampedLock
은 같은 Lock 사용 중 다른 Lock 취득하려 하면, BLOCKING 되어서 DEADLOCK 이 걸린다.
1
2
3
4
5
stampedLock.unlockWrite(stamp+1)
public void unlockWrite(long stamp) {
...
}
의도적으로 변경해서 해제하면? 이 역시도 DEADLOCK 이 걸린다.
ReentrantLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun purchase(
lottoPurchaseRequest: LottoPurchaseRequest,
lottoPublishId: Long,
authenticated: Authenticated
): LottoBillData {
val lock = locks.computeIfAbsent(lottoPublishId) { ReentrantLock() }
return lock.withLock {
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
}
}
2 sec 349 ms
의 시간이 걸렸다.
동기화 블록(synchornized
) 보다 더 세밀하고 유연한 제어를 가능하게 해준다.
- 명시적인 락 제어 가능 (
lock
,unlock
) - 같은 스레드에서 재진입 가능하다. ( 동일한 락 여러번 획득 가능, 획득 횟수만큼 해제해야 함 )
- 공정성 모드 통해 대기열에 있는 스레드들에 먼저 요청한 순서대로 할당 가능하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Thread: ConcurrentTestUtil-thread-20 (ID: 89, State: WAITING)
at java.base@21.0.4/jdk.internal.misc.Unsafe.park(Native Method)
at java.base@21.0.4/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221)
at java.base@21.0.4/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
at java.base@21.0.4/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
at java.base@21.0.4/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
at java.base@21.0.4/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
at app//lotto.service.LottoPurchaseService.purchase(LottoPurchaseService.kt:125)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase.동시에_여러개의_요청이_오면_하나만_성공한다$lambda$1(LottoPurchaseServiceConfirmTest.kt:87)
at app//lotto.service.LottoPurchaseServiceConfirmTest$PurchaseCase$$Lambda/0x0000000601dd7378.invoke(Unknown Source)
at app//ConcurrentTestUtil.concurrent$lambda$1$lambda$0(ConcurrentTestUtil.kt:28)
at app//ConcurrentTestUtil$$Lambda/0x0000000601dd77b8.call(Unknown Source)
at java.base@21.0.4/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base@21.0.4/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base@21.0.4/java.lang.Thread.runWith(Thread.java:1596)
at java.base@21.0.4/java.lang.Thread.run(Thread.java:1583)
나머지들은 WAITING 상태로 대기한다.
고민점
1
2
3
4
5
6
7
8
9
orderValidator.checkOrderValid(lottoPurchaseRequest.toOrderDataRequest())
val lottoPublish = lottoPublisher.complete(lottoPublishId)
val purchase = purchaseProcessor.purchase(lottoPurchaseRequest.toPurchaseRequest())
val bill = lottoWriter.saveBill(purchase.getId(), lottoPublish.getId(), authenticated.memberId)
return LottoBillData(
id = bill.getId()!!,
purchase = PurchaseData.from(purchase),
lottoPublish = LottoPublishData.from(lottoPublish)
)
이와같은 코드일때 결제가 성공했다고 판단
+ DB 반영
은 결제보다 앞에 해야할까? 뒤에 해야할까?
DB가 반영되어 다시 결제를 안보내게 하기 위해선 purchase
보다 complete
가 먼저가 되어야 한다. 하지만, 근본적으로 결제가 되었는데도 DB에 쿼리를 보내야만 한다.
이런 관점에선 Lock-Free
와 같은 메소드 진입 점부터 의도적인 멱등성 처리도 필요한거 같다. 다음은 다중 서버에서 동시성을 처리해볼 예정이다.