StateFlow and SharedFlow in ViewModels

December 28, 2022

There are a lot of excellent docs on StateFlow and SharedFlow out there and I won’t rehash them here. Instead, I will share a few ideas on how to choose the right hot flow, along with a couple of examples of collecting them in a lifecycle-aware manner and unit testing them in ViewModels.

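There is no one-size-fits-all approach, but one of the most common usages of Flow and StateFlow in Android apps today is this: the repository exposes a cold Flow, the ViewModel converts it into a hot StateFlow, and the UI collects that StateFlow in a lifecycle-aware manner.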

The code samples below will show:

  1. Collecting Flow and exposing StateFlow in ViewModel

  2. Using stateIn Flow extension to collect Flow and expose StateFlow in ViewModel

  3. Using shareIn Flow extension to collect Flow and expose SharedFlow in ViewModel

  4. Collecting State and Shared Flows as UiState in both View-based apps and Compose while respecting View lifecycle

  5. Unit testing ViewModels that convert Flow into State and Shared Flows

Basics of StateFlow vs SharedFlow

We are not going to explore these in depth here—for that you can check out official docs at Kotlin StateFlow, Kotlin SharedFlow and Google docs. We will highlight the important attributes when it comes to using them in ViewModels.

interface SharedFlow<out T> : Flow<T> {
  ...
}

interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

StateFlow

  • StateFlow is a specialized SharedFlow which holds the last value emitted into it.

  • StateFlow is more efficient, has a simpler API and is used more commonly in ViewModels as it is more natural for maintaining state.

  • StateFlow has a fixed replayCache of 1 (similar to LiveData): it replays the most recent value to new subscribers, as well as to existing subscribers that re-subscribe after a configuration change.

  • StateFlow makes sense if there is a logical default initialValue which is required to initialize it.

  • The .value property can be read (and, on a MutableStateFlow, written) in a non-suspending way

  • Ability to easily integrate with SavedStateHandle in ViewModel to handle process death (see example below)

SharedFlow

  • SharedFlow is a highly configurable generalization of StateFlow.

  • SharedFlow’s replayCache is customizable and defaults to 0. With the default, values are not replayed to new subscribers or to existing subscribers that re-subscribe after a configuration change (e.g. there is no reason to re-emit an error after a configuration change)

  • SharedFlow will re-emit subsequent equal values which makes it more appropriate if you need to trigger the same event over time (e.g. show snackbar event, navigate event, connection unavailable event, etc.)

  • If you push to a SharedFlow in an init of your ViewModel and a collector has not started collecting yet, the collector may miss that emission.

The repository flow will not be collected again when a new subscriber is added to a StateFlow or SharedFlow: the new subscriber shares the upstream that is already running. This avoids wasting device resources.
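
The differences listed above can be seen in a small, framework-free sketch. It assumes only the kotlinx.coroutines library; the function names and string values are hypothetical and not part of the sample app.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: writes the same value twice into a StateFlow.
fun collectFromStateFlow(): List<String> = runBlocking {
    val state = MutableStateFlow("idle") // an initial value is mandatory
    val received = mutableListOf<String>()
    // UNDISPATCHED subscribes the collector before any new value is written.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { received += it }
    }
    state.value = "error"
    delay(50) // give the collector time to observe the value
    state.value = "error" // equal to the current value, so it is conflated
    delay(50)
    job.cancel()
    received
}

// Hypothetical helper: emits before and after a collector subscribes.
fun collectFromSharedFlow(): List<String> = runBlocking {
    val events = MutableSharedFlow<String>() // replay = 0 by default
    val received = mutableListOf<String>()
    events.emit("missed") // no subscribers yet, so this value is lost
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { received += it }
    }
    events.emit("error")
    events.emit("error") // equal values are delivered again
    delay(50)
    job.cancel()
    received
}
```

Under these assumptions, the first function should return [idle, error] because the repeated "error" is conflated. The second should return [error, error]: the value emitted before subscription is missed, but equal values are delivered twice.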

Update re: SharedFlow

The more I use SharedFlow, the more edge cases I find that I need to handle. For instance, because a SharedFlow has no current value, you have to supply an initialValue when you start collecting from it using collectAsState() or collectAsStateWithLifecycle(). Oftentimes, this value has no real business logic purpose and just adds to the confusion. Additionally, depending on how you collect a SharedFlow, you may lose emissions in certain lifecycle conditions, which reduces predictability and is often unacceptable. You can tweak, test extensively and keep using SharedFlow, or try to find an alternative.

One such alternative that I found is a Channel. Here is an example of using a Channel in a ViewModel:

private val _effect = Channel<NoteListEffect>()
val effect: Flow<NoteListEffect> = _effect.receiveAsFlow()
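
To illustrate why a Channel can be more predictable for one-off effects, here is a minimal sketch that runs without Android. The EffectHolder class stands in for a ViewModel, and both it and the ShowSnackbar effect are hypothetical names. With the default rendezvous Channel, a sender suspends until a collector receives the effect, so an effect sent before collection starts is not dropped.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical effect type; the real NoteListEffect is not shown in this post.
sealed interface NoteListEffect {
    data class ShowSnackbar(val message: String) : NoteListEffect
}

// Stand-in for a ViewModel so the sketch has no Android dependencies.
class EffectHolder {
    private val _effect = Channel<NoteListEffect>()
    val effect: Flow<NoteListEffect> = _effect.receiveAsFlow()

    suspend fun send(effect: NoteListEffect) = _effect.send(effect)
}

fun effectSentBeforeCollection(): NoteListEffect = runBlocking {
    val holder = EffectHolder()
    // UNDISPATCHED runs send() right away; it suspends because nobody is receiving yet.
    launch(start = CoroutineStart.UNDISPATCHED) {
        holder.send(NoteListEffect.ShowSnackbar("Note deleted"))
    }
    // The collector arrives later and still receives the pending effect.
    holder.effect.first()
}
```

Note that each effect is delivered to exactly one collector, which is usually what you want for navigation or snackbar events.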

Converting cold Flow to hot

We should expose a hot flow (StateFlow or SharedFlow) from the ViewModel rather than a cold flow (Flow). The reason: if we exposed a regular Flow, every new subscriber would trigger a new execution of the flow builder in the repository, which would waste device resources.
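
As a rough illustration of that cost, the sketch below counts how many times an upstream flow builder runs for two collectors, first as a cold Flow and then after conversion with stateIn. It assumes only kotlinx.coroutines; the function name, the counters and the Eagerly start strategy are choices made for the demo, not taken from the sample app.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Returns (runs of the cold builder, runs of the shared builder) for two collectors each.
fun countUpstreamRuns(): Pair<Int, Int> = runBlocking {
    var coldStarts = 0
    val cold = flow<String> {
        coldStarts++ // executed once per collector
        emit("users")
    }
    cold.first()
    cold.first()

    var hotStarts = 0
    val sharingJob = Job()
    // Separate scope playing the role of viewModelScope.
    val scope = CoroutineScope(coroutineContext + sharingJob)
    val hot = flow<String> {
        hotStarts++ // executed once, no matter how many collectors subscribe
        emit("users")
    }.stateIn(scope, SharingStarted.Eagerly, "loading")
    hot.first { it == "users" }
    hot.first { it == "users" }

    sharingJob.cancelAndJoin()
    coldStarts to hotStarts
}
```

With two collectors each, the cold builder runs twice while the shared one runs once.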

There are 2 common ways to convert cold flows from repository to hot State and Shared Flows:

  1. By collecting Flow and pushing into StateFlow or SharedFlow

  2. By using stateIn extension to push Flow emissions into StateFlow or using shareIn extension to push Flow into SharedFlow

Let’s examine them below:

Collect Flow in ViewModel

@HiltViewModel
class MyViewModel @Inject constructor(
  private val userRepository: UserRepository
) : ViewModel() {

  private val _userFlow = MutableStateFlow<UiState>(UiState.Loading)
  val userFlow: StateFlow<UiState> = _userFlow.asStateFlow()

  fun onRefresh() {
    viewModelScope.launch {
      userRepository
        .getUsers().asResult()
        .collect { result ->
          _userFlow.update {
            when (result) {
              is Result.Loading -> UiState.Loading
              is Result.Success -> UiState.Success(result.data)
              is Result.Error -> UiState.Error(result.exception)
            }
          }
        }
    }
  }
}

Where UiState is a common implementation:

sealed interface UiState {
  object Loading : UiState

  data class Success(
    val data: List<User>
  ) : UiState

  data class Error(
    val throwable: Throwable? = null
  ) : UiState
}

Another benefit of collecting the Flow from the repository and converting it into a State or Shared Flow is that we get access to all the intermediate Flow operators to manipulate the data to fit our use case (e.g. map, filter, take, takeWhile, transform, etc.).
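
For instance, a ViewModel could reshape the repository data before exposing it. The sketch below is only an illustration: the adultNames extension and the age threshold are hypothetical, and User mirrors the data class used elsewhere in this post.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same shape as the User class used throughout this post.
data class User(val name: String, val age: Int)

// Hypothetical rule: only users aged 21 or older are shown, by name.
fun Flow<List<User>>.adultNames(): Flow<List<String>> =
    map { users -> users.filter { it.age >= 21 }.map { it.name } }
        .distinctUntilChanged() // skip consecutive identical results

fun demoAdultNames(): List<List<String>> = runBlocking {
    val users = listOf(User("User 1", 20), User("User 2", 30))
    // Both emissions map to the same names, so distinctUntilChanged drops the second.
    flowOf(users, users.reversed()).adultNames().toList()
}
```

The transformed Flow can then be passed to collect, stateIn or shareIn exactly like the unmodified repository Flow.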

UserRepository interface exposes data as Flow:

interface UserRepository {
  fun getUsers(): Flow<List<User>>
}

And has a very basic implementation producing mock cold Flow:

class InMemoryUserRepository @Inject constructor() : UserRepository {
  override fun getUsers(): Flow<List<User>> = flow {
    val userList = listOf(
      User(
        name = "User 1",
        age = 20
      ),
      User(
        name = "User 2",
        age = 30
      )
    )
    emit(userList.shuffled())
  }
}

Notice that the Flow gets mapped to Result (userRepository.getUsers().asResult()) as follows:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import java.io.IOException

private const val RETRY_TIME_IN_MILLIS = 15_000L
private const val RETRY_ATTEMPT_COUNT = 3

sealed interface Result<out T> {
  data class Success<T>(val data: T) : Result<T>
  data class Error(val exception: Throwable? = null) : Result<Nothing>
  object Loading : Result<Nothing>
}

fun <T> Flow<T>.asResult(): Flow<Result<T>> {
  return this
    .map<T, Result<T>> {
      Result.Success(it)
    }
    .onStart { emit(Result.Loading) }
    .retryWhen { cause, attempt ->
      if (cause is IOException && attempt < RETRY_ATTEMPT_COUNT) {
        delay(RETRY_TIME_IN_MILLIS)
        true
      } else {
        false
      }
    }
    .catch { emit(Result.Error(it)) }
}

This Result wrapper is extracted so it can be reused on any Flow via the .asResult() extension.
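
To see which values a collector receives from such a wrapper, here is a simplified, self-contained sketch. It repeats the Result type from above but deliberately leaves out the retryWhen step so that it finishes immediately; the describe, demoSuccess and demoFailure helpers are hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed interface Result<out T> {
    data class Success<T>(val data: T) : Result<T>
    data class Error(val exception: Throwable? = null) : Result<Nothing>
    object Loading : Result<Nothing>
}

// Simplified variant of asResult(): no retry, only Loading / Success / Error.
fun <T> Flow<T>.asResult(): Flow<Result<T>> =
    map<T, Result<T>> { Result.Success(it) }
        .onStart { emit(Result.Loading) }
        .catch { emit(Result.Error(it)) }

// Hypothetical helper that turns a Result into a readable label.
fun describe(result: Result<Int>): String = when (result) {
    is Result.Loading -> "Loading"
    is Result.Success -> "Success(${result.data})"
    is Result.Error -> "Error(${result.exception?.message})"
}

fun demoSuccess(): List<String> = runBlocking {
    flowOf(1).asResult().map { describe(it) }.toList()
}

fun demoFailure(): List<String> = runBlocking {
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom") // upstream failure after one value
    }.asResult().map { describe(it) }.toList()
}
```

The successful flow produces Loading followed by Success(1). The failing flow produces Loading, Success(1) and then Error(boom), after which it completes normally instead of crashing the collector.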

Notice how data is pushed to StateFlow:

// one way to push to MutableStateFlow: fine for a plain overwrite, but a read-modify-write
// (e.g. _userFlow.value = _userFlow.value.copy(...)) is not atomic and can lose concurrent updates
_userFlow.value = ...

// an atomic, thread-safe way to push to MutableStateFlow introduced in kotlinx.coroutines 1.5.1
// it is especially useful when you copy an existing state and change its value(s) to produce a new state
// e.g. _userFlow.update { it.copy(title = "Something") }
_userFlow.update { ... }

// other ways to update MutableStateFlow that support concurrency are getAndUpdate and updateAndGet

Other ways to push data into MutableStateFlow are:

// suspending way, e.g. _userFlow.emit(UiState.Loading)
suspend fun emit(value: T)

// non-suspending way, e.g. _userFlow.tryEmit(UiState.Loading)
// returns true if the value was emitted successfully
fun tryEmit(value: T): Boolean
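
The difference between assigning .value and calling update only matters when the new state depends on the old one. The following sketch, which assumes nothing beyond kotlinx.coroutines and uses hypothetical function names, increments a counter from many coroutines on Dispatchers.Default.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Atomic read-modify-write: update retries if another writer got in between.
fun incrementWithUpdate(times: Int): Int = runBlocking {
    val counter = MutableStateFlow(0)
    withContext(Dispatchers.Default) {
        repeat(times) { launch { counter.update { it + 1 } } }
    } // withContext returns only after all launched children finish
    counter.value
}

// Non-atomic read-modify-write: two coroutines may read the same old value.
fun incrementWithValue(times: Int): Int = runBlocking {
    val counter = MutableStateFlow(0)
    withContext(Dispatchers.Default) {
        repeat(times) { launch { counter.value = counter.value + 1 } }
    }
    counter.value // may be lower than `times` when increments are lost
}
```

incrementWithUpdate(1_000) always ends at 1000, whereas incrementWithValue(1_000) can end below that on a multi-core device. A plain overwrite such as _userFlow.value = UiState.Loading does not have this problem.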

Using stateIn Flow extension

Another way to pipe Flow emissions into StateFlow is using fun <T> Flow<T>.stateIn extension:

private const val DEFAULT_TIMEOUT = 5000L

@HiltViewModel
class MyViewModel @Inject constructor(
  userRepository: UserRepository
) : ViewModel() {

  val userFlow: StateFlow<UiState> = userRepository
    .getUsers()
    .asResult()
    .map { result ->
      when (result) {
        is Result.Loading -> UiState.Loading
        is Result.Success -> UiState.Success(result.data)
        is Result.Error -> UiState.Error(result.exception)
      }
    }
    .stateIn(
      scope = viewModelScope,
      initialValue = UiState.Loading,
      started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
    )
}

Note that by using stateIn extension, there is no need to create a backing property for MutableStateFlow to push data into this StateFlow.

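Notice the handy started param. SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT) keeps the upstream Flow active for 5 seconds after the last subscriber disappears, so the Flow will not be re-subscribed (and the StateFlow won’t need to be re-populated) when a configuration change briefly removes the collector. 5 seconds is a reasonable timeout default seen in Google samples.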
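
The effect of the stop timeout can be approximated outside Android with the sketch below. A collector subscribes, goes away for a moment and comes back, roughly what happens during a configuration change. The function name, the 100 ms pauses and the counter are assumptions made for the demo.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns how many times the upstream was started for two short-lived collectors.
fun upstreamStarts(stopTimeoutMillis: Long): Int = runBlocking {
    var starts = 0
    val sharingJob = Job()
    // Separate scope playing the role of viewModelScope.
    val scope = CoroutineScope(coroutineContext + sharingJob)
    val state = flow<String> {
        starts++
        emit("data")
        awaitCancellation() // keep the upstream open, like a database or socket stream
    }.stateIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis), "loading")

    repeat(2) {
        // A collector appears and disappears again shortly afterwards.
        val collector = launch { state.collect { } }
        delay(100)
        collector.cancelAndJoin()
        delay(100)
    }
    sharingJob.cancelAndJoin()
    starts
}
```

With a 5 second timeout the upstream is started once, because the second collector arrives before the timeout expires. With a timeout of 0 the upstream is stopped as soon as the first collector leaves, so it has to be started again.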

Another advantage of using stateIn is being able to combine multiple Flows:

val uiState: StateFlow<HomeUiState> = combine(
    topRatedMovies,
    actionMovies
  ) { topRatedResult, actionMoviesResult ->

    val topRated: TopRatedMoviesUiState = when (topRatedResult) {
      is Result.Success -> TopRatedMoviesUiState.Success(topRatedResult.data)
      is Result.Loading -> TopRatedMoviesUiState.Loading
      is Result.Error -> TopRatedMoviesUiState.Error
    }

    val action: ActionMoviesUiState = when (actionMoviesResult) {
      is Result.Success -> ActionMoviesUiState.Success(actionMoviesResult.data)
      is Result.Loading -> ActionMoviesUiState.Loading
      is Result.Error -> ActionMoviesUiState.Error
    }

    HomeUiState(topRated, action)
  }
    .stateIn(
      scope = viewModelScope,
      started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT),
      initialValue = HomeUiState(
        TopRatedMoviesUiState.Loading,
        ActionMoviesUiState.Loading
      )
    )

Using shareIn Flow extension

If your ViewModel is exposing a SharedFlow, you can push data into it using the shareIn extension:

@HiltViewModel
class MyViewModel @Inject constructor(
  userRepository: UserRepository
) : ViewModel() {

  val userFlow: SharedFlow<UiState> = userRepository
    .getUsers()
    .asResult()
    .map { result ->
      when (result) {
        is Result.Loading -> UiState.Loading
        is Result.Success -> UiState.Success(result.data)
        is Result.Error -> UiState.Error(result.exception)
      }
    }
    .shareIn(
      scope = viewModelScope,
      started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
    )
}

Note that by using the shareIn extension, there is no need to create a backing property for MutableSharedFlow to push data into this SharedFlow.

If collect was used instead of the shareIn, you’d have to use:

private val _userFlow = MutableSharedFlow<UiState>() // no initial value; replay defaults to 0
val userFlow: SharedFlow<UiState> = _userFlow.asSharedFlow()

Collecting State in View-based components

If the StateFlow is collected in an Activity, we’d use this block:

lifecycleScope.launch {
  lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
    viewModel.userFlow.collect {
      render(it)
    }
  }
}

Notice the use of lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED). It guarantees that the View is not collecting the Flow unless its Lifecycle is at least STARTED: collection starts on ON_START, is cancelled on ON_STOP and restarts on the next ON_START. In other words, the View layer collects data only while it is started or resumed, which includes the paused-but-visible state.

This prevents wasting resources when the user is not actively interacting with the app.

If we were to collect in a Fragment, the code would look like this:

viewLifecycleOwner.lifecycleScope.launch {
  viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
    viewModel.userFlow.collect {
      render(it)
    }
  }
}

private fun render(uiState: UiState) {
  when (uiState) {
    UiState.Loading -> TODO()
    is UiState.Success -> TODO()
    is UiState.Error -> TODO()
  }
}

See Restartable lifecycle-aware coroutines for more details.

Collecting State in Jetpack Compose

A StateFlow can be collected in Jetpack Compose as follows:

val uiState by viewModel.userFlow.collectAsStateWithLifecycle()
when (uiState) {
  UiState.Loading -> TODO()
  is UiState.Success -> TODO()
  is UiState.Error -> TODO()
}

A SharedFlow can be collected in Compose in a similar fashion, except that an initial value has to be provided:

val uiState by shareInViewModel.userFlow.collectAsStateWithLifecycle(
  initialValue = UiState.Loading
)

Note that to use fun <T> Flow<T>.collectAsStateWithLifecycle(…) extension, you have to use the following dependency:

implementation 'androidx.lifecycle:lifecycle-runtime-compose:2.6.0-alpha03'

For more details, check out the excellent post Consuming Flows Safely in Jetpack Compose by Manuel Vivo.

To summarize, using collectAsStateWithLifecycle() guarantees that we only collect from hot flows (StateFlow and SharedFlow) when a screen is visible which prevents wasting system resources and making remote calls unnecessarily. This achieves a behavior similar to LiveData.

SavedStateHandle with StateFlow

Using SavedStateHandle in ViewModels lets us easily handle process death and it integrates with StateFlow nicely. Let’s look at an example:

Depending on your dependencies (including transitive) you may need to add this dependency:

implementation "androidx.lifecycle:lifecycle-viewmodel-savedstate:$version"

A ViewModel may look like this:

private const val SAVED_STATE_HANDLE_KEY_STATE = "MyViewModel.uiState"

@HiltViewModel
class MyViewModel @Inject constructor(
  private val savedStateHandle: SavedStateHandle,
  private val userRepository: UserRepository
) : ViewModel() {

  val userFlow: StateFlow<UiState> = savedStateHandle.getStateFlow(SAVED_STATE_HANDLE_KEY_STATE, UiState.Loading)

  fun onRefresh() {
    viewModelScope.launch {
      userRepository
        .getUsers().asResult()
        .collect { result ->
          savedStateHandle[SAVED_STATE_HANDLE_KEY_STATE] = when (result) {
            is Result.Loading -> UiState.Loading
            is Result.Success -> UiState.Success(result.data)
            is Result.Error -> UiState.Error(result.exception)
          }
        }
    }
  }
}

Now whenever the cold Flow emits, we save the mapped state in savedStateHandle, which pushes it to the StateFlow. If process death occurs, the StateFlow will be restored to what’s in saved state.

Note how easily that task is accomplished by doing:

savedStateHandle.getStateFlow(KEY, "defaultValue")

We no longer need a private _userFlow: MutableStateFlow<UiState> property.

On the calling side in the Activity, you can wrap the call to refresh as follows:

if (savedInstanceState == null) {
  viewModel.onRefresh()
}

There is no need to call viewModel.onRefresh() when savedInstanceState is not null: in that case the StateFlow is restored from the saved state.

Note that whatever you store in savedStateHandle should be made Parcelable. In this case, we’d have to make the following changes:

plugins {
    ...
    id 'kotlin-parcelize'
}

@Parcelize
sealed class UiState : Parcelable {
  object Loading : UiState()

  data class Success(
    val data: @RawValue List<User>
  ) : UiState()

  data class Error(
    val throwable: Throwable? = null
  ) : UiState()
}

@Parcelize
data class User(
  val name: String,
  val age: Int
) : Parcelable

Note: As far as what we are storing in the savedStateHandle, the above is just an example. Storing a List of data is often not a good idea as the payload may get too large and you may even start running into TransactionTooLargeException. The above code using UiState is just an end-to-end example but you should decide what the appropriate payload is in each case.

Unit testing State and Shared Flows

Since we are testing ViewModel code that involves Android viewModelScope (which runs on Dispatchers.Main), create the following Dispatcher rule:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestRule
import org.junit.rules.TestWatcher
import org.junit.runner.Description

/**
 * A JUnit [TestRule] that sets the Main dispatcher to [testDispatcher]
 * for the duration of the test.
 */
class MainDispatcherRule(
  val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
  override fun starting(description: Description) {
    super.starting(description)
    Dispatchers.setMain(testDispatcher)
  }

  override fun finished(description: Description) {
    super.finished(description)
    Dispatchers.resetMain()
  }
}

Use this rule in your ViewModel tests:

class MyViewModelTest {
  @get:Rule
  val mainDispatcherRule = MainDispatcherRule()

  ...
}

Testing StateFlow produced by collect

Let’s unit test the following ViewModel:

@HiltViewModel
class MyViewModel @Inject constructor(
  private val userRepository: UserRepository
) : ViewModel() {

  private val _userFlow = MutableStateFlow<UiState>(UiState.Loading)
  val userFlow: StateFlow<UiState> = _userFlow.asStateFlow()

  fun onRefresh() {
    viewModelScope.launch {
      userRepository
        .getUsers().asResult()
        .collect { result ->
          _userFlow.update {
            when (result) {
              is Result.Loading -> UiState.Loading
              is Result.Success -> UiState.Success(result.data)
              is Result.Error -> UiState.Error(result.exception)
            }
          }
        }
    }
  }
}

The test will look like this:

class MyViewModelTest {
  @get:Rule
  val mainDispatcherRule = MainDispatcherRule()

  private val repository = TestUserRepository()

  @OptIn(ExperimentalCoroutinesApi::class)
  @Test
  fun `when initialized, repository emits loading and data`() = runTest {
    val viewModel = MyViewModel(repository)

    val users = listOf(...)

    assertEquals(UiState.Loading, viewModel.userFlow.value)

    repository.sendUsers(users)

    viewModel.onRefresh()

    assertEquals(UiState.Success(users), viewModel.userFlow.value)
  }

One of the advantages of StateFlow over SharedFlow is being able to access the latest value pushed to it in a non-suspending way, via viewModel.userFlow.value.

Another approach to testing all things Flow is to use Turbine testing library from Cash App:

  @OptIn(ExperimentalCoroutinesApi::class)
  @Test
  fun `when initialized, repository emits loading and data (using turbine)`() = runTest {
    val viewModel = MyViewModel(repository)

    val users = listOf(...)

    viewModel.userFlow.test {
      val firstItem = awaitItem()
      assertEquals(UiState.Loading, firstItem)

      repository.sendUsers(users)

      viewModel.onRefresh()

      val secondItem = awaitItem()
      assertEquals(UiState.Success(users), secondItem)
    }
  }

Note that we use the following fake UserRepository for testing:

class TestUserRepository : UserRepository {

  /**
   * The backing hot flow for the list of users for testing.
   */
  private val usersFlow =
    MutableSharedFlow<List<User>>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

  override fun getUsers(): Flow<List<User>> {
    return usersFlow
  }

  /**
   * A test-only API to allow controlling the list of users from tests.
   */
  suspend fun sendUsers(users: List<User>) {
    usersFlow.emit(users)
  }
}

Testing StateFlow produced by stateIn

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
  val viewModel = MainWithStateInViewModel(repository)

  val users = listOf(...)

  val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
    viewModel.userFlow.collect()
  }

  assertEquals(UiState.Loading, viewModel.userFlow.value)

  repository.sendUsers(users)
  assertEquals(UiState.Success(users), viewModel.userFlow.value)

  collectJob.cancel()
}

Notice that because we use stateIn rather than collecting directly in the ViewModel, we have to collect the StateFlow in each test:

val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
  viewModel.userFlow.collect()
}

// perform some actions and assertions here

collectJob.cancel()
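
The reason for the extra collector can be shown without any test library. In the sketch below, which uses a hypothetical function name and plain runBlocking instead of runTest, a StateFlow created with stateIn and WhileSubscribed keeps its initial value until something subscribes to it.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the StateFlow value read before and after a collector subscribes.
fun valueBeforeAndAfterSubscribing(): Pair<String, String> = runBlocking {
    val sharingJob = Job()
    // Separate scope playing the role of viewModelScope.
    val scope = CoroutineScope(coroutineContext + sharingJob)
    val state = flowOf("data")
        .stateIn(scope, SharingStarted.WhileSubscribed(5_000), "loading")

    delay(100)
    val before = state.value // nobody has subscribed, so the upstream never started

    val collectJob = launch { state.collect { } }
    delay(100)
    val after = state.value // the subscriber triggered the upstream collection

    collectJob.cancelAndJoin()
    sharingJob.cancelAndJoin()
    before to after
}
```

Reading .value alone does not count as a subscription, which is why a test that only asserts on viewModel.userFlow.value would keep seeing the initial state.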

And here is a Turbine version of this test:

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data (using turbine)`() = runTest {
  val viewModel = MainWithStateInViewModel(repository)

  val users = listOf(...)

  repository.sendUsers(users)

  viewModel.userFlow.test {
    val firstItem = awaitItem()
    assertEquals(UiState.Loading, firstItem)

    val secondItem = awaitItem()
    assertEquals(UiState.Success(users), secondItem)
  }
}

Testing SharedFlow produced by shareIn

I have not found an easy way to test SharedFlow in ViewModels without using Turbine. Here is the test utilizing Turbine:

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
  val viewModel = MainWithShareInViewModel(repository)

  val users = listOf(...)

  repository.sendUsers(users)

  viewModel.userFlow.test {
    val firstItem = awaitItem()
    assertEquals(UiState.Loading, firstItem)

    val secondItem = awaitItem()
    assertEquals(UiState.Success(users), secondItem)
  }
}

For more info on testing Flows, check out Google documentation at https://developer.android.com/kotlin/flow/test

The full source code for this blog post can be found at https://github.com/jshvarts/ViewModelFlowDemo
