Kotlin 동시성 문제 해결방법
Mutex, Actor, Synchronized 등
동시성을 고려하여 동기화가 필요한 경우
var count = 0
const val repeatCount = 1000000
fun main() = runBlocking {
launch(Dispatchers.Default) {
repeat(repeatCount) {
count++
}
println(count)
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
count++
}
println(count)
}
println("기대값 : ${repeatCount * 2}")
}
기대값 : 2000000
981272
1862564
2000000이라는 기대값과는 다르게 981272, 1862564이 결과로 출력되었습니다.
이는 두 개의 코루틴이 동기화를 하지 않고 count 변수에 대해 연산을 수행하기 때문에 발생하는 문제입니다.
Thread 2에서 count++
를 수행한 결과 값이 반영되기 전에 Thread 3에서 read count
로 count 1
을 읽게 되면서 꼬이기 시작합니다.
- Race Condition(경쟁 상태)이라고 합니다.
- 스레드 순서에 따라 결과가 달라지는 상황을 의미합니다.
해결책
여러 스레드 간에 공유되며 동시에 접근이 가능한 객체에 대해 어떠한 코드를 수행할 때 동기화를 해주는 것이 필요합니다.
1. AtomicInteger
원자적으로 연산을 수행합니다.
- 원자적 연산이란, 연산이 수행되는 동안 다른 스레드가 접근하지 못하도록 하는 것을 의미합니다.
- 하나의 기계어로만 연산을 수행하며, 해당 연산을 하고 있는 도중에는 같은 연산을 할 수 없어 값의 동기화를 성공적으로 수행할 수 있습니다.
- 멀티 스레드 환경에서도 동기화되어 안전하게 사용할 수 있습니다.
val atomicCount = AtomicInteger(0)
atomicCount.incrementAndGet()
위와 같이 사용할 수 있습니다.
2. Mutex, Synchronized
하나의 스레드 혹은 코루틴만 공유 데이터에 접근할 수 있도록 lock을 걸어주는 방식입니다.
- 상호 배제(Mutual Exclusion) 방식입니다.
- 동작할 코드가 임계 구역에 있으면, 해당 구역에 대해서 하나의 스레드만 접근할 수 있게 합니다.
- lock이 걸린 경우, 다른 스레드는 unlock이 될 때 까지 대기하거나, 해당 작업을 무시하고 다른 작업을 수행하도록 할 수 있습니다.
- Mutex와 Synchronized는 상호 배제 방식이라는 것에서 비슷합니다.
- 차이점
- Mutex
- 코루틴을 일시 중지시킵니다. 스레드를 중지 시키지 않습니다.
- 코루틴 사용에 최적화 되어 있습니다.
- Synchronized
- 스레드를 일시 중지시킵니다.
- Mutex
- Atomic과의 차이점
- Atomic
- 스레드와 메인 메모리에 있는 값을 비교하여 서로 일치하지 않으면 동기화 시키고 연산을 수행한다.
- lock을 하지 않는다.
- 상호 배제
- 임계 구역 진입 전에 스레드와 메인 메모리의 값을 동기화 시킨다.
- Atomic
val mutex = Mutex()
var mutexCount = 0
launch(Dispatchers.Default) {
repeat(repeatCount) {
mutex.withLock {
mutexCount++
}
// OR
mutex.lock()
try {
mutexCount++
} finally {
mutex.unlock()
}
}
}
- 위와 같이
mutex.withLock()
을 사용하여 간단하게 사용이 가능합니다. withLock()
- 이미 lock이 걸린 경우, 대기합니다.
- lock이 걸리지 않은 경우, lock을 걸고 작업을 수행합니다.
- 작업이 끝나면 unlock 합니다.
3. 단일 스레드를 사용
동기화가 필요한 연산에 대해서 하나의 스레드로만 동작하도록 하는 방식입니다.
var singleThreadCount = 0
val singleThread = newSingleThreadContext("Concurrency")
launch(Dispatchers.Default) {
repeat(repeatCount) {
withContext(singleThread) {
singleThreadCount++
}
}
}
newSingleThreadContext()
를 사용하여 단일 스레드를 생성합니다.withContext()
를 사용하여 해당 스레드에서 작업을 수행합니다.- Default context <-> singleThread context 간에 지속적으로 전환이 발생하기 때문에 성능이 저하될 수 있습니다.
4. Actor
메시지를 전달받아 순차적으로 처리하는 방식으로 작동합니다.
- actor에게 메시지를 전달하면, 해당 메시지를 순차적으로 처리합니다.
- 연산하는 데이터는 actor에서만 접근이 가능합니다.
- actor는 단일 스레드에서 동작하기 때문에 동기화 문제가 해결됩니다.
sealed interface Message
data object Increment : Message
class Result(val response: CompletableDeferred<Int>) : Message
@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<Message> {
var counter = 0
for (message in channel) {
when (message) {
is Increment -> counter++
is Result -> message.response.complete(counter)
}
}
}
val actorCounter = counterActor()
launch(Dispatchers.Default) {
repeat(repeatCount) {
actorCounter.send(Increment)
}
}
val response = CompletableDeferred<Int>()
actorCounter.send(Result(response))
actor()
를 사용하여 actor를 생성합니다.- 해야하는 작업에 대해 메시지를 전달합니다.
actor
에서 요청을 처리하는 것은 단일 스레드에서 동작하기 때문에 동기화 문제가 해결됩니다.
처리 방법 별 성능 비교
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureNanoTime
val mutex = Mutex()
var mutexCount = 0
var singleThreadCount = 0
var synchronizedCount = 0
var atomicCount = AtomicInteger(0)
@OptIn(DelicateCoroutinesApi::class)
val singleThread = newSingleThreadContext("Concurrency")
val syncLock = Any()
sealed interface Message
data object Increment : Message
class Result(val response: CompletableDeferred<Int>) : Message
val resultList = mutableListOf<Pair<String, Double>>()
@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<Message> {
var counter = 0
for (message in channel) {
when (message) {
is Increment -> counter++
is Result -> message.response.complete(counter)
}
}
}
const val repeatCount = 1000000
@OptIn(ObsoleteCoroutinesApi::class)
fun main() = runBlocking {
measureNanoTime {
launch(Dispatchers.Default) {
repeat(repeatCount) {
atomicCount.incrementAndGet()
}
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
atomicCount.incrementAndGet()
}
}
}.also {
resultList.add("Atomic" to it / 1000000.0)
}
measureNanoTime {
launch(Dispatchers.Default) {
repeat(repeatCount) {
mutex.withLock {
mutexCount++
}
}
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
mutex.withLock {
mutexCount++
}
}
}
}.also {
resultList.add("Mutex" to it / 1000000.0)
}
measureNanoTime {
launch(Dispatchers.Default) {
repeat(repeatCount) {
synchronized(syncLock) {
synchronizedCount++
}
}
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
synchronized(syncLock) {
synchronizedCount++
}
}
}
}.also {
resultList.add("Synchronized" to it / 1000000.0)
}
measureNanoTime {
launch(Dispatchers.Default) {
repeat(repeatCount) {
withContext(singleThread) {
singleThreadCount++
}
}
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
withContext(singleThread) {
singleThreadCount++
}
}
}
}.also {
resultList.add("Single Thread" to it / 1000000.0)
}
val actorCounter = counterActor()
val response = CompletableDeferred<Int>()
measureNanoTime {
launch(Dispatchers.Default) {
repeat(repeatCount) {
actorCounter.send(Increment)
}
actorCounter.send(Result(response))
}
launch(Dispatchers.Default) {
repeat(repeatCount) {
actorCounter.send(Increment)
}
actorCounter.send(Result(response))
}
}.also {
resultList.add("Actor" to it / 1000000.0)
}
resultList.sortBy {
it.second
}
resultList.forEach {
println("${it.first} : ${it.second}ms")
}
}
처리 방법 | 속도 |
---|---|
Mutex | 1.3443MS |
Actor | 1.4192MS |
Synchronized | 2.7625MS |
Single Thread | 5.9151MS |
Atomic | 13.4533MS |