StateFlow and SharedFlow in ViewModels
December 28, 2022
There are a lot of excellent docs on StateFlow and SharedFlow out there and I won’t rehash them here. Instead, I will offer a few ideas on how to choose the right hot flow, along with examples of collecting them in a lifecycle-aware manner and unit testing them in ViewModels.
There is no one-size-fits-all approach, but one of the most common patterns in Android apps today is for the repository to expose a cold Flow, which the ViewModel converts into a StateFlow that the UI collects.
The code samples below will show:
- Collecting Flow and exposing StateFlow in ViewModel
- Using the stateIn Flow extension to collect Flow and expose StateFlow in ViewModel
- Using the shareIn Flow extension to collect Flow and expose SharedFlow in ViewModel
- Collecting State and Shared Flows as UiState in both View-based apps and Compose while respecting View lifecycle
- Unit testing ViewModels that convert Flow into State and Shared Flows
Basics of StateFlow vs SharedFlow
We are not going to explore these in depth here; for that, check out the official Kotlin StateFlow and Kotlin SharedFlow docs and the Google docs. Instead, we will highlight the attributes that matter most when using them in ViewModels.
    interface SharedFlow<out T> : Flow<T> { ... }

    interface StateFlow<out T> : SharedFlow<T> {
        /**
         * The current value of this state flow.
         */
        public val value: T
    }
StateFlow
StateFlow is a specialized SharedFlow which holds the last value emitted into it.
StateFlow is more efficient, has a simpler API and is used more commonly in ViewModels as it is more natural for maintaining state.
- StateFlow has a fixed replayCache of 1 (similar to LiveData) and replays the most recent emission to new subscribers, as well as to existing subscribers that resubscribe after a configuration change.
- StateFlow makes sense if there is a logical default initialValue, which is required to initialize it.
- The .value property can be read and written in a non-suspending way.
- It integrates easily with SavedStateHandle in ViewModel to handle process death (see example below).
SharedFlow
SharedFlow is a highly configurable generalization of StateFlow.
- SharedFlow’s replayCache is customizable and has a default of 0. With the default, a value is not replayed to new subscribers, nor to existing subscribers that resubscribe after a configuration change (e.g. there is no reason to re-emit an error after a configuration change).
- SharedFlow will re-emit subsequent equal values, whereas StateFlow skips a value that equals its current one. This makes SharedFlow more appropriate if you need to trigger the same event over time (e.g. show snackbar event, navigate event, connection unavailable event, etc.)
If you push to a SharedFlow in an init block of your ViewModel and a collector has not started collecting yet, the collector may miss that emission (with the default replay of 0, the value is simply dropped).
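To make the equal-values difference concrete, here is a minimal sketch (not from the sample project) that writes the sequence 1, 1, 2 into both a MutableStateFlow and a MutableSharedFlow. The function name equalValuesDemo and the use of runBlocking with yield() to let the collector run between writes are assumptions made purely for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns what a StateFlow collector and a SharedFlow collector each saw.
fun equalValuesDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val state = MutableStateFlow(0)
    val shared = MutableSharedFlow<Int>() // replay = 0 by default

    val fromState = mutableListOf<Int>()
    // UNDISPATCHED starts collecting right away, so the initial value 0 is recorded immediately.
    val stateJob = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { fromState += it }
    }
    // Subscribe before emitting; take(3) completes after three values.
    val fromShared = async(start = CoroutineStart.UNDISPATCHED) {
        shared.take(3).toList()
    }

    for (value in listOf(1, 1, 2)) {
        state.value = value // the second 1 equals the current value and is skipped
        yield()             // let the StateFlow collector run before the next write
        shared.emit(value)  // every emission is delivered, equal or not
    }

    stateJob.cancel()
    fromState.toList() to fromShared.await()
}
```

In this sketch the StateFlow collector should end up with 0, 1, 2 (the repeated 1 is dropped), while the SharedFlow collector receives 1, 1, 2.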
The upstream repository flow is not collected again when a new subscriber is added to a StateFlow or SharedFlow; the new subscriber shares the existing collection. This helps prevent wasting device resources unnecessarily.
Update re: SharedFlow
The more I use SharedFlow, the more edge cases I find with it that I need to handle. For instance, a SharedFlow requires you to supply an initial value at the time you start collecting from it using collectAsState() or collectAsStateWithLifecycle(). Oftentimes, this value has no real business logic purpose and just adds to the confusion. Additionally, depending on how you collect a SharedFlow, you may lose emissions in certain lifecycle conditions, which reduces predictability and is often unacceptable. You can tweak and do lots of testing and keep using SharedFlow, or try to find an alternative.
One such alternative that I found is a Channel. Here is an example of using a Channel in a ViewModel:
    private val _effect = Channel<NoteListEffect>()
    val effect: Flow<NoteListEffect> = _effect.receiveAsFlow()
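The difference in behavior can be sketched outside of Android. The example below is an illustration under stated assumptions, not code from the sample project: it uses String events instead of NoteListEffect, a buffered Channel (Channel.BUFFERED) rather than the default rendezvous one so that send() does not suspend without a receiver, and a hypothetical function name earlyEventDemo.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: an event sent before any collector exists, then one sent after.
fun earlyEventDemo(): Pair<List<String>, List<String>> = runBlocking {
    val shared = MutableSharedFlow<String>()        // replay = 0
    val channel = Channel<String>(Channel.BUFFERED) // assumption: buffered channel

    // Both events are sent before anyone collects, as could happen in a ViewModel init block.
    shared.emit("early")  // no subscribers and no replay: the value is dropped
    channel.send("early") // kept in the channel buffer until someone receives it

    val fromShared = mutableListOf<String>()
    val fromChannel = mutableListOf<String>()
    val sharedJob = launch(start = CoroutineStart.UNDISPATCHED) {
        shared.collect { fromShared += it }
    }
    val channelJob = launch(start = CoroutineStart.UNDISPATCHED) {
        channel.receiveAsFlow().collect { fromChannel += it }
    }

    shared.emit("late")
    channel.send("late")
    yield() // give the channel collector a chance to receive "late"

    sharedJob.cancel()
    channelJob.cancel()
    fromShared.toList() to fromChannel.toList()
}
```

Keep in mind that a Channel hands each event to a single collector, which tends to suit one-off UI effects but not state that several observers need to see.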
Converting cold Flow to hot
We should expose a hot flow (StateFlow or SharedFlow) from the ViewModel rather than a cold flow (Flow). The reason is: if we exposed a regular Flow, every new subscriber would re-run the flow builder in the repository and trigger a fresh set of emissions, which wastes device resources.
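A small sketch of that difference, assuming a plain runBlocking environment instead of a ViewModel: a locally created scope stands in for viewModelScope, SharingStarted.Eagerly is used for simplicity, and coldVsHotDemo is a hypothetical name. It counts how many times each flow builder runs when the flow is read twice.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (cold builder runs, hot builder runs) after two reads each.
fun coldVsHotDemo(): Pair<Int, Int> = runBlocking {
    var coldRuns = 0
    val cold = flow {
        coldRuns++ // executed once per collector
        emit("users")
    }
    cold.first()
    cold.first()

    var hotRuns = 0
    // Stand-in for viewModelScope (an assumption for this sketch).
    val scope = CoroutineScope(coroutineContext + Job())
    val hot = flow {
        hotRuns++ // executed once, regardless of the number of subscribers
        emit("users")
    }.stateIn(scope, SharingStarted.Eagerly, initialValue = "loading")
    hot.first { it == "users" }
    hot.first { it == "users" }
    scope.cancel()

    coldRuns to hotRuns
}
```

Here the cold builder runs twice while the builder behind the StateFlow runs only once.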
There are 2 common ways to convert cold flows from repository to hot State and Shared Flows:
- By collecting Flow and pushing into StateFlow or SharedFlow
- By using the stateIn extension to push Flow emissions into StateFlow or using the shareIn extension to push Flow into SharedFlow
Let’s examine them below:
Collect Flow in ViewModel
    @HiltViewModel
    class MyViewModel @Inject constructor(
        private val userRepository: UserRepository
    ) : ViewModel() {

        private val _userFlow = MutableStateFlow<UiState>(UiState.Loading)
        val userFlow: StateFlow<UiState> = _userFlow.asStateFlow()

        fun onRefresh() {
            viewModelScope.launch {
                userRepository
                    .getUsers().asResult()
                    .collect { result ->
                        _userFlow.update {
                            when (result) {
                                is Result.Loading -> UiState.Loading
                                is Result.Success -> UiState.Success(result.data)
                                is Result.Error -> UiState.Error(result.exception)
                            }
                        }
                    }
            }
        }
    }
Where UiState is a common implementation:

    sealed interface UiState {
        object Loading : UiState

        data class Success(
            val data: List<User>
        ) : UiState

        data class Error(
            val throwable: Throwable? = null
        ) : UiState
    }
Another benefit of collecting Flow from the repository and converting it into a State or Shared Flow is access to all the intermediate Flow operators, which let us manipulate data to fit our use case (e.g. map, filter, take, takeWhile, transform, etc.)
The UserRepository interface exposes data as Flow:

    interface UserRepository {
        fun getUsers(): Flow<List<User>>
    }

And has a very basic implementation producing a mock cold Flow:

    class InMemoryUserRepository @Inject constructor() : UserRepository {
        override fun getUsers(): Flow<List<User>> = flow {
            val userList = listOf(
                User(
                    name = "User 1",
                    age = 20
                ),
                User(
                    name = "User 2",
                    age = 30
                )
            )
            emit(userList.shuffled())
        }
    }
Notice that the Flow gets mapped to Result (userRepository.getUsers().asResult()) as follows:

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.*
    import java.io.IOException

    private const val RETRY_TIME_IN_MILLIS = 15_000L
    private const val RETRY_ATTEMPT_COUNT = 3

    sealed interface Result<out T> {
        data class Success<T>(val data: T) : Result<T>
        data class Error(val exception: Throwable? = null) : Result<Nothing>
        object Loading : Result<Nothing>
    }

    fun <T> Flow<T>.asResult(): Flow<Result<T>> {
        return this
            .map<T, Result<T>> { Result.Success(it) }
            .onStart { emit(Result.Loading) }
            .retryWhen { cause, attempt ->
                if (cause is IOException && attempt < RETRY_ATTEMPT_COUNT) {
                    delay(RETRY_TIME_IN_MILLIS)
                    true
                } else {
                    false
                }
            }
            .catch { emit(Result.Error(it)) }
    }
This Result wrapper is extracted so it can be reused by calling the .asResult() extension on any Flow.
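To see the order of emissions that the wrapper produces, here is a self-contained sketch. It uses a trimmed copy of asResult() in which the retryWhen step is deliberately left out so the demo does not wait on retries; the asResultDemo function and the sample flows are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed interface Result<out T> {
    data class Success<T>(val data: T) : Result<T>
    data class Error(val exception: Throwable? = null) : Result<Nothing>
    object Loading : Result<Nothing>
}

// Trimmed copy of asResult(): same map/onStart/catch chain, without the retry step.
fun <T> Flow<T>.asResult(): Flow<Result<T>> =
    map<T, Result<T>> { Result.Success(it) }
        .onStart { emit(Result.Loading) }
        .catch { emit(Result.Error(it)) }

// Hypothetical demo: collects a succeeding flow and a failing flow into lists.
fun asResultDemo(): Pair<List<Result<Int>>, List<Result<Int>>> = runBlocking {
    val succeeding = flowOf(1, 2).asResult().toList()
    val failing = flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }.asResult().toList()
    succeeding to failing
}
```

The succeeding flow yields Loading followed by one Success per value; the failing flow yields Loading, Success(1) and then a single Error carrying the exception, after which the flow completes normally.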
Notice how data is pushed to StateFlow:

    // one way to push to MutableStateFlow: a plain assignment works, but a read-modify-write
    // such as _userFlow.value = _userFlow.value.copy(...) is not atomic, so avoid it
    _userFlow.value = ...

    // a thread-safe way to push to MutableStateFlow introduced in kotlinx.coroutines 1.5.1
    // it is especially useful when you copy an existing state and change its value(s) to produce a new state
    // e.g. _userFlow.update { it.copy(title = "Something") }
    _userFlow.update { ... }

    // other ways to update MutableStateFlow that support concurrency are getAndUpdate and updateAndGet
Other ways to push data into MutableStateFlow are:

    // suspending way, e.g. _userFlow.emit(UiState.Loading)
    suspend fun emit(value: T)

    // non-suspending way, e.g. _userFlow.tryEmit(UiState.Loading)
    // returns true if the value was emitted successfully
    fun tryEmit(value: T): Boolean
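As a rough illustration of why update is the safer choice for copy-and-modify changes, the sketch below increments a counter from many coroutines running on Dispatchers.Default. CounterState and concurrentUpdateDemo are hypothetical names that do not appear in the sample project.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical state holder used only for this sketch.
data class CounterState(val count: Int = 0)

fun concurrentUpdateDemo(): Int = runBlocking {
    val state = MutableStateFlow(CounterState())
    coroutineScope {
        repeat(100) {
            launch(Dispatchers.Default) {
                repeat(1_000) {
                    // update retries its lambda until the compare-and-set succeeds,
                    // so no increment is lost even under contention.
                    state.update { current -> current.copy(count = current.count + 1) }
                }
            }
        }
    }
    state.value.count
}
```

With update the final count is always 100 * 1,000. Replacing the call with state.value = state.value.copy(count = state.value.count + 1) could lose increments, because the read and the write are separate steps.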
Using stateIn Flow extension
Another way to pipe Flow emissions into StateFlow is using the fun <T> Flow<T>.stateIn extension:

    private const val DEFAULT_TIMEOUT = 5000L

    @HiltViewModel
    class MyViewModel @Inject constructor(
        userRepository: UserRepository
    ) : ViewModel() {

        val userFlow: StateFlow<UiState> = userRepository
            .getUsers()
            .asResult()
            .map { result ->
                when (result) {
                    is Result.Loading -> UiState.Loading
                    is Result.Success -> UiState.Success(result.data)
                    is Result.Error -> UiState.Error(result.exception)
                }
            }
            .stateIn(
                scope = viewModelScope,
                initialValue = UiState.Loading,
                started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
            )
    }
Note that by using the stateIn extension, there is no need to create a backing MutableStateFlow property to push data into this StateFlow.
Notice the handy started param. SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT) keeps the upstream Flow active for 5 seconds after the last subscriber disappears, so the Flow is not re-subscribed (and the StateFlow does not need to be re-populated) when the UI briefly stops collecting during a configuration change. 5 seconds is a reasonable timeout default seen in Google samples.
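The effect of the stop timeout can be observed with virtual time. The following is a sketch under several assumptions: it relies on kotlinx-coroutines-test 1.7.0 or newer (for backgroundScope), uses a made-up upstream flow that counts how often it is started, and the name whileSubscribedDemo is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical demo: returns (upstream starts after a quick resubscribe,
// upstream starts after a gap longer than the timeout).
@OptIn(ExperimentalCoroutinesApi::class)
fun whileSubscribedDemo(): Pair<Int, Int> {
    var upstreamStarts = 0
    var startsAfterQuickResubscribe = 0
    runTest {
        val state = flow {
            upstreamStarts++   // counts every (re)start of the upstream
            emit("users")
            awaitCancellation() // keep the upstream active until sharing stops
        }.stateIn(backgroundScope, SharingStarted.WhileSubscribed(5_000L), "loading")

        val first = launch { state.collect {} }
        runCurrent()
        first.cancel()
        advanceTimeBy(1_000L)  // shorter than the timeout, like a configuration change
        val second = launch { state.collect {} }
        runCurrent()
        startsAfterQuickResubscribe = upstreamStarts

        second.cancel()
        advanceTimeBy(6_000L)  // longer than the timeout: the upstream is stopped
        val third = launch { state.collect {} }
        runCurrent()
        third.cancel()
    }
    return startsAfterQuickResubscribe to upstreamStarts
}
```

In this sketch the upstream should start once across the first two subscribers and a second time only after the 5-second window has elapsed with no subscribers.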
Another advantage of using stateIn is being able to combine multiple Flows:

    val uiState: StateFlow<HomeUiState> = combine(
        topRatedMovies,
        actionMovies
    ) { topRatedResult, actionMoviesResult ->
        val topRated: TopRatedMoviesUiState = when (topRatedResult) {
            is Result.Success -> TopRatedMoviesUiState.Success(topRatedResult.data)
            is Result.Loading -> TopRatedMoviesUiState.Loading
            is Result.Error -> TopRatedMoviesUiState.Error
        }
        val action: ActionMoviesUiState = when (actionMoviesResult) {
            is Result.Success -> ActionMoviesUiState.Success(actionMoviesResult.data)
            is Result.Loading -> ActionMoviesUiState.Loading
            is Result.Error -> ActionMoviesUiState.Error
        }
        HomeUiState(topRated, action)
    }
        .stateIn(
            scope = viewModelScope,
            // WhileUiSubscribed is not defined in this post; it is presumably a SharingStarted
            // value such as SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
            started = WhileUiSubscribed,
            initialValue = HomeUiState(
                TopRatedMoviesUiState.Loading,
                ActionMoviesUiState.Loading
            )
        )
Using shareIn Flow extension
If your ViewModel is exposing a SharedFlow, you can push data into it using the shareIn extension:

    @HiltViewModel
    class MyViewModel @Inject constructor(
        userRepository: UserRepository
    ) : ViewModel() {

        val userFlow: SharedFlow<UiState> = userRepository
            .getUsers()
            .asResult()
            .map { result ->
                when (result) {
                    is Result.Loading -> UiState.Loading
                    is Result.Success -> UiState.Success(result.data)
                    is Result.Error -> UiState.Error(result.exception)
                }
            }
            .shareIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
            )
    }
Note that by using the shareIn extension, there is no need to create a backing MutableSharedFlow property to push data into this SharedFlow.
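The shareIn call above leaves the replay parameter at its default of 0, so a subscriber that arrives after a value was emitted does not see it. Here is a minimal sketch of what replay changes, using MutableSharedFlow directly for brevity; lateSubscriberDemo and the 100 ms timeout are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical demo: what does a late subscriber see with replay = 0 vs replay = 1?
fun lateSubscriberDemo(): Pair<String?, String?> = runBlocking {
    val noReplay = MutableSharedFlow<String>(replay = 0)
    val replayOne = MutableSharedFlow<String>(replay = 1)

    // Emit before anyone subscribes.
    noReplay.emit("users")
    replayOne.emit("users")

    // A late subscriber waits briefly for a value; null means nothing arrived in time.
    val seenWithoutReplay = withTimeoutOrNull(100L) { noReplay.first() }
    val seenWithReplay = withTimeoutOrNull(100L) { replayOne.first() }
    seenWithoutReplay to seenWithReplay
}
```

With replay = 0 the late subscriber gets nothing, while replay = 1 hands it the most recent value, which is closer to how a StateFlow behaves.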
If collect were used instead of shareIn, you’d have to use:

    // MutableSharedFlow takes no initial value; its constructor parameters configure replay and buffering
    private val _userFlow = MutableSharedFlow<UiState>()
    val userFlow: SharedFlow<UiState> = _userFlow.asSharedFlow()
Collecting State in View-based components
If the StateFlow is collected in an Activity, we’d use this block:

    lifecycleScope.launch {
        lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
            viewModel.userFlow.collect {
                render(it)
            }
        }
    }
Notice the use of lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED). This guarantees that the View collects from the ViewModel's flow only while its Lifecycle is at least in the STARTED state: collection starts when the View is started and is cancelled when it is stopped (note that Lifecycle.State has no STOPPED value; after onStop the state drops back to CREATED). In other words, the View layer collects data only between onStart and onStop, which includes the resumed and paused states.
This prevents wasting resources when the user is not actively interacting with the app.
If we were to collect in a Fragment, the code would look like this:

    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
            viewModel.userFlow.collect {
                render(it)
            }
        }
    }

    private fun render(uiState: UiState) {
        when (uiState) {
            UiState.Loading -> TODO()
            is UiState.Success -> TODO()
            is UiState.Error -> TODO()
        }
    }
See Restartable lifecycle-aware coroutines for more details.
Collecting State in Jetpack Compose
A StateFlow can be collected in Jetpack Compose as follows:

    val uiState by viewModel.userFlow.collectAsStateWithLifecycle()

    when (uiState) {
        UiState.Loading -> TODO()
        is UiState.Success -> TODO()
        is UiState.Error -> TODO()
    }

A SharedFlow can be collected in Compose in a similar fashion, except that you have to provide the initial value:

    val uiState by shareInViewModel.userFlow.collectAsStateWithLifecycle(
        initialValue = UiState.Loading
    )
Note that to use the fun <T> Flow<T>.collectAsStateWithLifecycle(…) extension, you have to add the following dependency (the API has been stable since 2.6.0, so a newer version can be used instead of this alpha):

    implementation 'androidx.lifecycle:lifecycle-runtime-compose:2.6.0-alpha03'
For more details, check out the excellent post Consuming Flows Safely in Jetpack Compose by Manuel Vivo.
To summarize, using collectAsStateWithLifecycle() guarantees that we only collect from hot flows (StateFlow and SharedFlow) while a screen is visible, which prevents wasting system resources and making remote calls unnecessarily. This achieves a behavior similar to LiveData.
SavedStateHandle with StateFlow
Using SavedStateHandle in ViewModels lets us easily handle process death and it integrates with StateFlow nicely. Let’s look at an example:
Depending on your dependencies (including transitive) you may need to add this dependency:
implementation "androidx.lifecycle:lifecycle-viewmodel-savedstate:$version"
A ViewModel may look like this:
    private const val SAVED_STATE_HANDLE_KEY_STATE = "MyViewModel.uiState"

    @HiltViewModel
    class MyViewModel @Inject constructor(
        private val savedStateHandle: SavedStateHandle,
        private val userRepository: UserRepository
    ) : ViewModel() {

        val userFlow: StateFlow<UiState> =
            savedStateHandle.getStateFlow(SAVED_STATE_HANDLE_KEY_STATE, UiState.Loading)

        fun onRefresh() {
            viewModelScope.launch {
                userRepository
                    .getUsers().asResult()
                    .collect { result ->
                        savedStateHandle[SAVED_STATE_HANDLE_KEY_STATE] = when (result) {
                            is Result.Loading -> UiState.Loading
                            is Result.Success -> UiState.Success(result.data)
                            is Result.Error -> UiState.Error(result.exception)
                        }
                    }
            }
        }
    }
Now whenever the cold Flow is collected, we save each result in savedStateHandle, which pushes it to the StateFlow. If process death occurs, the StateFlow will be restored from what’s in saved state.
Note how easily that task is accomplished by doing:

    savedStateHandle.getStateFlow(KEY, "defaultValue")

We no longer need a private _userFlow: MutableStateFlow<UiState> property.
On the calling side in the Activity, you can wrap the call to refresh as follows:

    if (savedInstanceState == null) {
        viewModel.onRefresh()
    }

There is no need to call viewModel.onRefresh() when savedInstanceState is not null, because the state is then restored from the saved state.
Note that whatever you store in savedStateHandle should be made Parcelable. In this case, we’d have to make the following changes:

    plugins {
        ...
        id 'kotlin-parcelize'
    }

    @Parcelize
    sealed class UiState : Parcelable {
        object Loading : UiState()

        data class Success(
            val data: @RawValue List<User>
        ) : UiState()

        data class Error(
            val throwable: Throwable? = null
        ) : UiState()
    }

    @Parcelize
    data class User(
        val name: String,
        val age: Int
    ) : Parcelable
Note: As far as what we are storing in the savedStateHandle, the above is just an example. Storing a List of data is often not a good idea as the payload may get too large and you may even start running into TransactionTooLargeException. The above code using UiState is just an end-to-end example but you should decide what the appropriate payload is in each case.
Unit testing State and Shared Flows
Since we are testing ViewModel code that involves Android's viewModelScope, create the following Dispatcher rule:

    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.test.TestDispatcher
    import kotlinx.coroutines.test.UnconfinedTestDispatcher
    import kotlinx.coroutines.test.resetMain
    import kotlinx.coroutines.test.setMain
    import org.junit.rules.TestRule
    import org.junit.rules.TestWatcher
    import org.junit.runner.Description

    /**
     * A JUnit [TestRule] that sets the Main dispatcher to [testDispatcher]
     * for the duration of the test.
     */
    class MainDispatcherRule(
        val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
    ) : TestWatcher() {
        override fun starting(description: Description) {
            super.starting(description)
            Dispatchers.setMain(testDispatcher)
        }

        override fun finished(description: Description) {
            super.finished(description)
            Dispatchers.resetMain()
        }
    }
Use this rule in your ViewModel tests:
    class MyViewModelTest {
        @get:Rule
        val mainDispatcherRule = MainDispatcherRule()

        ...
    }
Testing StateFlow produced by collect
Let’s unit test the following ViewModel:
    @HiltViewModel
    class MyViewModel @Inject constructor(
        private val userRepository: UserRepository
    ) : ViewModel() {

        private val _userFlow = MutableStateFlow<UiState>(UiState.Loading)
        val userFlow: StateFlow<UiState> = _userFlow.asStateFlow()

        fun onRefresh() {
            viewModelScope.launch {
                userRepository
                    .getUsers().asResult()
                    .collect { result ->
                        _userFlow.update {
                            when (result) {
                                is Result.Loading -> UiState.Loading
                                is Result.Success -> UiState.Success(result.data)
                                is Result.Error -> UiState.Error(result.exception)
                            }
                        }
                    }
            }
        }
    }
The test will look like this:
    class MyViewModelTest {
        @get:Rule
        val mainDispatcherRule = MainDispatcherRule()

        private val repository = TestUserRepository()

        @OptIn(ExperimentalCoroutinesApi::class)
        @Test
        fun `when initialized, repository emits loading and data`() = runTest {
            val viewModel = MyViewModel(repository)
            val users = listOf(...)

            // the StateFlow starts with its initial value
            assertEquals(UiState.Loading, viewModel.userFlow.value)

            repository.sendUsers(users)
            viewModel.onRefresh()

            assertEquals(UiState.Success(users), viewModel.userFlow.value)
        }
    }
One of the advantages of StateFlow vs SharedFlow is being able to access the latest value pushed to it in a non-suspending way, by reading viewModel.userFlow.value.
Another approach to testing all things Flow is to use the Turbine testing library from Cash App:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `when initialized, repository emits loading and data (using turbine)`() = runTest {
        val viewModel = MyViewModel(repository)
        val users = listOf(...)

        viewModel.userFlow.test {
            val firstItem = awaitItem()
            assertEquals(UiState.Loading, firstItem)

            repository.sendUsers(users)
            viewModel.onRefresh()

            val secondItem = awaitItem()
            assertEquals(UiState.Success(users), secondItem)
        }
    }
Note that we use the following fake UserRepository for testing:

    class TestUserRepository : UserRepository {
        /**
         * The backing hot flow for the list of users for testing.
         */
        private val usersFlow =
            MutableSharedFlow<List<User>>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

        override fun getUsers(): Flow<List<User>> {
            return usersFlow
        }

        /**
         * A test-only API to allow controlling the list of users from tests.
         */
        suspend fun sendUsers(users: List<User>) {
            usersFlow.emit(users)
        }
    }
Testing StateFlow produced by stateIn
    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `when initialized, repository emits loading and data`() = runTest {
        val viewModel = MainWithStateInViewModel(repository)
        val users = listOf(...)

        val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
            viewModel.userFlow.collect()
        }

        assertEquals(UiState.Loading, viewModel.userFlow.value)

        repository.sendUsers(users)

        assertEquals(UiState.Success(users), viewModel.userFlow.value)

        collectJob.cancel()
    }
Notice that because we use stateIn and do not collect directly in the ViewModel, we have to collect the StateFlow in each test so that the upstream Flow starts:

    val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
        viewModel.userFlow.collect()
    }

    // perform some actions and assertions here

    collectJob.cancel()
And here is a Turbine version of this test:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `when initialized, repository emits loading and data (using turbine)`() = runTest {
        val viewModel = MainWithStateInViewModel(repository)
        val users = listOf(...)

        repository.sendUsers(users)

        viewModel.userFlow.test {
            val firstItem = awaitItem()
            assertEquals(UiState.Loading, firstItem)

            val secondItem = awaitItem()
            assertEquals(UiState.Success(users), secondItem)
        }
    }
Testing SharedFlow produced by shareIn
I have not found an easy way to test SharedFlow in ViewModels without using Turbine. Here is the test utilizing Turbine:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `when initialized, repository emits loading and data`() = runTest {
        val viewModel = MainWithShareInViewModel(repository)
        val users = listOf(...)

        repository.sendUsers(users)

        viewModel.userFlow.test {
            val firstItem = awaitItem()
            assertEquals(UiState.Loading, firstItem)

            val secondItem = awaitItem()
            assertEquals(UiState.Success(users), secondItem)
        }
    }
For more info on testing Flows, check out the Google documentation at https://developer.android.com/kotlin/flow/test
The full source code for this blog post can be found at https://github.com/jshvarts/ViewModelFlowDemo