동시성을 고려하여 동기화가 필요한 경우


var count = 0
const val repeatCount = 1000000

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        repeat(repeatCount) {
            count++
        }
        println(count)
    }

    launch(Dispatchers.Default) {
        repeat(repeatCount) {
            count++
        }
        println(count)
    }
    println("기대값 : ${repeatCount * 2}")
}
기대값 : 2000000
981272
1862564

2000000이라는 기대값과는 다르게 981272, 1862564이 결과로 출력되었습니다.

이는 두 개의 코루틴이 동기화를 하지 않고 count 변수에 대해 연산을 수행하기 때문에 발생하는 문제입니다.

kotlin-concurrency-동기화 필요성 drawio

Thread 2에서 count++를 수행한 결과 값이 반영되기 전에 Thread 3에서 read countcount 1을 읽게 되면서 꼬이기 시작합니다.

  • Race Condition(경쟁 상태)이라고 합니다.
    • 스레드 순서에 따라 결과가 달라지는 상황을 의미합니다.

해결책


여러 스레드 간에 공유되며 동시에 접근이 가능한 객체에 대해 어떠한 코드를 수행할 때 동기화를 해주는 것이 필요합니다.

1. AtomicInteger


원자적으로 연산을 수행합니다.

  • 원자적 연산이란, 연산이 수행되는 동안 다른 스레드가 접근하지 못하도록 하는 것을 의미합니다.
  • 하나의 기계어로만 연산을 수행하며, 해당 연산을 하고 있는 도중에는 같은 연산을 할 수 없어 값의 동기화를 성공적으로 수행할 수 있습니다.
  • 멀티 스레드 환경에서도 동기화되어 안전하게 사용할 수 있습니다.

val atomicCount = AtomicInteger(0)
atomicCount.incrementAndGet()

위와 같이 사용할 수 있습니다.

2. Mutex, Synchronized


하나의 스레드 혹은 코루틴만 공유 데이터에 접근할 수 있도록 lock을 걸어주는 방식입니다.

  • 상호 배제(Mutual Exclusion) 방식입니다.
  • 동작할 코드가 임계 구역에 있으면, 해당 구역에 대해서 하나의 스레드만 접근할 수 있게 합니다.
  • lock이 걸린 경우, 다른 스레드는 unlock이 될 때 까지 대기하거나, 해당 작업을 무시하고 다른 작업을 수행하도록 할 수 있습니다.
  • MutexSynchronized는 상호 배제 방식이라는 것에서 비슷합니다.
  • 차이점
    • Mutex
      • 코루틴을 일시 중지시킵니다. 스레드를 중지 시키지 않습니다.
      • 코루틴 사용에 최적화 되어 있습니다.
    • Synchronized
      • 스레드를 일시 중지시킵니다.
  • Atomic과의 차이점
    • Atomic
      • 스레드와 메인 메모리에 있는 값을 비교하여 서로 일치하지 않으면 동기화 시키고 연산을 수행한다.
      • lock을 하지 않는다.
    • 상호 배제
      • 임계 구역 진입 전에 스레드와 메인 메모리의 값을 동기화 시킨다.

kotlin-concurrency drawio


val mutex = Mutex()
var mutexCount = 0

launch(Dispatchers.Default) {
    repeat(repeatCount) {
        mutex.withLock {
            mutexCount++
        }

        // OR

        mutex.lock()
        try {
            mutexCount++
        } finally {
            mutex.unlock()
        }
    }
}

  • 위와 같이 mutex.withLock()을 사용하여 간단하게 사용이 가능합니다.
  • withLock()
    • 이미 lock이 걸린 경우, 대기합니다.
    • lock이 걸리지 않은 경우, lock을 걸고 작업을 수행합니다.
    • 작업이 끝나면 unlock 합니다.

3. 단일 스레드를 사용


동기화가 필요한 연산에 대해서 하나의 스레드로만 동작하도록 하는 방식입니다.


var singleThreadCount = 0
val singleThread = newSingleThreadContext("Concurrency")

launch(Dispatchers.Default) {
    repeat(repeatCount) {
        withContext(singleThread) {
            singleThreadCount++
        }
    }
}

  • newSingleThreadContext()를 사용하여 단일 스레드를 생성합니다.
  • withContext()를 사용하여 해당 스레드에서 작업을 수행합니다.
  • Default context <-> singleThread context 간에 지속적으로 전환이 발생하기 때문에 성능이 저하될 수 있습니다.

4. Actor


메시지를 전달받아 순차적으로 처리하는 방식으로 작동합니다.

  • actor에게 메시지를 전달하면, 해당 메시지를 순차적으로 처리합니다.
  • 연산하는 데이터는 actor에서만 접근이 가능합니다.
  • actor는 단일 스레드에서 동작하기 때문에 동기화 문제가 해결됩니다.

sealed interface Message

data object Increment : Message

class Result(val response: CompletableDeferred<Int>) : Message

@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<Message> {
    var counter = 0
    for (message in channel) {
        when (message) {
            is Increment -> counter++
            is Result -> message.response.complete(counter)
        }
    }
}

val actorCounter = counterActor()

launch(Dispatchers.Default) {
    repeat(repeatCount) {
        actorCounter.send(Increment)
    }
}

val response = CompletableDeferred<Int>()
actorCounter.send(Result(response))

  • actor()를 사용하여 actor를 생성합니다.
  • 해야하는 작업에 대해 메시지를 전달합니다.
  • actor에서 요청을 처리하는 것은 단일 스레드에서 동작하기 때문에 동기화 문제가 해결됩니다.

처리 방법 별 성능 비교

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureNanoTime

val mutex = Mutex()
var mutexCount = 0
var singleThreadCount = 0
var synchronizedCount = 0
var atomicCount = AtomicInteger(0)

@OptIn(DelicateCoroutinesApi::class)
val singleThread = newSingleThreadContext("Concurrency")

val syncLock = Any()

sealed interface Message

data object Increment : Message

class Result(val response: CompletableDeferred<Int>) : Message

val resultList = mutableListOf<Pair<String, Double>>()

@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<Message> {
    var counter = 0
    for (message in channel) {
        when (message) {
            is Increment -> counter++
            is Result -> message.response.complete(counter)
        }
    }
}

const val repeatCount = 1000000

@OptIn(ObsoleteCoroutinesApi::class)
fun main() = runBlocking {

    measureNanoTime {
        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                atomicCount.incrementAndGet()
            }
        }

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                atomicCount.incrementAndGet()
            }
        }
    }.also {
        resultList.add("Atomic" to it / 1000000.0)
    }

    measureNanoTime {
        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                mutex.withLock {
                    mutexCount++
                }
            }
        }

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                mutex.withLock {
                    mutexCount++
                }
            }
        }
    }.also {
        resultList.add("Mutex" to it / 1000000.0)
    }


    measureNanoTime {
        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                synchronized(syncLock) {
                    synchronizedCount++
                }
            }
        }

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                synchronized(syncLock) {
                    synchronizedCount++
                }
            }
        }

        
        
    }.also {
        resultList.add("Synchronized" to it / 1000000.0)
    }

    measureNanoTime {
        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                withContext(singleThread) {
                    singleThreadCount++
                }
            }
        }

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                withContext(singleThread) {
                    singleThreadCount++
                }
            }
        }
        
        
    }.also {
        resultList.add("Single Thread" to it / 1000000.0)
    }

    val actorCounter = counterActor()
    val response = CompletableDeferred<Int>()

    measureNanoTime {

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                actorCounter.send(Increment)
            }
            actorCounter.send(Result(response))
        }

        launch(Dispatchers.Default) {
            repeat(repeatCount) {
                actorCounter.send(Increment)
            }
            actorCounter.send(Result(response))
        }
    }.also {
        resultList.add("Actor" to it / 1000000.0)
    }

    resultList.sortBy {
        it.second
    }

    resultList.forEach {
        println("${it.first} : ${it.second}ms")
    }

}
처리 방법 속도
Mutex 1.3443MS
Actor 1.4192MS
Synchronized 2.7625MS
Single Thread 5.9151MS
Atomic 13.4533MS