coroutines
Для реализации асинхронной логики и выделения работы на отдельные потоки на наших проектах используется kotlinx.coroutines версии native-mt.
Материалы
Beginner
📄 KotlinLang Docs - Coroutines guide
- Coroutines basics
- Beginner
launch
,delay
,runBlocking
,Structured
concurrency
,coroutineScope
,Job
,Scope builder
- создание первой корутины и suspend функции, запуск корутин параллельно, обращение к корутине через объект Job
- корутины не потеряются и не будет утечек, не потеряются ошибки в корутинах (Structured concurrency)
- отличия между runBlocking и coroutineScope (Scope builder)
- обращение к корутине через объект Job (explicit job)
- пример легковестности корутин по сравнению с потоками
- Beginner
- Cancellation and timeouts
- Beginner
Job
,cancel
,cancelAndJoin
,isActive
,ensureActive
,NonCancellable
,withTimeout
,withTimeoutOrNull
,CancellationException
,TimeoutCancellationException
- в корутине нужно проверять, хотят ли её отменить
- проверка на отмену происходит в каждом suspension point, чтобы проверять чаще используйте: ensureActive. Или проверяйте статус Job с помощью флага isActive (Making computation code cancellable)
- пример правильной и неправильной отмены корутины (Cancellation is cooperative)
- работа корутины по time-out (Timeout)
- работа с ресурсами внутри withTimeout блока, пример проблемы и решения (timeout and resources)
- Beginner
- Composing suspending functions
Beginner
async
,Deferred
,Job
,await
,start
примеры последовательного и асинхронного запуска
отложенный запуск корутины, отличия между запуском start и await (Lazy started async)
корутина, возвращающая значение - async, Deferred
хороший стиль объявления async-функции (Async-style functions)
пример возникновения ошибок в coroutineScope (Structured concurrency with async)
Intermediate
- Coroutine context and dispatchers
- Beginner
CoroutineContext
,CoroutineScope
,Dispatcher
,Unconfined
,newSingleThreadContext
,Job
,join
,asContextElement
- про Dispatcher и как он связаны с потоками (Dispatchers and threads)
- отладка корутин, используя дебаггер в IDEA или логгирование, CoroutineName (Debugging with IDEA)
- работа корутин в разных потоках, освобождение создонного потока (Jumping between threads)
- как получить Job из контекста (Job in the context)
- дочерние и родительские корутины
- описание CoroutineScope
- как использовать несколько контекстов для создания, неассоциативный + (Combining context elements)
- передача локальных данных потока в корутины или между ними (Thread-local data)
- Beginner
- Asynchronous Flow
- Beginner
Flow
,emit
,collect
,flowOf
,asFlow
,operators
,cold
,hot
,builders
,cancel
- что такое Flow, операторы Flow, остановка Flow
- холодные и горячие Flow
- Flow builders
- Intermediate
transform
,take
,numbers
,toList
,toSet
,first
,single
,reduce
,fold
,flow
,flowOn
,buffer
,conflate
,collectLatest
,zip
,combine
,flatMapConcat
,flattenConcat
,flatMapMerge
,flattenMerge
,flatMapLatest
,try catch
,finally
,onCompletion
,launchIn
- Beginner
- Channels
- Beginner
Channel
,send
,receive
,produce
,consumeEach
,capacity
,buffer
,tick
- что такое Channel, типы каналов
- как происходит закрытие канала (Closing and iteration over channels)
- паттерн производитель/потребитель (Building channel producer)
- паттерн конвеер (Pipeline)
- работа с каналом из нескольких корутин одновременно, справедливая работа канала (Fan-in, Fun-out, Channels are fair)
- емкость канала
- Beginner
- Coroutine exceptions handling
- Beginner
cancel
,CancellationException
,CoroutineExceptionHandler
,SupervisionJob
,SupervisionScope
- обработка ошибок в корутинах (Exception propagation)
- настраиваемый обработчик ошибок (CoroutineExceptionHandler)
- игнорируемое исключение CancellationException при завершении корутины (Cancellation and exceptions)
- обработка нескольких исключений от дочерних корутин (Exceptions aggregation)
- SupervisionJob и SupervisionScope
- Beginner
- Shared mutable state and concurrency
- Beginner
shared mutable state
,Thread-safe
- проблема изменения общего значения из параллельных корутин
- решения проблемы: потокобезопасные типы данных, ограничение потоков, mutex (Thread-safe data structures, fine-grained, coarse-grained )
- пример потраченного времени на переключение между потоками
- Intermediate
- что такое mutex (Mutual exclusion)
- Advanced
volatile
,Threadsafe
,mutex
,actor
- Actors
- Beginner
- Debug coroutines using IntelliJ IDEA – tutorial - как дебажить корутины в IDEA
- Debug Kotlin Flow using IntelliJ IDEA – tutorial - как дебажить flow в IDEA
- Coroutines basics
coroutine
,Thread
,suspend
,runBLocking
,launch
,join
,async
,await
,Deferred
,CoroutineContext
,CoroutineDispatсher
,Thread safe
,Actor
,Channel
,send
,receive
- Beginner
- отличия потоков в Java с корутинами в Kotlin
- два простых примера с пояснениями (runBlocking, launch, suspend, join, async, await, Deferred)
- CoroutineContext и CoroutineDispatсher - что это и зачем
- Thread-safe, Actor и Channel
📄 Roman Elizarov - Structured Concurrency
Structured Concurrency
,coroutineScope
- Beginner
- как пришли к Structured Concurrency, какие проблемы решает, какие преимущества дает
- корутины не глобальные, как потоки. Они связаны со своим скоупом, который представляет собой объект с ограниченным временем жизни. Например элемент UI.
🎦 KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow by Roman Elizarov
📄 Structured Concurrency in action! (using Kotlin coroutines)
Structured Concurrency
,cancellation
,Job
,isActive
,CoroutineScope
,CoroutineContext
- Beginner
- основные концепции Structured Concurrency
- описание поведения отмены, примеры
- Параллельная декомпозиция, какие проблемы решает CoroutineScope
- CoroutineScope и CoroutineContext, в чем разница? (Scope - область действия корутины, Context - элементы жизненного цикла корутины, переменные и константы, с которыми она работает)
- демонстрация нарушения Structured Concurrency, если корутине передать Context, отличающающийся от контекста родительского CoroutineScope этой корутины
🎦 Александр Нозик. Кое-что о корутинах
- Beginner
Deferred
,Structured Cuncurrency
,Job
,CoroutineScope
,GlobalScope
,cancel
,- 22:55 хорошее объяснение, как ведут себя корутины при передаче Deferred
- 29:32 проблема с потоками, почему они занимают много памяти
- 52:49 пример проблемы в корутинах, которую решает Structured Concurrency
- 57:38 Job позволяет работать с результатом корутины, вырубать корутину и еще всякое, но это не сама корутина
- 1:00:42 CoroutineScope
- 1:5:28 GlobalScope, это пустышка, нет контекста, нет родителей, не знает о других запусках из GlobalScope, поэтому нарушается Structured Cuncurrency, некуда прокинуть результат и прочее
- 1:25:20 Если корутина была закрыта с помощью cancel, то закрывает только детей, родителей не трогает
- 1:26:57 Поведение при возникновении ошибки: если в корутине возникла ошибка, то она закрывает себя, сообщает родителю что случилось, закрывает всех потомков
- 1:34:10 Нельзя закрывать корутину где угодно, нужно делать в точках расщепления, где проверяется, что она не закрыта
- Beginner
🎦 Александр Нозик. Кое-что о корутинах. Flow, Scope
- Видео хорошее, рекомендуется к просмотру целиком (x1.5)
- Beginner
CoroutineContext
,EmptyCoroutinesContext
,Job
,GlobalScope
launch
,newCoroutinesContext
,cancel
,supervisorScope
,async
,await
,Deferred
,Channel
,isActive
,Flow
,cold
,hot
- 8:15 CoroutineContext
- 12:55 используя EmptyCoroutinesContext вы никак не пользуетесь преимуществами StructuredConcurrency
- для использования StructuredConcurrency необходим объкет Job, который будет помнить детей и родителя этой корутины
- 15:29 GlobalScope - пустышка, хранит и использует EmptyCoroutinesContext
- 19:59 разбор launch, newCoroutinesContext
- 30:00 способ завершить все корутины - отменить корневой Scope
- 34:12 нельзя держать какой-нибудь Scope и периодический давать ему задачи
- 45:26 supervisorScope - когда внутри происходит ошибка, он не обрушивает Scope-родительского
- 52:31 async похож на Job, отличие в том, что он возвращает Deferred, await() - позволяет использовать результат, когда он будет получен.
- 55:11 вместо async можно использовать suspend функцию, единственное отличие в том, что async можно отменить, в отличие от suspend функции
- 57:11 Channel, служит для обмена данными между корутинами, виды каналов
- 57:49 while(isActive){...} - проверяет, отменена ли данная корутина, и если она отменена, то не входит в новую итерацию цикла. Если использовать while(true){...}, то тот же самый эффект будет в ближайшем suspension point
- 1:02:41 типы каналов
- 1:10:42 Flow
- Intermediate
- 8:43 CoroutinesContext, отличия от map - строгое типизирование ( если использовать ключ <Е>, то получите объект типа Е ), сумма двух CoroutineContext не ассоциотивна ( CoroutineContext1 + CoroutineContext2 != CoroutineContext2 + CoroutineContext1 т.к если в CoroutineContext1 есть ключи из CoroutineContext2, то при сумме значения по этим ключам перезапишутся из CoroutineContext2 )
📄 Hands-on: Intro to coroutines and channels
Channel
- Beginner
- Как работает Channel - очень хороший разбор работы канала с примером
🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 1
Dispatcher
,withContext
,NonCancellableContext
,viewModelScope
,suspend
,main-thread
- Beginner
🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 2
Scope
,CoroutineContext
,Job
,Dispatcher
,CoroutineName
,CoroutineExceptionHandler
,CoroutinesScope
,SupervisorScope
,Deferred
,async
,await
cancellable
- Beginner
- 3:05 CoroutinesScope создает новый Scope, копирует все из Scope-родителя и исполняет переданный ему блок
- 3:59 supervisorScope не упадет, если упадет ребенок. Если упадет сам Scope, то не упадет Scope-родитель
- 5:11 Job - фоновая работа, имеет ЖЦ(active, cancelled). Job предоставляет управление корутиной, можем вызвать Job.cancel и тд, не имеет результата
- 7:06 Deferred это Job, но с результатом, создается myScope.async{...}. Получить значение - myDeferred.await() - вернет занечение или исплючение при ошибке
- 8:55 как запустить работу во внешнем Scope, что делать, если нам не подходит ViewModelScope, потому что работа должна жить дольше чем ViewModel
- 15:21 что такое CoroutineContext, что в нем может находиться (Job, CoroutineDispatcher, CoroutineName, CoroutineExceptionHandler)
- 16:57 разница между CoroutineScope и CoroutineContext
- 19:32 когда Job отменили, корутина не отменится автоматичеки, разбор кейса
🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 3
Channel
,capacity
,close
,trySend
,Flow
,buffer
,conflate
,SharedFlow
,hot
,cold
,shareIn
,whileSubscribed
,timeout
,Job
,lifecycle
,repeatOnLifecycle
,DESTROYED
,flowWithLifecycle
- Beginner
- 7:50 Channel - канал для обмена, можно положить и получить, не блокирующий (оперции саспендятся), можно закрыть, разные capacity(RENDEZVOUS, UNLIMITED, CONFLATED, BUFFERED)
- 3:38 SingleLiveEvent, что это и зачем (события нужно обрабатывать только один раз)
- 11:30 SingleLiveEvent используя Channel
- 15:52 как добавить к flow буффер на случай медленного получения
- 17:25 conflate, сокращение для buffer с параметрами CONFLATED и DROP_OLDEST, т.е хранит одно значение и перезаписыват его
- 18:07 flowOn переключает контекст выполнения операторов идущих до него, если операторы без своего контекста
- 20:07 shareIn превращает холодный Flow в горячий SharedFlow
- 21:52 WhileSubscribed запускает корутину при первом подписчике, остонавливает когда пропадет последний подписчик, можно сохранить кэш при выключении или стереть после timeout
- 23:12 На что можно заменить Job.cancel()? можно Lifecycle.repeatOnLifecycle
- 23:39 Lifecycle.repeatOnLifecycle запускает корутину, когда ЖЦ подходит до определенного состояния, когда ЖЦ ниже нужного состояния - отменяет корутину и усылпяет(suspend), когда снова в нужном состоянии - запускает повторно
- 24:42 особенности repeatOnLifecycle, рекомендуется создавать либо в Activity.onCreate() или Fragment.onViewCreated()
- 25:22 поведение при DESTROYED, держет в suspend внешнюю корутину, пока не DESTROYED, когда DESTROYED - отпустит
- 26:20 Flow.flowWithLifecycle это обертка над repeatOnLifecycle, упрощает написание если только 1 продюсер
📄 Guide to UI programming with coroutines
UI
,dispatcher
,context
,Dispatchers.Main
,Dispatchers.JavaFx
,Dispatchers.Swing
,UI coroutine
,cancel UI coroutine
,actor
,RendezvousChannel
,capacity
,ConflatedChannel
,Channel.UNLIMITED
,UI freeze
,Structured concurrency
,lifecycle
,parent-child hierarchy
📄 Best practices for coroutines
Dispatcher
,suspend
,ViewModel
,mutable
,Flow
,test
,TestCoroutineDispatcher
,GlobalScope
,cancel
,cancellable
,ensureActive
- Intermediate
- почему не нужно хардкодить Dispatcher
- suspend функции должны быть безопасны для основного потока, т.е. классы, вызывающие suspend функции не должны беспокоиться о том, какой Dispatcher использовать, эта ответственность лежит на классе, который выполняет эту работу
- ViewModel должен создавать корутины, а не suspend-функции
- предоставляйте неизменяемые типы другим классам
- для классов данных и бизнес-уровня необходимы должны предоставлять suspend функции для одноразовых вызовов и Flow для изменяемых данных
- используйте TestCoroutineDispatcher в тестах
- избегайте GlobalScope (это неконтролируемая область, очень усложняет тестирование, нет обзего CoroutineContext)
- suspend функции должны быть cancellable
📄 Ограничения native-mt версии для iOS таргета.
- single, thread, dispatcher, context, worker, GlobalScope, withContext, freeze, Flow, Channel, Deferred, mutable, Mutex, Semaphore, DetachedObjectGraph
- Intermediate
- все основные объекты связи (Job, Deferred, Channel, BroadcastChannel, Mutex) могут быть замороженны
- любой объект, который передается через Channel или Flow автоматически замораживается
CodeLab от JetBrains с основами применения механизмов корутин и каналов
Advanced
- 🎦 Андрей Бреслав — Асинхронно, но понятно. Сопрограммы в Kotlin
- Немного устаревшее, но про внутрянку наглядно показано.
- 🎦 Корутины в Kotlin — Роман Елизаров, JetBrains
- 📄 Coroutines Codegen
- Документ подробно описывающий что делает компилятор с suspend кодом и что генерируется в результате
Highlights
Внутреннее устройство
Мы не стремимся здесь дать полное объяснение того, как сопрограммы работают под капотом, но примерный смысл того, что происходит, очень важен.
Сопрограммы полностью реализованы с помощью технологии компиляции (поддержка от языковой виртуальной машины, среды исполнения, или операционной системы не требуется), а приостановка работает через преобразование кода. В принципе, каждая функция приостановки (оптимизации могут применяться, но мы не будем вдаваться в эти подробности здесь) преобразуется в конечный автомат, где состояния соответствуют приостановленным вызовам. Прямо перед приостановкой следующее состояние загружается в поле сгенерированного компилятором класса вместе с сопутствующими локальным переменными и т. д. При возобновлении сопрограммы локальные переменные и состояние восстанавливаются, и конечный автомат продолжает свою работу.
Приостановленную сопрограмму можно сохранять и передавать как объект, который хранит её приостановленное состояние и локальные переменные. Типом таких объектов является Continuation, а преобразование кода, описанное здесь, соответствует классическому Continuation-passing style. Следовательно, приостановливаемые функции принимают дополнительный параметр типа Continuation (сохранённое состояние) под капотом.
Более детально о том, как работают сопрограммы, можно узнать в этом проектном документе. Похожие описания async / await в других языках (таких как C# или ECMAScript 2016) актуальны и здесь, хотя особенности их языковых реализаций могут существенно отличаться от сопрограмм Kotlin.
Пример преобразования кода на этапе компиляции:
Исходник:
dummy() // suspend
println(1)
dummy() // suspend
println(2)
При компиляции преобразуется в:
val $result: Any? = null
when (this.label) {
0 -> {
this.label = 1
$result = dummy(this)
if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
goto 1
}
1 -> {
println(1)
this.label = 2
$result = dummy(this)
if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
goto 2
}
2 -> {
println(2)
return Unit
}
else -> {
throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
}
Более подробно можно прочитать в документе
Также можно прочитать статью или посмотреть видео.
CoroutineScope
CoroutineScope задает область действия корутины. Именно используя скоуп производится остановка всех отложенных операций привязанных к некоторому жизненному циклу.
Например, у каждой ViewModel
есть viewModelScope
- это CoroutineScope
, который создается при
создании ViewModel
и отменяется при вызове onCleared
.
Рассмотрим небольшую тестовую ViewModel
:
class TestViewModel: ViewModel() {
init {
viewModelScope.launch {
delay(10000)
println("work done!")
}
}
}
Если мы зайдем на экран, использующий эту ViewModel
, и сразу уйдем (не дожидаясь 10 секунд), то
сообщение не будет выведено в лог. А если подождем 10 секунд - будет. Это как раз потому что при
вызове onCleared
отменяется CoroutineScope
, а вместе с ним и все его корутины. А вызов onCleared
происходит при окончательном уходе с экрана (прямо перед удалением ViewModel
).
Также важно понимать что скоуп может быть вложенным (любой уровень вложенности поддерживается).
Для создания вложенного скоупа используется билдер coroutineScope
. При отмене родительского скоупа
отменяются и все вложенные.
Вложенный скоуп может пригодиться в случаях, когда вам не доступен класс текущего CoroutineScope
,
но нужно вызвать один из билдеров (launch
/async
).
suspend fun doSomeParallelWork() {
coroutineScope {
val task1 = async { callFirstRequest() }
val task2 = async { callSecondRequest() }
awaitAll(task1, task2)
}
}
Все корутины принадлежат какому либо скоупу. Скоуп либо создается вручную
(вызовом CoroutineScope
), либо используется GlobalScope
, который не имеет ограничения жизненного
цикла - живет все время жизни процесса приложения, поэтому не рекомендуется к использованию без
твердой уверенности что это верно.
Использование async
При запуске асинхронных задач (вызов async
) важно учитывать, что возникновение ошибки внутри задачи будет вызывать отмену текущего Scope. Поэтому важно запускать дочерний скоуп используя coroutineScope
для выполнения асинхронных задач - тогда ошибка выполнения этих задач не сломает основной CoroutineScope
, а будет выброшена как результат coroutineScope
и сможет обработаться try-catch
.
Например в следующем коде:
suspend fun main() {
coroutineScope {
launch {
try {
listOf<Deferred<Unit>>(
async { TODO() },
async { TODO() }
).awaitAll()
} catch(exc: Throwable) {
println(exc.stackTraceToString())
}
}
}
}
Блок catch
не отловит ошибку, произойдет вылет приложения.
suspend fun main() {
coroutineScope {
launch {
try {
coroutineScope {
listOf<Deferred<Unit>>(
async { TODO() },
async { TODO() }
).awaitAll()
}
} catch(exc: Throwable) {
println(exc.stackTraceToString())
}
}
}
}
coroutineScope
это suspend-функция, которая создаёт новый скоуп и идёт дальше только после его завершения и не привязывается к родительскому скоупу. Когда скоуп, созданный coroutineScope
, упал, падает сама suspend-функция, а не весь родительский scope.
CoroutineContext
Корутина, запущенная другой, наследует весь контекст той, внутри которой она запустилась и становится для нее дочерней. Если остановить родителскую корутину, то остановятся и все дочерние корутины. Способы переопределения дефолтного поведения наследования:
- явно указать DifferentScope.launch
- передать другой объект Job в качестве контекста корутины
В обоих случаях корутина не привяжется к области, из которой она была запущенна.
Также, родительская корутина всегда дожидается завершения дочерних корутин, для этого не требуется явно отслеживать всех потомков или делать Job.join
Dispatcher
Dispatcher определяет, какой поток или потоки использует корутина для выполнения. Может ограничить выполнение корутины одним потоком, отправить корутину в пулл потоков или никак ее не ограничивать (None, Dispatchers.Unconfined, Dispatchers.Default, newSingleThreadContext)
Механика delay
По началу delay
все интерпретируют как Thread.sleep
и считают что текущий поток будет остановлен
на N миллисекунд.
Но это не так
. Рассмотрим следующий блок кода:
suspend fun startTimer() {
println("show message at start")
delay(1000)
println("show message after second")
}
При компиляции данный код будет преобразован (если сильно упростить) в нечто похожее на:
fun startTimer() {
println("show message at start")
delayCallback(1000) {
println("show message after second")
}
}
То есть вместо вызова sleep
на весь поток, вся работа после suspend-point (delay
это suspend
функция), будет "завернута в callback", закинута в очередь текущего потока (через Dispatcher
корутин) и данный калбек будет выполнен через секунду (когда будет получен из очереди диспатчером).
Вывод - вызов delay
не останавливает работу потока.
Более детальное сравнение delay
и Thread.sleep
можете прочитать в статье
Механика отмены канала
Для закрытия канала используется метод close() Он посылает специальный токен закрытия и итерация получения элементов из канала остановится, когда токен будет получен Это дает гарантию, что все элементы, отправленные до закрытия, будут получены
Coroutineexceptionhandler
Дочерние корутины прокидывают свои необработанные ошибки родительской, и так вплоть до корневой. Они не используют ExceptionHandler, установленный в их области действия
Если несколько дочерних корутин выбрасывают исклюения одновременно, то пробрасывается и обрабатывается только первое, а остальные присоединяются к первому как подавленные
Supervision
Используя SupervisionJob при неудачном завершении дочерней корутины завершение не распростанится на родительскую корутину и других детей Supervision scope распространяет отмену только в отношении дочерних корутин, отменяет всех только если сам завершился с ошибкой. Ожидает завершения всех детей, так же как и coroutineScope. Каждый дочерний элемент обязан самостоятельно обрабатывать свои исключения. Корутины внутри SupervisionScope используют ExceptionHandler установленный в их области действия
Операторы Flow
- Intermediate flow operators - основыные операторы похожи на map и filter, отличие от последовательностей в том, что внутри этих операторов можно вызвать suspend функцию
- Transform operator - применяет функцию в блоке transform для какждого значения
- Size-limiting operators - отменяют выполнение потока когда лимит достигнут. Отмена происходит с помощью испключения, так что все блоки завершатся корректно
- Terminal flow operators - toList, toSet, first, reduce, fold (про first и reduce не очень понятно)
- Flows are sequential - каждый сбор flow по дефолту выполняется последовательно, операторы коллекции выполняются в той же корутине, в которой они были запущены, новые не создаются, каждое переданное значение выполняется всеми промежуточными операторами
- Flow context - сбор flow всегда проихсодит в контексте вызывающей корутины
- Wrong emission withContext - нельзя менять контекст внутри flow
- flowOn operator - приавильный способ изменить контекст для собирания flow
- Buffering - буферизует выбросы потока через канал указанной емкости и запускает сборщик в отдельной сопрограмме.
- Conflation - emitter не приостановится из-за медленного коллектора, а удерживает свои элементы, пока коллектор их не запросит, а когда запросит, отправит самый новый элемент из тех, которые накопиились, потом заново начнет копить
- Processing the latest value - когда flow выдает новое значениеЮ блок действий для старого значения отменяется
Check Yourself
TODO