Skip to main content

coroutines

Для реализации асинхронной логики и выделения работы на отдельные потоки на наших проектах используется kotlinx.coroutines версии native-mt.

Материалы

Beginner

  • 📄 KotlinLang Docs - Coroutines guide

    • Coroutines basics
      • Beginner
        • launch, delay, runBlocking, Structured concurrency, coroutineScope, Job, Scope builder
        • создание первой корутины и suspend функции, запуск корутин параллельно, обращение к корутине через объект Job
        • корутины не потеряются и не будет утечек, не потеряются ошибки в корутинах (Structured concurrency)
        • отличия между runBlocking и coroutineScope (Scope builder)
        • обращение к корутине через объект Job (explicit job)
        • пример легковестности корутин по сравнению с потоками
    • Cancellation and timeouts
      • Beginner
        • Job, cancel, cancelAndJoin, isActive, yield, NonCancellable, withTimeout, withTimeoutOrNull, CancellationException, TimeoutCancellationException
        • в корутине нужно проверять, хотят ли её отменить
        • проверка на отмену происходит в каждом suspension point, чтобы проверять чаще используйте: isActive, yield (Making computation code cancellable)
        • пример правильной и неправильной отмены корутины (Cancellation is cooperative)
        • работа корутины по time-out (Timeout)
        • работа с ресурсами внутри withTimeout блока, пример проблемы и решения (timeout and resources)
    • Composing suspending functions
      • Beginner
        • async, Deferred, Job, await, start
        • примеры последовательного и асинхронного запуска
        • отложенный запуск корутины, отличия между запуском start и await (Lazy started async)
        • корутина, возвращающая значение - async, Deferred
        • хороший стиль объявления async-функции (Async-style functions)
        • пример возникновения ошибок в coroutineScope (Structured concurrency with async)
    • Coroutine context and dispatchers
      • Beginner
        • CoroutineContext, CoroutineScope, Dispatcher, Unconfined, newSingleThreadContext, Job, join, asContextElement
        • про Dispatcher и как он связаны с потоками (Dispatchers and threads)
        • отладка корутин, используя дебаггер в IDEA или логгирование, CoroutineName (Debugging with IDEA)
        • работа корутин в разных потоках, освобождение создонного потока (Jumping between threads)
        • как получить Job из контекста (Job in the context)
        • дочерние и родительские корутины
        • описание CoroutineScope
        • как использовать несколько контекстов для создания, неассоциативный + (Combining context elements)
        • переача локальных данных потока в корутины или между ними (Thread-local data)
    • Asynchronous Flow
      • Beginner
        • Flow, emit, collect, flowOf, asFlow, operators, cold, hot, builders, cancel
        • что такое Flow, операторы Flow, остановка Flow
        • холодные и горячие Flow
        • Flow builders
      • Intermediate
        • transform, take, numbers, toList, toSet, first, single, reduce, fold, flow, flowOn, buffer, conflate, collectLatest, zip, combine, flatMapConcat, flattenConcat, flatMapMerge, flattenMerge, flatMapLatest, try catch, finally, onCompletion, launchIn
    • Channels
      • Beginner
        • Channel, send, receive, produce, consumeEach, capacity, buffer, tick
        • что такое Channel, типы каналов
        • как происходит закрытие канала (Closing and iteration over channels)
        • паттерн производитель/потребитель (Building channel producer)
        • паттерн конвеер (Pipeline)
        • работа с каналом из нескольких корутин одновременно, справедливая работа канала (Fan-in, Fun-out, Channels are fair)
        • емкость канала
    • Coroutine exceptions handling
      • Beginner
        • cancel, CancellationException, CoroutineExceptionHandler, SupervisionJob, SupervisionScope
        • обработка ошибок в корутинах (Exception propagation)
        • настраиваемый обработчик ошибок (CoroutineExceptionHandler)
        • игнорируемое исключение CancellationException при завершении корутины (Cancellation and exceptions)
        • обработка нескольких исключений от дочерних корутин (Exceptions aggregation)
        • SupervisionJob и SupervisionScope
    • Shared mutable state and concurrency
      • Beginner
        • shared mutable state, Thread-safe
        • проблема изменения общего значения из параллельных корутин
        • решения проблемы: потокобезопасные типы данных, ограничение потоков, mutex (Thread-safe data structures, fine-grained, coarse-grained )
        • пример потраченного времени на переключение между потоками
      • Intermediate
        • что такое mutex (Mutual exclusion)
      • Advanced
        • volatile, Threadsafe, mutex, actor
        • Actors
    • Debug coroutines using IntelliJ IDEA – tutorial - как дебажить корутины в IDEA
    • Debug Kotlin Flow using IntelliJ IDEA – tutorial - как дебажить flow в IDEA
  • 📄 Корутины в Kotlin (гайд)

    • coroutine, Thread, suspend, runBLocking, launch, join, async, await, Deferred, CoroutineContext, CoroutineDispatсher, Thread safe, Actor, Channel, send, receive
    • Beginner
      • отличия потоков в Java с корутинами в Kotlin
      • два простых примера с пояснениями (runBlocking, launch, suspend, join, async, await, Deferred)
      • CoroutineContext и CoroutineDispatсher - что это и зачем
      • Thread-safe, Actor и Channel
  • 📄 Roman Elizarov - Structured Concurrency

    • Structured Concurrency, coroutineScope
    • Beginner
      • как пришли к Structured Concurrency, какие проблемы решает, какие преимущества дает
      • корутины не глобальные, как потоки, они связаны с локальной областью действия в вашем приложении, которая представляет собой объект с ограниченным временем жизни, например элемент пользовательского интерфейса
  • 🎦 KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow by Roman Elizarov

    • suspend, Channel, hot, flow, operators, Flowable
    • Beginner
      • 2:15 Введение, как работают suspend функции, проблема runBlocking
      • 4:55 Channel
      • 7:12 горячие каналы, проблемы каналов
      • 9:40 Flow, принцип работы, операторы
      • 13:42 Flow vs List
      • 18:44 Flowable
      • 24:03 Flow
  • 📄 Structured Concurrency in action! (using Kotlin coroutines)

    • Structured Concurrency, cancellation, Job, isActive, CoroutineScope, CoroutineContext
    • Beginner
      • основные концепции Structured Concurrency
      • описание поведения отмены, примеры
      • Параллельная декомпозиция, какие проблемы решает CoroutineScope
      • CoroutineScope и CoroutineContext, в чем разница? (Scope - область действия корутины, Context - элементы жизненного цикла корутины, пременные и константы, с которыми она работает)
      • демонстрация нарушения Structured Concurrency, если корутине передать Context, отличающающийся от контекста родитлеьского CoroutineScope этой корутины
  • 🎦 Александр Нозик. Кое-что о корутинах

    • Beginner
      • Deferred, Structured Cuncurrency, Job, CoroutineScope, GlobalScope, cancel,
      • 22:55 хорошее объяснение, как ведут себя корутины при передаче Deferred
      • 29:32 проблема с потоками, почему они занимают много памяти
      • 52:49 пример проблемы в корутинах, которую решает Structured Concurrency
      • 57:38 Job позволяет работать с результатом корутины, вырубать корутину и еще всякое, но это не сама корутина
      • 1:00:42 CoroutineScope
      • 1:5:28 GlobalScope, это пустышка, нет контекста, нет родителей, не знает о других запусках из GlobalScope, поэтому нарушается Structured Cuncurrency, некуда прокинуть результат и прочее
      • 1:25:20 Если корутина была закрыта с помощью cancel, то закрывает только детей, родителей не трогает
      • 1:26:57 Поведение при возникновении ошибки: если в корутине возникла ошибка, то она закрывает себя, сообщает родителю что случилось, закрывает всех потомков
      • 1:34:10 Нельзя закрывать корутину где угодно, нужно делать в точках расщепления, где проверяется, что она не закрыта
  • 🎦 Александр Нозик. Кое-что о корутинах. Flow, Scope

    • Видео хорошее, рекомендуется к просмотру целиком (x1.5)
    • Beginner
      • CoroutineContext, EmptyCoroutinesContext, Job, GlobalScope launch, newCoroutinesContext, cancel, supervisorScope, async, await, Deferred, Channel, isActive, Flow, cold, hot
      • 8:15 CoroutineContext
      • 12:55 используя EmptyCoroutinesContext вы никак не пользуетесь преимуществами StructuredConcurrency
      • для использования StructuredConcurrency необходим объкет Job, который будет помнить детей и родителя этой корутины
      • 15:29 GlobalScope - пустышка, хранит и использует EmptyCoroutinesContext
      • 19:59 разбор launch, newCoroutinesContext
      • 30:00 способ завершить все корутины - отменить корневой Scope
      • 34:12 нельзя держать какой-нибудь Scope и периодический давать ему задачи
      • 45:26 supervisorScope - когда внутри проихсодит ошибка, он не обрушивает Scope-родительского
      • 52:31 async похож на Job, отличие в том, что он возвращает Deferred, await() - позволяет использовать результат, когда он будет получен.
      • 55:11 вместо async можно использовать suspend функцию, единственное отличие в том, что async можно отменить, в отличие от suspend функции
      • 57:11 Channel, служит для обмена данными между корутинами, виды каналов
      • 57:49 while(isActive){...} - проверяет, отменена ли данная корутина, и если она отменена, то не входит в новую итерацию цикла. Если использовать while(true){...}, то тот же самый эффект будет в ближайшем suspension point
      • 1:02:41 типы каналов
      • 1:10:42 Flow
        • 1:15:43 холодные - следующий элемент вычисляется не тогда, когда он готов, а когда кто-нибудь его запросил из Flow.
        • 1:29:41 горячий - возвращает элемент сразу же, как сгенерит
    • Intermediate
      • 8:43 CoroutinesContext, отличия от map - строгое типизирование ( если использовать ключ <Е>, то получите объект типа Е ), сумма двух CoroutineContext не ассоциотивна ( CoroutineContext1 + CoroutineContext2 != CoroutineContext2 + CoroutineContext1 т.к если в CoroutineContext1 есть ключи из CoroutineContext2, то при сумме значения по этим ключам перезапишутся из CoroutineContext2 )
  • 📄 Hands-on: Intro to coroutines and channels

  • 🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 1

    • Dispatcher, withContext, NonCancellableContext, viewModelScope, suspend, main-thread
    • Beginner
      • 4:02 виды Dispatcher
      • 5:18 что такое withContext и пример с NonCancellableContext
      • 8:10 viewModelScope - готовый Scope, привязанный к жизни компонента
      • 9:38 вызовы suspend функций должны быть безопасны для main-thread
  • 🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 2

    • Scope, CoroutineContext, Job, Dispatcher, CoroutineName, CoroutineExceptionHandler, CoroutinesScope, SupervisorScope, Deferred, async, await cancellable
    • Beginner
      • 3:05 CoroutinesScope создает новый Scope, копирует все из Scope-родителя и исполняет переданный ему блок
      • 3:59 supervisorScope не упадет, если упадет ребенок. Если упадет сам Scope, то не упадет Scope-родитель
      • 5:11 Job - фоновая работа, имеет ЖЦ(active, cancelled). Job предоставляет управление корутиной, можем вызвать Job.cancel и тд, не имеет результата
      • 7:06 Deferred это Job, но с результатом, создается myScope.async{...}. Получить значение - myDeferred.await() - вернет занечение или исплючение при ошибке
      • 8:55 как запустить работу во внешнем Scope, что делать, если нам не подходит ViewModelScope, потому что работа должна жить дольше чем ViewModel
      • 15:21 что такое CoroutineContext, что в нем может находиться (Job, CoroutineDispatcher, CoroutineName, CoroutineExceptionHandler)
      • 16:57 разница между CoroutineScope и CoroutineContext
      • 19:32 когда Job отменили, корутина не отменится автоматичеки, разбор кейса
  • 🎦 RedMadRobot - Coroutines. Хаотичное изучение. Часть 3

    • Channel, capacity, close, trySend, Flow, buffer, conflate, SharedFlow, hot, cold, shareIn, whileSubscribed, timeout, Job, lifecycle, repeatOnLifecycle, DESTROYED, flowWithLifecycle
    • Beginner
      • 7:50 Channel - канал для обмена, можно положить и получить, не блокирующий (оперции саспендятся), можно закрыть, разные capacity(RENDEZVOUS, UNLIMITED, CONFLATED, BUFFERED)
      • 3:38 SingleLiveEvent, что это и зачем (события нужно обрабатывать только один раз)
      • 11:30 SingleLiveEvent используя Channel
      • 15:52 как добавить к flow буффер на случай медленного получения
      • 17:25 conflate, сокращение для buffer с параметрами CONFLATED и DROP_OLDEST, т.е хранит одно значение и перезаписыват его
      • 18:07 flowOn переключает контекст выполнения операторов идущих до него, если операторы без своего контекста
      • 20:07 shareIn превращает холодный Flow в горячий SharedFlow
      • 21:52 WhileSubscribed запускает корутину при первом подписчике, остонавливает когда пропадет последний подписчик, можно сохранить кэш при выключении или стереть после timeout
      • 23:12 На что можно заменить Job.cancel()? можно Lifecycle.repeatOnLifecycle
      • 23:39 Lifecycle.repeatOnLifecycle запускает корутину, когда ЖЦ подходит до определенного состояния, когда ЖЦ ниже нужного состояния - отменяет корутину и усылпяет(suspend), когда снова в нужном состоянии - запускает повторно
      • 24:42 особенности repeatOnLifecycle, рекомендуется создавать либо в Activity.onCreate() или Fragment.onViewCreated()
      • 25:22 поведение при DESTROYED, держет в suspend внешнюю корутину, пока не DESTROYED, когда DESTROYED - отпустит
      • 26:20 Flow.flowWithLifecycle это обертка над repeatOnLifecycle, упрощает написание если только 1 продюсер
  • 📄 Guide to UI programming with coroutines

    • UI, dispatcher, context, Dispatchers.Main, Dispatchers.JavaFx, Dispatchers.Swing, UI coroutine, cancel UI coroutine, actor, RendezvousChannel, capacity, ConflatedChannel, Channel.UNLIMITED, UI freeze, Structured concurrency, lifecycle, parent-child hierarchy
  • 📄 Best practices for coroutines

    • Dispatcher, suspend, ViewModel, mutable, Flow, test, TestCoroutineDispatcher, GlobalScope, cancel, cancellable, ensureActive
    • Intermediate
      • почему не нужно хардкодить Dispatcher
      • suspend функции должны быть безопасны для основного потока, т.е. классы, вызывающие suspend функции не должны беспокоиться о том, какой Dispatcher использовать, эта ответственность лежит на классе, который выполняет эту работу
      • ViewModel должен создавать корутины, а не suspend-функции
      • предоставляйте неизменяемые типы другим классам
      • для классов данных и бизнес-уровня необходимы должны предоставлять suspend функции для одноразовых вызовов и Flow для изменяемых данных
      • используйте TestCoroutineDispatcher в тестах
      • избегайте GlobalScope (это неконтролируемая область, очень усложняет тестирование, нет обзего CoroutineContext)
      • suspend функции должны быть cancellable
  • 📄 Ограничения native-mt версии для iOS таргета.

    • single, thread, dispatcher, context, worker, GlobalScope, withContext, freeze, Flow, Channel, Deferred, mutable, Mutex, Semaphore, DetachedObjectGraph
    • Intermediate
      • все основные объекты связи (Job, Deferred, Channel, BroadcastChannel, Mutex) могут быть замороженны
      • любой объект, который передается через Channel или Flow автоматически замораживается
  • 🎦 Roman Elizarov — Structured concurrency

Advanced

Highlights

Внутреннее устройство

Мы не стремимся здесь дать полное объяснение того, как сопрограммы работают под капотом, но примерный смысл того, что происходит, очень важен.

Сопрограммы полностью реализованы с помощью технологии компиляции (поддержка от языковой виртуальной машины, среды исполнения, или операционной системы не требуется), а приостановка работает через преобразование кода. В принципе, каждая функция приостановки (оптимизации могут применяться, но мы не будем вдаваться в эти подробности здесь) преобразуется в конечный автомат, где состояния соответствуют приостановленным вызовам. Прямо перед приостановкой следующее состояние загружается в поле сгенерированного компилятором класса вместе с сопутствующими локальным переменными и т. д. При возобновлении сопрограммы локальные переменные и состояние восстанавливаются, и конечный автомат продолжает свою работу.

Приостановленную сопрограмму можно сохранять и передавать как объект, который хранит её приостановленное состояние и локальные переменные. Типом таких объектов является Continuation, а преобразование кода, описанное здесь, соответствует классическому Continuation-passing style. Следовательно, приостановливаемые функции принимают дополнительный параметр типа Continuation (сохранённое состояние) под капотом.

Более детально о том, как работают сопрограммы, можно узнать в этом проектном документе. Похожие описания async / await в других языках (таких как C# или ECMAScript 2016) актуальны и здесь, хотя особенности их языковых реализаций могут существенно отличаться от сопрограмм Kotlin.

Source.

Пример преобразования кода на этапе компиляции:

Исходник:

dummy() // suspend
println(1)
dummy() // suspend
println(2)

При компиляции преобразуется в:

val $result: Any? = null
when (this.label) {
0 -> {
this.label = 1
$result = dummy(this)
if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
goto 1
}
1 -> {
println(1)
this.label = 2
$result = dummy(this)
if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
goto 2
}
2 -> {
println(2)
return Unit
}
else -> {
throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
}

Более подробно можно прочитать в документе

Также можно прочитать статью или посмотреть видео.

CoroutineScope

CoroutineScope задает область действия корутины. Именно используя скоуп производится остановка всех отложенных операций привязанных к некоторому жизненному циклу.

Например, у каждой ViewModel есть viewModelScope - это CoroutineScope, который создается при создании ViewModel и отменяется при вызове onCleared.

Рассмотрим небольшую тестовую ViewModel:

class TestViewModel: ViewModel() {
init {
viewModelScope.launch {
delay(10000)
println("work done!")
}
}
}

Если мы зайдем на экран, использующий эту ViewModel, и сразу уйдем (не дожидаясь 10 секунд), то сообщение не будет выведено в лог. А если подождем 10 секунд - будет. Это как раз потому что при вызове onCleared отменяется CoroutineScope, а вместе с ним и все его корутины. А вызов onCleared происходит при окончательном уходе с экрана (прямо перед удалением ViewModel).

Также важно понимать что скоуп может быть вложенным (любой уровень вложенности поддерживается). Для создания вложенного скоупа используется билдер coroutineScope. При отмене родительского скоупа отменяются и все вложенные.

Вложенный скоуп может пригодиться в случаях, когда вам не доступен класс текущего CoroutineScope, но нужно вызвать один из билдеров (launch/async).

suspend fun doSomeParallelWork() {
coroutineScope {
val task1 = async { callFirstRequest() }
val task2 = async { callSecondRequest() }
awaitAll(task1, task2)
}
}

Все корутины принадлежат какому либо скоупу. Скоуп либо создается вручную (вызовом CoroutineScope), либо используется GlobalScope, который не имеет ограничения жизненного цикла - живет все время жизни процесса приложения, поэтому не рекомендуется к использованию без твердой уверенности что это верно.

Использование async

При запуске асинхронных задач (вызов async) важно учитывать, что возникновение ошибки внутри задачи будет вызывать отмену текущего Scope. Поэтому важно запускать дочерний скоуп используя coroutineScope для выполнения асинхронных задач - тогда ошибка выполнения этих задач не сломает основной CoroutineScope, а будет выброшена как результат coroutineScope и сможет обработаться try-catch.

Например в следующем коде:

suspend fun main() {
coroutineScope {
launch {
try {
listOf<Deferred<Unit>>(
async { TODO() },
async { TODO() }
).awaitAll()
} catch(exc: Throwable) {
println(exc.stackTraceToString())
}
}
}
}

Блок catch не отловит ошибку, произойдет вылет приложения.

Исправленный вариант:

suspend fun main() {
coroutineScope {
launch {
try {
coroutineScope {
listOf<Deferred<Unit>>(
async { TODO() },
async { TODO() }
).awaitAll()
}
} catch(exc: Throwable) {
println(exc.stackTraceToString())
}
}
}
}

coroutineScope это suspend-функция, которая создаёт новый скоуп и идёт дальше только после его завершения и не привязывается к родительскому скоупу. Когда скоуп, созданный coroutineScope, упал, падает сама suspend-функция, а не весь родительский scope.

CoroutineContext

Корутина, запущенная другой, наследует весь контекст той, внутри которой она запустилась и становится для нее дочерней. Если остановить родителскую корутину, то остановятся и все дочерние корутины. Способы переопределения дефолтного поведения наследования:

  • явно указать DifferentScope.launch
  • передать другой объект Job в качестве контекста корутины

В обоих случаях корутина не привяжется к области, из которой она была запущенна.

Также, родительская корутина всегда дожидается завершения дочерних корутин, для этого не требуется явно отслеживать всех потомков или делать Job.join

Dispatcher

Dispatcher определяет, какой поток или потоки использует корутина для выполнения. Может ограничить выполнение корутины одним потоком, отправить корутину в пулл потоков или никак ее не ограничивать (None, Dispatchers.Unconfined, Dispatchers.Default, newSingleThreadContext)

Механика delay

По началу delay все интерпретируют как Thread.sleep и считают что текущий поток будет остановлен на N миллисекунд. Но это не так . Рассмотрим следующий блок кода:

suspend fun startTimer() {
println("show message at start")
delay(1000)
println("show message after second")
}

При компиляции данный код будет преобразован (если сильно упростить) в нечто похожее на:

fun startTimer() {
println("show message at start")
delayCallback(1000) {
println("show message after second")
}
}

То есть вместо вызова sleep на весь поток, вся работа после suspend-point (delay это suspend функция), будет "завернута в callback", закинута в очередь текущего потока (через Dispatcher корутин) и данный калбек будет выполнен через секунду (когда будет получен из очереди диспатчером).

Вывод - вызов delay не останавливает работу потока.

Механика отмены канала

Для закрытия канала используется метод close() Он посылает специальный токен закрытия и итерация получения элементов из канала остановится, когда токен будет получен Это дает гарантию, что все элементы, отправленные до закрытия, будут получены

Coroutineexceptionhandler

Дочерние корутины прокидывают свои необработанные ошибки родительской, и так вплоть до корневой. Они не используют ExceptionHandler, установленный в их области действия

Если несколько дочерних корутин выбрасывают исклюения одновременно, то пробрасывается и обрабатывается только первое, а остальные присоединяются к первому как подавленные

Supervision

Используя SupervisionJob при неудачном завершении дочерней корутины завершение не распростанится на родительскую корутину и других детей Supervision scope распространяет отмену только в отношении дочерних корутин, отменяет всех только если сам завершился с ошибкой. Ожидает завершения всех детей, так же как и coroutineScope. Каждый дочерний элемент обязан самостоятельно обрабатывать свои исключения. Корутины внутри SupervisionScope используют ExceptionHandler установленный в их области действия

Операторы Flow

  • Intermediate flow operators - основыные операторы похожи на map и filter, отличие от последовательностей в том, что внутри этих операторов можно вызвать suspend функцию
  • Transform operator - применяет функцию в блоке transform для какждого значения
  • Size-limiting operators - отменяют выполнение потока когда лимит достигнут. Отмена происходит с помощью испключения, так что все блоки завершатся корректно
  • Terminal flow operators - toList, toSet, first, reduce, fold (про first и reduce не очень понятно)
  • Flows are sequential - каждый сбор flow по дефолту выполняется последовательно, операторы коллекции выполняются в той же корутине, в которой они были запущены, новые не создаются, каждое переданное значение выполняется всеми промежуточными операторами
  • Flow context - сбор flow всегда проихсодит в контексте вызывающей корутины
  • Wrong emission withContext - нельзя менять контекст внутри flow
  • flowOn operator - приавильный способ изменить контекст для собирания flow
  • Buffering - буферизует выбросы потока через канал указанной емкости и запускает сборщик в отдельной сопрограмме.
  • Conflation - emitter не приостановится из-за медленного коллектора, а удерживает свои элементы, пока коллектор их не запросит, а когда запросит, отправит самый новый элемент из тех, которые накопиились, потом заново начнет копить
  • Processing the latest value - когда flow выдает новое значениеЮ блок действий для старого значения отменяется

Check Yourself

TODO