Learn how to build a Store supporting queries and mutations.
Store
.
Fetcher
Fetcher
creation logic into a separate class called PostFetcherFactory
.
Query
operations in the Fetcher
, because we only
invoke Fetcher
on reads.class PostFetcherFactory(
private val client: PostOperations,
private val trailsDatabase: TrailsDatabase,
) {
fun create(): Fetcher<PostOperation, Output> =
Fetcher.ofFlow { operation ->
require(operation is Operation.Query)
val mutableSharedFlow = MutableSharedFlow<Output>(
replay = 8,
extraBufferCapacity = 20,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
when (operation) {
is Operation.Query.FindOne -> {
findAndEmitOne(operation) { mutableSharedFlow.emit(it) }
}
is Operation.Query.FindMany -> {
findAndEmitMany(operation) { mutableSharedFlow.emit(it) }
}
is Operation.Query.FindAll -> {
findAndEmitAll { mutableSharedFlow.emit(it) }
}
is Operation.Query.ObserveOne -> {
observeOneAndEmitUpdates(operation) { mutableSharedFlow.emit(it) }
}
is Operation.Query.ObserveMany -> {
observeManyAndEmitUpdates(operation) { mutableSharedFlow.emit(it) }
}
}
mutableSharedFlow.asSharedFlow()
}
private suspend fun findAndEmitOne(
operation: Operation.Query.FindOne<Post.Key>,
emit: suspend (Output) -> Unit,
) {
val post = client.findOne(operation.key.id)
emit(Output.Single(post))
}
private suspend fun findAndEmitMany(
operation: Operation.Query.FindMany<Post.Key>,
emit: suspend (Output) -> Unit,
) {
val posts = client.findMany(operation.keys.ids.map { it.id })
emit(Output.Collection(posts))
}
private suspend fun findAndEmitAll(emit: suspend (Output) -> Unit) {
val posts = client.findAll()
emit(Output.Collection(posts))
}
private suspend fun observeOneAndEmitUpdates(
operation: Operation.Query.ObserveOne<Post.Key>,
emit: suspend (Output) -> Unit,
) {
client.observeOne(operation.key.id).collect { post ->
emit(Output.Single(post))
}
}
private suspend fun observeManyAndEmitUpdates(
operation: Operation.Query.ObserveMany<Post.Key>,
emit: suspend (Output) -> Unit,
) {
client.observeMany(operation.keys.map { it.id }).collect { posts ->
emit(Output.Collection(posts))
}
}
}
Fetcher.ofFlow
allows us to support operations that observe changes
over time, such as ObserveOne
and ObserveMany
.Source of Truth
Source of Truth
creation logic into separate classes for better readability. We will create a Reader
to handle read operations and a Writer
to handle create, update, and delete operations.
Reader
Reader
is responsible for reading data from the local database based on the operation requested.
Fetcher
, our Reader
only needs to handle Query
operations.class PostSourceOfTruthReader(
private val trailsDatabase: TrailsDatabase,
coroutineDispatcher: CoroutineDispatcher,
) {
fun handleRead(
operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
emit: suspend (Output?) -> Unit,
) {
TODO()
}
}
Writer
Writer
handles writing data to the local database after fetches or mutations.
Writer
needs to handle all Query
and Mutation
operations
because we write to the Source of Truth
on both reads and writes.class PostSourceOfTruthWriter(
private val trailsDatabase: TrailsDatabase,
coroutineDispatcher: CoroutineDispatcher,
) {
fun handleWrite(
operation: Operation<Post.Key, Post.Properties, Post.Edges, Post.Node>,
value: Output,
) {
TODO()
}
}
Source of Truth
Factoryclass PostSourceOfTruthFactory(
private val reader: PostSourceOfTruthReader,
private val writer: PostSourceOfTruthWriter,
) {
fun create(): SourceOfTruth<PostOperation, Output, Output> =
SourceOfTruth.of(
reader = { operation ->
val mutableSharedFlow =
MutableSharedFlow<Output?>(
replay = 8,
extraBufferCapacity = 20,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
reader.handleRead(operation) { mutableSharedFlow.emit(it) }
mutableSharedFlow.asSharedFlow()
},
writer = { operation, output -> writer.handleWrite(operation, output) },
delete = { operation -> writer.handleWrite(operation) },
deleteAll = { writer.handleWrite(Operation.Mutation.Delete.DeleteAll) },
)
}
Reader
null
instead of an empty list. This prevents the Store
from
considering the operation fulfilled and triggers a fetch from the network.class PostSourceOfTruthReader(
private val trailsDatabase: TrailsDatabase,
coroutineDispatcher: CoroutineDispatcher,
) {
private val coroutineScope = CoroutineScope(coroutineDispatcher)
fun handleRead(
operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
emit: suspend (Output?) -> Unit,
) {
when (operation) {
is Operation.Query.FindOne -> findOne(operation, emit)
is Operation.Query.FindAll -> findAll(emit)
is Operation.Query.FindMany -> findMany(operation, emit)
is Operation.Query.ObserveOne -> observeOne(operation, emit)
is Operation.Query.ObserveMany -> observeMany(operation, emit)
}
}
private fun findOne(
operation: Operation.Query.FindOne<Post.Key>,
emit: suspend (Output?) -> Unit,
) {
coroutineScope.launch {
val entity =
trailsDatabase
.postQueries
.selectPostById(operation.key.id.toLong())
.executeAsOneOrNull()
val output = entity?.asNode()?.let { Output.Single(it) }
emit(output)
}
}
private fun findMany(
operation: Operation.Query.FindMany<Post.Key>,
emit: suspend (Output?) -> Unit,
) {
coroutineScope.launch {
val entities =
trailsDatabase
.postQueries
.selectPostsByIds(operation.keys.map {}.map { it.toLong() })
.executeAsList()
val output =
if (entities.isEmpty()) {
null
} else {
Output.Collection(entities.map { it.asNode() })
}
emit(output)
}
}
private fun findAll(emit: suspend (Output?) -> Unit) {
coroutineScope.launch {
val entities = trailsDatabase.postQueries.selectAllPosts().executeAsList()
val output =
if (entities.isEmpty()) {
null
} else {
Output.Collection(entities.map { it.asNode() })
}
emit(output)
}
}
private fun observeOne(
operation: Operation.Query.ObserveOne<Post.Key>,
emit: suspend (Output?) -> Unit,
) {
coroutineScope.launch {
trailsDatabase.postQueries.selectPostById(operation.key.id.toLong()).asFlow().collect { query ->
val entity = query.executeAsOneOrNull()
val output = entity?.asNode()?.let { Output.Single(it) }
emit(output)
}
}
}
private fun observeMany(
operation: Operation.Query.ObserveMany<Post.Key>,
emit: suspend (Output?) -> Unit,
) {
coroutineScope.launch {
trailsDatabase
.postQueries
.selectPostsByIds(operation.keys.map {}.map { it.toLong() })
.asFlow()
.collect { query ->
val entities = query.executeAsList()
val output =
if (entities.isEmpty()) {
null
} else {
Output.Collection(entities.map { it.asNode() })
}
emit(output)
}
}
}
}
Writer
class PostSourceOfTruthWriter(
private val trailsDatabase: TrailsDatabase,
) {
suspend fun handleWrite(
operation: PostOperation,
value: Output,
) {
when (operation) {
is Operation.Mutation.Create -> handleCreate(operation, value)
is Operation.Query -> handleQuery(operation, value)
is Operation.Mutation.Update -> handleUpdate(operation, value)
is Operation.Mutation.Delete -> handleDelete(operation)
}
}
private suspend fun handleCreate(
operation: Operation.Mutation.Create<Post.Key, Post.Properties, Post.Edges>,
value: Output,
) {
when (operation) {
is Operation.Mutation.Create.InsertOne -> {
trailsDatabase.postQueries.insertPost(value.item)
}
is Operation.Mutation.Create.InsertMany -> {
value.items.forEach { trailsDatabase.postQueries.insertPost(it) }
}
}
}
private suspend fun handleQuery(
operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
value: Output,
) {
when (operation) {
is Operation.Query.FindOne -> {
trailsDatabase.postQueries.insertPostOrIgnore(value.item)
}
is Operation.Query.FindMany -> {
value.items.forEach { trailsDatabase.postQueries.insertPostOrIgnore(it) }
}
is Operation.Query.FindAll -> {
value.items.forEach { trailsDatabase.postQueries.insertPostOrIgnore(it) }
}
is Operation.Query.ObserveOne -> {
trailsDatabase.postQueries.insertPostOrIgnore(value.item)
}
is Operation.Query.ObserveMany -> {
value.items.forEach { trailsDatabase.postQueries.insertPostOrIgnore(it) }
}
}
}
private suspend fun handleUpdate(
operation: Operation.Mutation.Update<Post.Key, Post.Properties, Post.Edges>,
value: Output,
) {
when (operation) {
is Operation.Mutation.Update.UpdateOne -> {
trailsDatabase.postQueries.updatePost(value.item)
}
is Operation.Mutation.Update.UpdateMany -> {
value.items.forEach { trailsDatabase.postQueries.updatePost(it) }
}
}
}
private suspend fun handleDelete(operation: Operation.Mutation.Delete<Post.Key>) {
when (operation) {
is Operation.Mutation.Delete.DeleteOne -> {
trailsDatabase.postQueries.deletePostById(operation.key.id.toLong())
}
is Operation.Mutation.Delete.DeleteMany -> {
operation.keys.forEach { trailsDatabase.postQueries.deletePostById(it.id.toLong()) }
}
is Operation.Mutation.Delete.DeleteAll -> {
trailsDatabase.postQueries.deleteAllPosts()
}
}
}
}
Writer
handles all operation cases, including both
mutations and queries, to maintain consistency between the local data and
remote sources.Updater
Updater
creation logic into a separate class called PostUpdaterFactory
. The Updater
is responsible for synchronizing local mutations with the remote data source.
Updater
on reads if conflicts might exist. This means we need to
handle query operations too. If we do hit code for handling a query operation,
it means we are fetching a Post
but the Bookkeeper
has an unresolved sync
failure for that Post
. So, before we can pull the latest value, we need to
push our latest local value to the network.class PostUpdaterFactory(
private val client: PostOperations,
) {
fun create(): Updater<PostOperation, PostOutput, PostWriteResponse> = Updater.by(
post = { operation, post ->
handleOperation(operation, post)
},
onCompletion = null
)
private suspend fun handleOperation(operation: PostOperation, post: PostOutput): UpdaterResult {
return when (operation) {
is Operation.Mutation.Create.InsertOne -> {
require(post is PostOutput.Single && post.value is Post.Properties)
client.insertOne(post.value as Post.Properties)
}
is Operation.Mutation.Create.InsertMany -> {
require(post is PostOutput.Collection && post.items.all { it is Post.Properties })
client.insertMany(post.items.map { it as Post.Properties })
}
is Operation.Mutation.Update.UpdateOne -> {
require(post is PostOutput.Single && post.value is Post.Node)
client.updateOne(post.value as Post.Node)
}
is Operation.Mutation.Update.UpdateMany -> {
require(post is PostOutput.Collection && post.items.all { it is Post.Node })
client.updateMany(post.items.map { it as Post.Node })
}
is Operation.Mutation.Upsert.UpsertOne -> {
require(post is PostOutput.Single && post.value is Post.Properties)
client.upsertOne(post.value as Post.Properties)
}
is Operation.Mutation.Delete.DeleteOne -> {
client.deleteOne(operation.key)
}
is Operation.Mutation.Delete.DeleteMany -> {
client.deleteMany(operation.keys)
}
is Operation.Mutation.Delete.DeleteAll -> {
client.deleteAll()
}
is Operation.Query.FindOne -> {
require(post is PostOutput.Single && post.value is Post.Node)
client.updateOne(post.value as Post.Node)
}
is Operation.Query.FindMany -> {
require(post is PostOutput.Collection && post.items.all { it is Post.Node })
client.updateMany(post.items.map { it as Post.Node })
}
is Operation.Query.FindAll -> {
require(post is PostOutput.Collection && post.items.all { it is Post.Node })
client.updateMany(post.items.map { it as Post.Node })
}
is Operation.Query.ObserveOne -> {
require(post is PostOutput.Single && post.value is Post.Node)
client.updateOne(post.value as Post.Node)
}
is Operation.Query.ObserveMany -> {
require(post is PostOutput.Collection && post.items.all { it is Post.Node })
client.updateMany(post.items.map { it as Post.Node })
}
}
}
}
Bookkeeper
class PostBookkeeperFactory(
private val trailsDatabase: TrailsDatabase,
) {
fun create(): Bookkeeper<PostOperation> =
Bookkeeper.by(
getLastFailedSync = { operation ->
require(operation is Operation.Query)
handleGetLastFailedSync(operation)
},
setLastFailedSync = { operation, timestamp ->
require(operation is Operation.Mutation)
handleSetLastFailedSync(operation, timestamp)
},
clear = { operation ->
handleClear(operation)
},
clearAll = {
trailsDatabase.postBookkeepingQueries.clearAllFailedSyncs()
true
},
)
private fun handleGetLastFailedSync(operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>): Long? {
return when (operation) {
is Operation.Query.FindOne -> {
return firstFailedSyncOrNull(operation.key.id.toLong())
}
is Operation.Query.FindMany -> {
return operation.keys.firstOrNull { firstFailedSyncOrNull(it.id.toLong()) }
}
is Operation.Query.FindAll -> {
val failedUpdates = trailsDatabase.postBookkeepingQueries.getFailedUpdates().executeAsList()
val failedDeletes = trailsDatabase.postBookkeepingQueries.getFailedDeletes().executeAsList()
return when {
failedUpdates.isEmpty() && failedDeletes.isEmpty() -> null
failedUpdates.isNotEmpty() -> failedUpdates.first().timestamp
else -> failedDeletes.first().timestamp
}
}
is Operation.Query.ObserveOne -> {
return firstFailedSyncOrNull(operation.key.id.toLong())
}
is Operation.Query.ObserveMany -> {
return operation.keys.firstOrNull { firstFailedSyncOrNull(it.id.toLong()) }
}
}
}
private suspend fun handleSetLastFailedSync(
operation: Operation.Mutation<Post.Key, Post.Properties, Post.Edges, Post.Node>,
timestamp: Long,
): Boolean =
when (operation) {
is Operation.Mutation.Create.InsertOne -> {
trailsDatabase.postBookkeepingQueries.insertFailedCreate(operation.properties, timestamp)
true
}
is Operation.Mutation.Create.InsertMany -> {
operation.properties.forEach { properties ->
trailsDatabase.postBookkeepingQueries.insertFailedCreate(properties, timestamp)
}
true
}
is Operation.Mutation.Update.UpdateOne -> {
trailsDatabase.postBookkeepingQueries.insertFailedUpdate(operation.key.id, timestamp)
true
}
is Operation.Mutation.Update.UpdateMany -> {
operation.items.forEach { item ->
trailsDatabase.postBookkeepingQueries.insertFailedUpdate(item.key.id, timestamp)
}
true
}
is Operation.Mutation.Upsert.UpsertOne -> {
trailsDatabase.postBookkeepingQueries.insertFailedUpsert(operation.key.id, timestamp)
true
}
is Operation.Mutation.Update.UpsertMany -> {
operation.items.forEach { item ->
trailsDatabase.postBookkeepingQueries.insertFailedUpsert(item.key.id, timestamp)
}
true
}
is Operation.Mutation.Delete.DeleteOne -> {
trailsDatabase.postBookkeepingQueries.insertFailedDelete(operation.key.id, timestamp)
true
}
is Operation.Mutation.Delete.DeleteMany -> {
operation.keys.forEach { key ->
trailsDatabase.postBookkeepingQueries.insertFailedDelete(key.id, timestamp)
}
true
}
is Operation.Mutation.Delete.DeleteAll -> {
trailsDatabase.postBookkeepingQueries.insertFailedDeleteAll(timestamp)
true
}
}
private fun handleClear(operation: Operation.Mutation<Post.Key, Post.Properties, Post.Edges, Post.Node>) {
when (operation) {
is Operation.Query.FindOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Query.FindMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Query.FindAll -> {
clearAllFailedSyncs()
}
is Operation.Query.ObserveOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Query.ObserveMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Mutation.Create.InsertOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Mutation.Create.InsertMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Mutation.Update.UpdateOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Mutation.Update.UpdateMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Mutation.Upsert.UpsertOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Mutation.Update.UpsertMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Mutation.Delete.DeleteOne -> {
clearFailedSyncs(operation.key.id.toLong())
}
is Operation.Mutation.Delete.DeleteMany -> {
operation.keys.forEach { key ->
clearFailedSyncs(key.id.toLong())
}
}
is Operation.Mutation.Delete.DeleteAll -> {
clearAllFailedSyncs()
}
}
}
private suspend fun firstFailedSyncOrNull(id: Long): Long? {
val failedCreates =
trailsDatabase.postBookkeepingQueries
.getFailedCreates(id)
.executeAsList()
val failedUpdates =
trailsDatabase.postBookkeepingQueries
.getFailedUpdates(id)
.executeAsList()
val failedDeletes =
trailsDatabase.postBookkeepingQueries
.getFailedDeletes(id)
.executeAsList()
return failedCreates.firstOrNull()?.timestamp
?: failedUpdates.firstOrNull()?.timestamp
?: failedDeletes.firstOrNull()?.timestamp
}
private suspend fun clearFailedSyncs(id: Long) {
trailsDatabase.postBookkeepingQueries.clearFailedCreates(id)
trailsDatabase.postBookkeepingQueries.clearFailedUpdates(id)
trailsDatabase.postBookkeepingQueries.clearFailedDeletes(id)
}
}
Store
for our Post
use case.
class PostStoreFactory(
private val client: PostOperations,
private val trailsDatabase: TrailsDatabase,
private val coroutineDispatcher: CoroutineDispatcher,
) {
private val fetcherFactory = PostFetcherFactory(client)
private val sourceOfTruthReader = PostSourceOfTruthReader(trailsDatabase, coroutineDispatcher)
private val sourceOfTruthWriter = PostSourceOfTruthWriter(trailsDatabase)
private val sourceOfTruthFactory = PostSourceOfTruthFactory(sourceOfTruthReader, sourceOfTruthWriter)
private val updaterFactory = PostUpdaterFactory(client)
private val bookkeeperFactory = PostBookkeeperFactory(trailsDatabase)
fun create(): PostStore =
MutableStoreBuilder
.from(
fetcher = fetcherFactory.create(),
sourceOfTruth = sourceOfTruthFactory.create(),
converter = createConverter(),
).build(
updater = updaterFactory.create(),
bookkeeper = bookkeeperFactory.create(),
)
private fun createConverter(): Converter<PostNetworkModel, PostEntity, Post> =
Converter
.Builder<PostOutput, PostOutput, PostOutput>()
.fromOutputToLocal { it }
.fromNetworkToLocal { it }
.build()
}
Converter
because we
are not transforming the data anymore. The Converter
simply passes the data
through as-is.