<코틀린 코루틴의 정석(조세영)> 도서의 "7장. 구조화된 동시성"을 참고해서 작성된 글입니다.
요약
구조화된 동시성이란, 비동기 코드를 계층화하여, 전체 작업을 작은 단위로 나누고, 부모-자식 관계로 연관 지어 관리하는 패턴입니다.
코루틴 생성 시 부모 코루틴의 CoroutineContext가 상속되지만, Job의 경우는 상속되지 않고 부모-자식의 참조로 사용됩니다.
코루틴의 취소는 CoroutineScope 범위 내의 자식 코루틴 방향으로만 전파되며, 부모 코루틴으로는 전파되지 않습니다.
새 Scope 또는 Job을 직접 생성해 사용하면 범위의 밖에서 실행되어, 구조화가 깨지고 취소 전파가 진행되지 않습니다.
1. 구조화된 동시성
구조화된 동시성(Structured Concurrency)
비동기 코드를 계층화하여, 전체 작업을 작은 단위로 나누고, 부모-자식 관계로 연관 지어 관리하는 패턴입니다.
주요 특징은 다음과 같습니다 :
- 부모 코루틴의 CoroutineContext 및 Dispatcher를 자식에게 상속합니다.
- 부모가 취소되면 모든 자식도 자동 취소되도록 전파됩니다.
- 부모는 자식이 완료될 때까지 대기하여 관리 누락을 방지합니다.
- CoroutineScope로 실행 범위를 제한하여, 실행 환경을 명확히 분리합니다.
2. 실행 환경의 상속
(1) 컨텍스트의 상속
부모 컨텍스트 상속
부모 코루틴이 자식 코루틴을 생성하면, 부모 코루틴의 CoroutineContext가 자식 코루틴에게 전달됩니다.
fun main() = runBlocking<Unit> {
val coroutineContext = newSingleThreadContext("MyThread") + CoroutineName("CoroutineA")
launch(coroutineContext){ // 부모 코루틴 생성
println("[${Thread.currentThread().name}] 부모 코루틴 실행")
launch { // 자식 코루틴 생성
println("[${Thread.currentThread().name}] 자식 코루틴 실행")
}
}
}
/*
[MyThread @CoroutineA#2] 부모 코루틴 실행
[MyThread @CoroutineA#3] 자식 코루틴 실행
*/
- CoroutineContext가 설정되지 않은 자식 코루틴에서도, 부모와 동일한 코루틴 이름을 확인할 수 있습니다.
컨텍스트 덮어쓰기
자식 코루틴을 생성하는 코루틴 빌더 함수로 새로운 CoroutineContext 객체가 전달되면 덮어씌울 수 있습니다.
fun main() = runBlocking<Unit> {
val coroutineContext = newSingleThreadContext("MyThread") + CoroutineName("ParentCoroutine")
launch(coroutineContext){ // 부모 코루틴 생성
println("[${Thread.currentThread().name}] 부모 코루틴 실행")
launch(CoroutineName("ChildCoroutine")) { // 자식 코루틴 생성
println("[${Thread.currentThread().name}] 자식 코루틴 실행")
}
}
}
/*
[MyThread @ParentCoroutine#2] 부모 코루틴 실행
[MyThread @ChildCoroutine#3] 자식 코루틴 실행
*/
- 이 과정에서 빌더에 명시된 CoroutineName은 덮어쓰이지만, 명시되지 않은 스레드는 그대로 부모 컨텍스트를 사용합니다.
(2) Job과 구조화
상속되지 않는 Job
다른 CoroutineContext 구성 요소들과 다르게, Job 객체는 상속되지 않고 코루틴 빌더 함수 호출시 새롭게 생성됩니다.
상속을 받게 되면 개별 코루틴의 제어가 어려워지기 때문입니다.
fun main() = runBlocking<Unit> { // 부모 코루틴 생성
val runBlockingJob = coroutineContext[Job] // 부모 코루틴의 CoroutineContext로부터 부모 코루틴의 Job 추출
launch { // 자식 코루틴 생성
val launchJob = coroutineContext[Job] // 자식 코루틴의 CoroutineContext로부터 자식 코루틴의 Job 추출
if (runBlockingJob === launchJob) {
println("runBlocking으로 생성된 Job과 launch로 생성된 Job이 동일합니다")
} else {
println("runBlocking으로 생성된 Job과 launch로 생성된 Job이 다릅니다")
}
}
}
/*
runBlocking으로 생성된 Job과 launch로 생성된 Job이 다릅니다
*/
구조화에 사용되는 Job
자식 코루틴이 부모 코루틴으로부터 전달받은 Job 객체는 상속은 받지 않지만, 코루틴 구조화에 활용됩니다.
자식 Job은 parent 프로퍼티로, 부모 Job은 children 리스트로 서로 참조를 유지합니다.
하나의 코루틴이 복수의 자식 코루틴을 가질 수 있지만, 부모 코루틴은 없거나 최대 하나를 가질 수 있습니다.
fun main() = runBlocking<Unit> { // 부모 코루틴
val parentJob = coroutineContext[Job] // 부모 코루틴의 CoroutineContext로부터 부모 코루틴의 Job 추출
launch { // 자식 코루틴
val childJob = coroutineContext[Job] // 자식 코루틴의 CoroutineContext로부터 자식 코루틴의 Job 추출
println("1. 부모 코루틴과 자식 코루틴의 Job은 같은가? ${parentJob === childJob}")
println("2. 자식 코루틴의 Job이 가지고 있는 parent는 부모 코루틴의 Job인가? ${childJob?.parent === parentJob}")
println("3. 부모 코루틴의 Job은 자식 코루틴의 Job을 참조를 가지는가? ${parentJob?.children?.contains(childJob)}")
}
}
/*
1. 부모 코루틴과 자식 코루틴의 Job은 같은가? false
2. 자식 코루틴의 Job이 가지고 있는 parent는 부모 코루틴의 Job인가? true
3. 부모 코루틴의 Job은 자식 코루틴의 Job을 참조를 가지는가? true
*/
(3) 완료 의존성
부모는 자식이 모두 완료될 때까지 “실행 완료 중” 상태를 유지합니다.
부모 코루틴은 모든 자식 코루틴이 실행 완료되야 완료될 수 있으며, 자식 코루틴에 대해 완료 의존성을 가집니다.
invokeOnCompletion 함수를 통해 실행 완료 혹은 취소 완료 시점에 대한 콜백을 받을 수 있습니다.
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val parentJob = launch { // 부모 코루틴 실행
launch { // 자식 코루틴 실행
delay(1000L) // 1초간 대기
println("[${getElapsedTime(startTime)}] 자식 코루틴 실행 완료")
}
println("[${getElapsedTime(startTime)}] 부모 코루틴이 실행하는 마지막 코드")
}
parentJob.invokeOnCompletion { // 부모 코루틴이 종료될 시 호출되는 콜백 등록
println("[${getElapsedTime(startTime)}] 부모 코루틴 실행 완료")
}
}
/*
[지난 시간: 3ms] 부모 코루틴이 실행하는 마지막 코드
[지난 시간: 1019ms] 자식 코루틴 실행 완료
[지난 시간: 1019ms] 부모 코루틴 실행 완료
*/
3. 코루틴의 취소 전파
취소는 자식 코루틴 방향으로만 전파되며, 부모 코루틴으로는 전파되지 않습니다.
특정 코루틴이 취소가 되면 하위의 모든 코루틴이 취소됩니다.
작업 중간에 부모 코루틴이 취소되면, 자식 코루틴의 작업이 더 이상 진행될 필요가 없어져 리소스 낭비를 막기 위해 취소됩니다.
fun main() = runBlocking<Unit> {
val parentJob = launch(Dispatchers.IO){ // 부모 코루틴 생성
val dbResultsDeferred: List<Deferred<String>> = listOf("db1","db2","db3").map {
async { // 자식 코루틴 생성
delay(1000L)
println("${it}으로부터 데이터를 가져오는데 성공했습니다")
return@async "[${it}]data"
}
}
val dbResults: List<String> = dbResultsDeferred.awaitAll() // 모든 코루틴이 완료될 때까지 대기
println(dbResults)
}
parentJob.cancel() // 부모 코루틴에 취소 요청
}
/*
(출력되지 않음)
*/
4. CoroutineScope를 사용한 코루틴 관리
CoroutineScope 객체
자신의 범위 내에서 생성된 코루틴들에게 실행 환경을 제공하고, 이들의 실행 범위를 관리하는 역할을 합니다.
(1) CoroutineScope 생성 방법
CoroutineScope Interface
코루틴의 실행 환경인 CoroutineContext를 가진 단순한 인터페이스를 구현하는 방법입니다.
// 내부 구현
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
class CustomCoroutineScope : CoroutineScope {
override val coroutineContext: CoroutineContext = Job() + newSingleThreadContext("CustomScopeThread")
}
fun main() {
val coroutineScope = CustomCoroutineScope() // CustomCoroutineScope 인스턴스화
coroutineScope.launch {
println("[${Thread.currentThread().name}] 코루틴 실행 완료")
}
}
/*
[CustomScopeThread @coroutine#1] 코루틴 실행 완료
*/
CoroutineScope 함수
CoroutineContext를 인자로 입력받아 함수로 CoroutineScope 객체를 생성하는 방법입니다.
이때 인자로 입력된 CoroutineContext에 Job 객체가 포함되어 있지 않으면, 새로운 Job 객체를 생성하도록 구현되어 있습니다.
// 내부 구현
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
fun main() {
val coroutineScope = CoroutineScope(Dispatchers.IO)
coroutineScope.launch {
println("[${Thread.currentThread().name}] 코루틴 실행 완료")
}
}
/*
[DefaultDispatcher-worker-1 @coroutine#1] 코루틴 실행 완료
*/
(2) CoroutineScope의 범위
CoroutineScope의 실행 환경 상속
CoroutineScope 수신 객체를 람다식 내부에서 this를 통해 접근할 수 있습니다. (this는 생략 가능합니다.)
fun main() {
customScope.launch(CoroutineName("LaunchCoroutine")) { // this: CoroutineScope
this.coroutineContext // LaunchCoroutine의 실행 환경을 CoroutineScope을 통해 접근
this.launch { // CoroutineScope으로부터 LaunchCoroutine의 실행 환경을 제공 받아 코루틴 실행
// 작업 실행
}
}
}
CoroutineScope에 속한 코루틴의 범위
코루틴 빌더로 생성된 코루틴과, 람다식 내에서 해당 CoroutineScope 객체를 사용해 실행되는 모든 코루틴이 범위에 포함됩니다.
이때, 내부에서 새로운 CoroutineScope를 새로 생성하면 새로운 Job 객체가 생성되어 기존의 계층 구조를 따르지 않게 됩니다.
fun main() = runBlocking<Unit> {
launch(CoroutineName("Coroutine1")) {
launch(CoroutineName("Coroutine2")) {
println("[${Thread.currentThread().name}] 코루틴 실행")
}
// CoroutineScope 새로 생성
CoroutineScope(Dispatchers.IO).launch(CoroutineName("Coroutine3")) {
println("[${Thread.currentThread().name}] 코루틴 실행")
}
}
}
- CoroutineScope로 새로 생성된 Coroutine3은, runBlocking 람다식의 CoroutineScope로 관리되는 범위에서 벗어납니다.
- 이 경우는 코루틴의 구조화를 깨기 때문에, 권장되지 않습니다.
(3) CoroutineScope의 취소 전파
CoroutineScope의 cancel
CoroutineScope 인터페이스는 확장 함수로 cancel 함수를 지원하며, 객체의 범위에 속한 모든 코루틴을 취소합니다.
coroutineContext 프로퍼티를 통해 Job 객체에 접근한 후 cancel을 호출합니다.
// 내부 구현
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ? : error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
동일한 범위 내 전파
코틀린의 구조화에 의해, 취소된 코루틴의 모든 자식 코루틴에게 취소가 전파됩니다.
fun main() = runBlocking<Unit> {
launch(CoroutineName("Coroutine1")) {
launch(CoroutineName("Coroutine3")) {
println("[${Thread.currentThread().name}] 코루틴 실행 완료")
}
launch(CoroutineName("Coroutine4")) {
println("[${Thread.currentThread().name}] 코루틴 실행 완료")
}
this.cancel() // Coroutine1의 CoroutineScope에 cancel 요청
}
launch(CoroutineName("Coroutine2")) {
println("[${Thread.currentThread().name}] 코루틴 실행 완료")
}
}
/*
[main @Coroutine2#3] 코루틴 실행 완료
*/
- CoroutineScope 객체의 범위에 속한 1, 3, 4는 실행 도중 취소되었지만, 속하지 않는 2는 끝까지 진행됩니다.
5. 구조화와 Job
새 스코프 또는 새 Job을 직접 생성해 사용하면 계층 바깥에서 실행되어, 루트가 기다리지 않아 조기 종료될 수 있습니다.
Job으로 구조화 깨기
fun main() = runBlocking<Unit> {
val rootJob = Job()
val newJob = Job()
launch(CoroutineName("Coroutine1") + rootJob) {
launch(CoroutineName("Coroutine2") + newJob) { // 새로운 Job 객체 입력
println("[${Thread.currentThread().name}] 코루틴 실행")
}
}
rootJob.cancel()
}
/*
[main @Coroutine2#2] 코루틴 실행
*/
- 새로운 Job 객체를 생성한 후 코루틴 빌더 함수에 추가하면, Coroutine2는 Coroutine1의 rootJob이 아닌 newJob을 부모 코루틴으로 작동하여 취소가 전파되지 않게 됩니다.
Job의 부모를 명시하기
Job을 생성하는 함수의 경우, parent 인자에 Job 객체를 넘기면 새로운 자식 Job을 생성할 수 있습니다.
// 내부 구현
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
그러나, 생성된 Job은 자동으로 실행 완료되지 않습니다.
- launch로 생성된 Job : 더 이상 실행할 코드가 없고, 모든 자식 코루틴이 실행 완료되면 자동으로 실행을 종료합니다.
- Job()으로 생성된 Job : 자동으로 실행 완료되지 않고, 명시적으로 complete를 호출해야 합니다.
fun main() = runBlocking<Unit> {
launch(CoroutineName("Coroutine1")) {
val coroutine1Job = this.coroutineContext[Job] // Coroutine1의 Job
val newJob = Job(parent = coroutine1Job)
launch(CoroutineName("Coroutine2") + newJob) {
delay(100L)
println("[${Thread.currentThread().name}] 코루틴 실행")
}
}
}
/*
[main @Coroutine2#3] 코루틴 실행
(프로세스 종료 로그가 출력되지 않음)
*/
- newJob이 자동 종료되지 않아, 부모 코루틴들이 모두 ‘실행 완료 중’ 상태에서 대기하는 상태가 발생합니다.
- 이렇게 Job 객체를 직접 생성하는 경우, 문제가 발생할 수 있습니다.
이처럼 구조화된 동시성은 복잡한 비동기 로직을 부모-자식 계층 구조로 조직하여, 자동 취소 전파와 완료 의존성을 보장함으로써 안정적인 실행 환경을 제공합니다. CoroutineScope와 기본 제공 빌더만 활용해도 안전한 비동기 흐름을 손쉽게 구현할 수 있습니다.
'Coroutine' 카테고리의 다른 글
[Coroutine] 8. 코루틴의 동작 방식과 일시 중단 함수 (1) | 2025.05.20 |
---|---|
[Coroutine] 7. CoroutineExceptionHandler와 예외 전파 (1) | 2025.05.20 |
[Coroutine] 5. CoroutineContext 구성 요소 (0) | 2025.05.19 |
[Coroutine] 4. async와 withContext (0) | 2025.05.19 |
[Coroutine] 3. 코루틴 빌더, 취소, 상태, 그리고 Job (0) | 2025.05.19 |