Introduction

This is the second part of our two-part guide on handling CRUD (Create, Read, Update, Delete) operations in Store. In the first part, we defined a flexible data model and a set of operations that accommodate complex queries and mutations. In this part, we implement those CRUD operations in our Store.

Prerequisites:

  • Complete Part 1 of this guide.
  • Have an understanding of the core Store concepts.

The code for this example is available in the Trails repository.

Implementing the Fetcher

We’ll begin by extracting the Fetcher creation logic into a separate class called PostFetcherFactory.

We only need to handle Query operations in the Fetcher, because the Store invokes the Fetcher only on reads.

class PostFetcherFactory(
    private val client: PostOperations,
) {

    fun create(): Fetcher<PostOperation, Output> =
        Fetcher.ofFlow { operation ->
            // The Fetcher is only invoked on reads, so any other operation is a programming error.
            require(operation is Operation.Query)

            // Build a cold flow so the suspending client calls run when the Store collects it.
            flow<Output> {
                when (operation) {
                    is Operation.Query.FindOne -> findAndEmitOne(operation) { emit(it) }
                    is Operation.Query.FindMany -> findAndEmitMany(operation) { emit(it) }
                    is Operation.Query.FindAll -> findAndEmitAll { emit(it) }
                    is Operation.Query.ObserveOne -> observeOneAndEmitUpdates(operation) { emit(it) }
                    is Operation.Query.ObserveMany -> observeManyAndEmitUpdates(operation) { emit(it) }
                }
            }
        }

    private suspend fun findAndEmitOne(
        operation: Operation.Query.FindOne<Post.Key>,
        emit: suspend (Output) -> Unit,
    ) {
        val post = client.findOne(operation.key.id)
        emit(Output.Single(post))
    }

    private suspend fun findAndEmitMany(
        operation: Operation.Query.FindMany<Post.Key>,
        emit: suspend (Output) -> Unit,
    ) {
        val posts = client.findMany(operation.keys.map { it.id })
        emit(Output.Collection(posts))
    }

    private suspend fun findAndEmitAll(emit: suspend (Output) -> Unit) {
        val posts = client.findAll()
        emit(Output.Collection(posts))
    }

    private suspend fun observeOneAndEmitUpdates(
        operation: Operation.Query.ObserveOne<Post.Key>,
        emit: suspend (Output) -> Unit,
    ) {
        client.observeOne(operation.key.id).collect { post ->
            emit(Output.Single(post))
        }
    }

    private suspend fun observeManyAndEmitUpdates(
        operation: Operation.Query.ObserveMany<Post.Key>,
        emit: suspend (Output) -> Unit,
    ) {
        client.observeMany(operation.keys.map { it.id }).collect { posts ->
            emit(Output.Collection(posts))
        }
    }
}

Using Fetcher.ofFlow allows us to support operations that observe changes over time, such as ObserveOne and ObserveMany.
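
To make the difference between one-shot and observing reads concrete, here is a minimal sketch. It assumes the kotlinx.coroutines library is on the classpath, and FakeClient, findOneFlow, and observeOneFlow are hypothetical names used only for illustration; they are not part of Store or the Trails code.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical remote API: one suspending call and one stream of updates.
class FakeClient {
    suspend fun findOne(id: Int): String = "post-$id"

    fun observeOne(id: Int): Flow<String> = flowOf("post-$id@v1", "post-$id@v2")
}

// One-shot read: the flow emits a single value and then completes.
fun findOneFlow(client: FakeClient, id: Int): Flow<String> =
    flow { emit(client.findOne(id)) }

// Observing read: the flow re-emits every update produced by the remote stream.
fun observeOneFlow(client: FakeClient, id: Int): Flow<String> =
    client.observeOne(id).map { "update: $it" }

fun main() = runBlocking {
    val client = FakeClient()
    println(findOneFlow(client, 1).toList())    // [post-1]
    println(observeOneFlow(client, 1).toList()) // [update: post-1@v1, update: post-1@v2]
}
```

Both reads share the same Flow return type, which is why a single flow-based fetcher can serve Find and Observe operations alike.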

Implementing the Source of Truth

Next, we’ll extract the Source of Truth creation logic into separate classes for better readability. We will create a Reader to handle read operations and a Writer to handle create, update, and delete operations.

Defining the Reader

The Reader is responsible for reading data from the local database based on the operation requested.

Like the Fetcher, our Reader only needs to handle Query operations.

class PostSourceOfTruthReader(
    private val trailsDatabase: TrailsDatabase,
    coroutineDispatcher: CoroutineDispatcher,
) {
    fun handleRead(
        operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
        emit: suspend (Output?) -> Unit,
    ) {
        TODO()
    }
}
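
Both the Fetcher and the Reader narrow a general operation down to a query before branching on it. The sketch below shows that narrowing in isolation. Op and its subtypes are hypothetical, simplified stand-ins for the Operation hierarchy from Part 1.

```kotlin
// Hypothetical, simplified stand-ins for the Operation hierarchy from Part 1.
sealed interface Op {
    sealed interface Query : Op {
        data class FindOne(val id: Int) : Query
        object FindAll : Query
    }

    sealed interface Mutation : Op {
        data class DeleteOne(val id: Int) : Mutation
    }
}

// Only queries are valid here; `require` throws IllegalArgumentException otherwise.
fun describeRead(op: Op): String {
    require(op is Op.Query) { "Expected a query but got $op" }
    // `op` is now smart-cast to Op.Query, so this `when` only needs the query cases.
    return when (op) {
        is Op.Query.FindOne -> "find post ${op.id}"
        Op.Query.FindAll -> "find all posts"
    }
}

fun main() {
    println(describeRead(Op.Query.FindOne(7)))                               // find post 7
    println(runCatching { describeRead(Op.Mutation.DeleteOne(7)) }.isFailure) // true
}
```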

Defining the Writer

The Writer handles writing data to the local database after fetches or mutations.

Unlike the Reader, our Writer needs to handle both Query and Mutation operations, because we write to the Source of Truth on both reads and writes.

class PostSourceOfTruthWriter(
    private val trailsDatabase: TrailsDatabase,
) {
    suspend fun handleWrite(
        operation: PostOperation,
        // Deletes carry no payload, so the value is optional.
        value: Output? = null,
    ) {
        TODO()
    }
}

Setting Up the Source of Truth Factory

class PostSourceOfTruthFactory(
    private val reader: PostSourceOfTruthReader,
    private val writer: PostSourceOfTruthWriter,
) {
    fun create(): SourceOfTruth<PostOperation, Output, Output> =
        SourceOfTruth.of(
            reader = { operation ->
                // The Reader only handles Query operations, so narrow the type first.
                require(operation is Operation.Query)

                val mutableSharedFlow =
                    MutableSharedFlow<Output?>(
                        replay = 8,
                        extraBufferCapacity = 20,
                        onBufferOverflow = BufferOverflow.DROP_OLDEST,
                    )

                reader.handleRead(operation) { mutableSharedFlow.emit(it) }

                mutableSharedFlow.asSharedFlow()
            },
            writer = { operation, output -> writer.handleWrite(operation, output) },
            delete = { operation -> writer.handleWrite(operation) },
            deleteAll = { writer.handleWrite(Operation.Mutation.Delete.DeleteAll) },
        )
}

Implementing the Reader

When there is no value to emit (for example, no matching entities were found), the Reader emits null instead of an empty list. An empty list would lead the Store to consider the operation fulfilled, whereas null triggers a fetch from the network.

class PostSourceOfTruthReader(
    private val trailsDatabase: TrailsDatabase,
    coroutineDispatcher: CoroutineDispatcher,
) {
    private val coroutineScope = CoroutineScope(coroutineDispatcher)

    fun handleRead(
        operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
        emit: suspend (Output?) -> Unit,
    ) {
        when (operation) {
            is Operation.Query.FindOne -> findOne(operation, emit)
            is Operation.Query.FindAll -> findAll(emit)
            is Operation.Query.FindMany -> findMany(operation, emit)
            is Operation.Query.ObserveOne -> observeOne(operation, emit)
            is Operation.Query.ObserveMany -> observeMany(operation, emit)
        }
    }

    private fun findOne(
        operation: Operation.Query.FindOne<Post.Key>,
        emit: suspend (Output?) -> Unit,
    ) {
        coroutineScope.launch {
            val entity =
                trailsDatabase
                    .postQueries
                    .selectPostById(operation.key.id.toLong())
                    .executeAsOneOrNull()
            val output = entity?.asNode()?.let { Output.Single(it) }
            emit(output)
        }
    }

    private fun findMany(
        operation: Operation.Query.FindMany<Post.Key>,
        emit: suspend (Output?) -> Unit,
    ) {
        coroutineScope.launch {
            val entities =
                trailsDatabase
                    .postQueries
                    .selectPostsByIds(operation.keys.map { it.id.toLong() })
                    .executeAsList()

            val output =
                if (entities.isEmpty()) {
                    null
                } else {
                    Output.Collection(entities.map { it.asNode() })
                }
            emit(output)
        }
    }

    private fun findAll(emit: suspend (Output?) -> Unit) {
        coroutineScope.launch {
            val entities = trailsDatabase.postQueries.selectAllPosts().executeAsList()

            val output =
                if (entities.isEmpty()) {
                    null
                } else {
                    Output.Collection(entities.map { it.asNode() })
                }
            emit(output)
        }
    }

    private fun observeOne(
        operation: Operation.Query.ObserveOne<Post.Key>,
        emit: suspend (Output?) -> Unit,
    ) {
        coroutineScope.launch {
            trailsDatabase.postQueries.selectPostById(operation.key.id.toLong()).asFlow().collect { query ->
                val entity = query.executeAsOneOrNull()
                val output = entity?.asNode()?.let { Output.Single(it) }
                emit(output)
            }
        }
    }

    private fun observeMany(
        operation: Operation.Query.ObserveMany<Post.Key>,
        emit: suspend (Output?) -> Unit,
    ) {
        coroutineScope.launch {
            trailsDatabase
                .postQueries
                .selectPostsByIds(operation.keys.map { it.id.toLong() })
                .asFlow()
                .collect { query ->
                    val entities = query.executeAsList()

                    val output =
                        if (entities.isEmpty()) {
                            null
                        } else {
                            Output.Collection(entities.map { it.asNode() })
                        }
                    emit(output)
                }
        }
    }
}
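
The null-instead-of-empty convention used by the Reader can be isolated into a small helper. This is a sketch only: the Output type below is a hypothetical stand-in for the one defined in Part 1, and toOutputOrNull and needsFetch are illustrative names.

```kotlin
// Hypothetical stand-in for the Output type from Part 1.
sealed interface Output {
    data class Single(val value: String) : Output

    data class Collection(val items: List<String>) : Output
}

// Map "nothing found" to null so a miss is distinguishable from a real result.
fun List<String>.toOutputOrNull(): Output? =
    if (isEmpty()) null else Output.Collection(this)

// A null local result means the local source cannot satisfy the read.
fun needsFetch(localResult: Output?): Boolean = localResult == null

fun main() {
    println(needsFetch(emptyList<String>().toOutputOrNull())) // true
    println(needsFetch(listOf("a").toOutputOrNull()))         // false
}
```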

Implementing the Writer

class PostSourceOfTruthWriter(
    private val trailsDatabase: TrailsDatabase,
) {
    suspend fun handleWrite(
        operation: PostOperation,
        value: Output? = null,
    ) {
        when (operation) {
            is Operation.Mutation.Create -> handleCreate(operation, requireNotNull(value))
            is Operation.Query -> handleQuery(operation, requireNotNull(value))
            is Operation.Mutation.Update -> handleUpdate(operation, requireNotNull(value))
            // Deletes carry everything they need in the operation itself.
            is Operation.Mutation.Delete -> handleDelete(operation)
        }
    }

    private suspend fun handleCreate(
        operation: Operation.Mutation.Create<Post.Key, Post.Properties, Post.Edges>,
        value: Output,
    ) {
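        when (operation) {
            is Operation.Mutation.Create.InsertOne -> {
                // Narrow the output so the single item is accessible.
                require(value is Output.Single)
                trailsDatabase.postQueries.insertPost(value.item)
            }
            is Operation.Mutation.Create.InsertMany -> {
                require(value is Output.Collection)
                value.items.forEach { trailsDatabase.postQueries.insertPost(it) }
            }
        }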
    }

    private suspend fun handleQuery(
        operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
        value: Output,
    ) {
        when (operation) {
            // Single-item reads write one row and leave an existing row untouched.
            is Operation.Query.FindOne, is Operation.Query.ObserveOne -> {
                require(value is Output.Single)
                trailsDatabase.postQueries.insertPostOrIgnore(value.item)
            }
            // Multi-item reads write each returned row in turn.
            is Operation.Query.FindMany, is Operation.Query.FindAll, is Operation.Query.ObserveMany -> {
                require(value is Output.Collection)
                value.items.forEach { trailsDatabase.postQueries.insertPostOrIgnore(it) }
            }
        }
    }

    private suspend fun handleUpdate(
        operation: Operation.Mutation.Update<Post.Key, Post.Properties, Post.Edges>,
        value: Output,
    ) {
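        when (operation) {
            is Operation.Mutation.Update.UpdateOne -> {
                // Narrow the output so the single item is accessible.
                require(value is Output.Single)
                trailsDatabase.postQueries.updatePost(value.item)
            }
            is Operation.Mutation.Update.UpdateMany -> {
                require(value is Output.Collection)
                value.items.forEach { trailsDatabase.postQueries.updatePost(it) }
            }
        }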
    }

    private suspend fun handleDelete(operation: Operation.Mutation.Delete<Post.Key>) {
        when (operation) {
            is Operation.Mutation.Delete.DeleteOne -> {
                trailsDatabase.postQueries.deletePostById(operation.key.id.toLong())
            }
            is Operation.Mutation.Delete.DeleteMany -> {
                operation.keys.forEach { trailsDatabase.postQueries.deletePostById(it.id.toLong()) }
            }
            is Operation.Mutation.Delete.DeleteAll -> {
                trailsDatabase.postQueries.deleteAllPosts()
            }
        }
    }
}

Ensure that your Writer handles all operation cases, including both mutations and queries, to maintain consistency between the local data and remote sources.
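
One way to get the compiler to enforce this is to branch over a sealed hierarchy with `when` used as an expression. The sketch below uses a hypothetical, much smaller Op hierarchy and an in-memory map in place of the database. It also mimics the insert-or-ignore behavior for rows written after a read.

```kotlin
// Hypothetical, simplified operations; the real hierarchy comes from Part 1.
sealed interface Op {
    data class FetchedOne(val id: Int, val title: String) : Op // write after a read
    data class UpdateOne(val id: Int, val title: String) : Op  // local mutation
    data class DeleteOne(val id: Int) : Op
    object DeleteAll : Op
}

class InMemoryWriter {
    val rows = mutableMapOf<Int, String>()

    // Because `when` is used as an expression, an unhandled Op subtype is a compile error.
    // Returns the number of rows after the write.
    fun handleWrite(op: Op): Int = when (op) {
        is Op.FetchedOne -> { rows.getOrPut(op.id) { op.title }; rows.size } // insert or ignore
        is Op.UpdateOne -> { rows[op.id] = op.title; rows.size }
        is Op.DeleteOne -> { rows.remove(op.id); rows.size }
        Op.DeleteAll -> { rows.clear(); rows.size }
    }
}

fun main() {
    val writer = InMemoryWriter()
    writer.handleWrite(Op.FetchedOne(1, "stale"))
    writer.handleWrite(Op.UpdateOne(1, "edited"))
    writer.handleWrite(Op.FetchedOne(1, "remote")) // ignored: the row already exists
    println(writer.rows)                           // {1=edited}
    println(writer.handleWrite(Op.DeleteAll))      // 0
}
```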

Implementing the Updater

Next, we’ll extract the Updater creation logic into a separate class called PostUpdaterFactory. The Updater is responsible for synchronizing local mutations with the remote data source.

The Store also invokes the Updater on reads when a conflict might exist, so we need to handle Query operations too. If the Updater receives a Query operation, it means we are fetching a Post for which the Bookkeeper has an unresolved sync failure. Before we can pull the latest value, we need to push our latest local value to the network.

class PostUpdaterFactory(
    private val client: PostOperations,
) {

    fun create(): Updater<PostOperation, PostOutput, PostWriteResponse> = Updater.by(
        post = { operation, post ->
            handleOperation(operation, post)
        },
        onCompletion = null
    )

    private suspend fun handleOperation(operation: PostOperation, post: PostOutput): UpdaterResult {
        return when (operation) {
            is Operation.Mutation.Create.InsertOne -> {
                require(post is PostOutput.Single && post.value is Post.Properties)
                client.insertOne(post.value as Post.Properties)
            }

            is Operation.Mutation.Create.InsertMany -> {
                require(post is PostOutput.Collection && post.items.all { it is Post.Properties })
                client.insertMany(post.items.map { it as Post.Properties })
            }

            is Operation.Mutation.Update.UpdateOne -> {
                require(post is PostOutput.Single && post.value is Post.Node)
                client.updateOne(post.value as Post.Node)
            }

            is Operation.Mutation.Update.UpdateMany -> {
                require(post is PostOutput.Collection && post.items.all { it is Post.Node })
                client.updateMany(post.items.map { it as Post.Node })
            }

            is Operation.Mutation.Upsert.UpsertOne -> {
                require(post is PostOutput.Single && post.value is Post.Properties)
                client.upsertOne(post.value as Post.Properties)
            }
            is Operation.Mutation.Delete.DeleteOne -> {
                client.deleteOne(operation.key)
            }

            is Operation.Mutation.Delete.DeleteMany -> {
                client.deleteMany(operation.keys)
            }

            is Operation.Mutation.Delete.DeleteAll -> {
                client.deleteAll()
            }

            is Operation.Query.FindOne -> {
                require(post is PostOutput.Single && post.value is Post.Node)
                client.updateOne(post.value as Post.Node)
            }
            is Operation.Query.FindMany -> {
                require(post is PostOutput.Collection && post.items.all { it is Post.Node })
                client.updateMany(post.items.map { it as Post.Node })
            }
            is Operation.Query.FindAll -> {
                require(post is PostOutput.Collection && post.items.all { it is Post.Node })
                client.updateMany(post.items.map { it as Post.Node })
            }
            is Operation.Query.ObserveOne -> {
                require(post is PostOutput.Single && post.value is Post.Node)
                client.updateOne(post.value as Post.Node)
            }
            is Operation.Query.ObserveMany -> {
                require(post is PostOutput.Collection && post.items.all { it is Post.Node })
                client.updateMany(post.items.map { it as Post.Node })
            }
        }
    }
}
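
The push-before-pull behavior described for reads can be sketched without Store at all. Everything below (FakeRemote, SyncingReader, the in-memory maps) is hypothetical and only illustrates the ordering: replay the unsynced local write first, then read the remote value.

```kotlin
// Hypothetical in-memory stand-ins; none of these types belong to Store.
class FakeRemote {
    val posts = mutableMapOf(1 to "remote v1")

    fun update(id: Int, title: String) {
        posts[id] = title
    }

    fun find(id: Int): String? = posts[id]
}

class SyncingReader(
    private val remote: FakeRemote,
    private val local: MutableMap<Int, String>,
    private val failedSyncs: MutableMap<Int, Long>, // post id -> timestamp of the failed push
) {
    // Read a post, first replaying any local write that never reached the remote.
    fun read(id: Int): String? {
        val pendingLocal = local[id]
        if (failedSyncs.containsKey(id) && pendingLocal != null) {
            remote.update(id, pendingLocal) // push the latest local value
            failedSyncs.remove(id)          // the failure is now resolved
        }
        return remote.find(id)?.also { local[id] = it } // then pull
    }
}

fun main() {
    val remote = FakeRemote()
    val reader = SyncingReader(
        remote = remote,
        local = mutableMapOf(1 to "local edit"),
        failedSyncs = mutableMapOf(1 to 1_700_000_000_000L),
    )
    println(reader.read(1)) // local edit
}
```

Without the push step, the read would return "remote v1" and silently discard the local edit.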

Implementing the Bookkeeper

We check for failed syncs only on reads, and we record failed syncs only on writes. Clearing, by contrast, handles both reads and writes.

class PostBookkeeperFactory(
    private val trailsDatabase: TrailsDatabase,
) {
    fun create(): Bookkeeper<PostOperation> =
        Bookkeeper.by(
            getLastFailedSync = { operation ->
                require(operation is Operation.Query)

                handleGetLastFailedSync(operation)
            },
            setLastFailedSync = { operation, timestamp ->
                require(operation is Operation.Mutation)
                handleSetLastFailedSync(operation, timestamp)
            },
            clear = { operation ->
                handleClear(operation)
            },
            clearAll = {
                trailsDatabase.postBookkeepingQueries.clearAllFailedSyncs()
                true
            },
        )

    private suspend fun handleGetLastFailedSync(
        operation: Operation.Query<Post.Key, Post.Properties, Post.Edges, Post.Node>,
    ): Long? =
        when (operation) {
            is Operation.Query.FindOne -> firstFailedSyncOrNull(operation.key.id.toLong())

            // Return the timestamp for the first key that has a recorded failure, if any.
            is Operation.Query.FindMany ->
                operation.keys.firstNotNullOfOrNull { firstFailedSyncOrNull(it.id.toLong()) }

            is Operation.Query.FindAll -> {
                val failedUpdates = trailsDatabase.postBookkeepingQueries.getFailedUpdates().executeAsList()
                val failedDeletes = trailsDatabase.postBookkeepingQueries.getFailedDeletes().executeAsList()

                failedUpdates.firstOrNull()?.timestamp ?: failedDeletes.firstOrNull()?.timestamp
            }

            is Operation.Query.ObserveOne -> firstFailedSyncOrNull(operation.key.id.toLong())

            is Operation.Query.ObserveMany ->
                operation.keys.firstNotNullOfOrNull { firstFailedSyncOrNull(it.id.toLong()) }
        }

    private suspend fun handleSetLastFailedSync(
        operation: Operation.Mutation<Post.Key, Post.Properties, Post.Edges, Post.Node>,
        timestamp: Long,
    ): Boolean =
        when (operation) {
            is Operation.Mutation.Create.InsertOne -> {
                trailsDatabase.postBookkeepingQueries.insertFailedCreate(operation.properties, timestamp)
                true
            }

            is Operation.Mutation.Create.InsertMany -> {
                operation.properties.forEach { properties ->
                    trailsDatabase.postBookkeepingQueries.insertFailedCreate(properties, timestamp)
                }
                true
            }

            is Operation.Mutation.Update.UpdateOne -> {
                trailsDatabase.postBookkeepingQueries.insertFailedUpdate(operation.key.id, timestamp)
                true
            }

            is Operation.Mutation.Update.UpdateMany -> {
                operation.items.forEach { item ->
                    trailsDatabase.postBookkeepingQueries.insertFailedUpdate(item.key.id, timestamp)
                }
                true
            }

            is Operation.Mutation.Upsert.UpsertOne -> {
                trailsDatabase.postBookkeepingQueries.insertFailedUpsert(operation.key.id, timestamp)
                true
            }

            is Operation.Mutation.Upsert.UpsertMany -> {
                operation.items.forEach { item ->
                    trailsDatabase.postBookkeepingQueries.insertFailedUpsert(item.key.id, timestamp)
                }
                true
            }

            is Operation.Mutation.Delete.DeleteOne -> {
                trailsDatabase.postBookkeepingQueries.insertFailedDelete(operation.key.id, timestamp)
                true
            }

            is Operation.Mutation.Delete.DeleteMany -> {
                operation.keys.forEach { key ->
                    trailsDatabase.postBookkeepingQueries.insertFailedDelete(key.id, timestamp)
                }
                true
            }

            is Operation.Mutation.Delete.DeleteAll -> {
                trailsDatabase.postBookkeepingQueries.insertFailedDeleteAll(timestamp)
                true
            }
        }

    private suspend fun handleClear(operation: PostOperation): Boolean {
        when (operation) {
            is Operation.Query.FindOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Query.FindMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Query.FindAll -> trailsDatabase.postBookkeepingQueries.clearAllFailedSyncs()

            is Operation.Query.ObserveOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Query.ObserveMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Mutation.Create.InsertOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Mutation.Create.InsertMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Mutation.Update.UpdateOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Mutation.Update.UpdateMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Mutation.Upsert.UpsertOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Mutation.Upsert.UpsertMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Mutation.Delete.DeleteOne -> clearFailedSyncs(operation.key.id.toLong())

            is Operation.Mutation.Delete.DeleteMany -> operation.keys.forEach { clearFailedSyncs(it.id.toLong()) }

            is Operation.Mutation.Delete.DeleteAll -> trailsDatabase.postBookkeepingQueries.clearAllFailedSyncs()
        }

        // The clear callback reports whether clearing succeeded.
        return true
    }

    private suspend fun firstFailedSyncOrNull(id: Long): Long? {
        val failedCreates =
            trailsDatabase.postBookkeepingQueries
                .getFailedCreates(id)
                .executeAsList()
        val failedUpdates =
            trailsDatabase.postBookkeepingQueries
                .getFailedUpdates(id)
                .executeAsList()
        val failedDeletes =
            trailsDatabase.postBookkeepingQueries
                .getFailedDeletes(id)
                .executeAsList()

        return failedCreates.firstOrNull()?.timestamp
            ?: failedUpdates.firstOrNull()?.timestamp
            ?: failedDeletes.firstOrNull()?.timestamp
    }

    private suspend fun clearFailedSyncs(id: Long) {
        trailsDatabase.postBookkeepingQueries.clearFailedCreates(id)
        trailsDatabase.postBookkeepingQueries.clearFailedUpdates(id)
        trailsDatabase.postBookkeepingQueries.clearFailedDeletes(id)
    }
}
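
The Bookkeeper's contract is easier to see with the database removed. The following is a sketch of a hypothetical in-memory bookkeeper, not the Store interface itself: it records a timestamp per post id, reports the first recorded failure among a set of ids, and clears entries once a sync succeeds.

```kotlin
// Hypothetical in-memory bookkeeper; the version in this guide persists to TrailsDatabase instead.
class InMemoryBookkeeper {
    private val failedSyncs = mutableMapOf<Long, Long>() // post id -> timestamp

    fun setLastFailedSync(id: Long, timestamp: Long): Boolean {
        failedSyncs[id] = timestamp
        return true
    }

    fun getLastFailedSync(id: Long): Long? = failedSyncs[id]

    // For multi-key reads, report the first recorded failure among the requested ids.
    fun getLastFailedSync(ids: List<Long>): Long? =
        ids.firstNotNullOfOrNull { failedSyncs[it] }

    // Returns true only if there was a recorded failure to clear.
    fun clear(id: Long): Boolean = failedSyncs.remove(id) != null

    fun clearAll(): Boolean {
        failedSyncs.clear()
        return true
    }
}

fun main() {
    val bookkeeper = InMemoryBookkeeper()
    bookkeeper.setLastFailedSync(id = 2L, timestamp = 100L)
    println(bookkeeper.getLastFailedSync(listOf(1L, 2L, 3L))) // 100
    println(bookkeeper.getLastFailedSync(1L))                 // null
    bookkeeper.clear(2L)
    println(bookkeeper.getLastFailedSync(listOf(1L, 2L, 3L))) // null
}
```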

Conclusion

Finally, we can use these components to create a Store for our Post use case.

class PostStoreFactory(
    private val client: PostOperations,
    private val trailsDatabase: TrailsDatabase,
    private val coroutineDispatcher: CoroutineDispatcher,
) {
    private val fetcherFactory = PostFetcherFactory(client)
    private val sourceOfTruthReader = PostSourceOfTruthReader(trailsDatabase, coroutineDispatcher)
    private val sourceOfTruthWriter = PostSourceOfTruthWriter(trailsDatabase)
    private val sourceOfTruthFactory = PostSourceOfTruthFactory(sourceOfTruthReader, sourceOfTruthWriter)
    private val updaterFactory = PostUpdaterFactory(client)
    private val bookkeeperFactory = PostBookkeeperFactory(trailsDatabase)

    fun create(): PostStore =
        MutableStoreBuilder
            .from(
                fetcher = fetcherFactory.create(),
                sourceOfTruth = sourceOfTruthFactory.create(),
                converter = createConverter(),
            ).build(
                updater = updaterFactory.create(),
                bookkeeper = bookkeeperFactory.create(),
            )

    private fun createConverter(): Converter<PostOutput, PostOutput, PostOutput> =
        Converter
            .Builder<PostOutput, PostOutput, PostOutput>()
            .fromOutputToLocal { it }
            .fromNetworkToLocal { it }
            .build()
}

We use the same type for all three parameters in the Converter because we are not transforming the data anymore. The Converter simply passes the data through as-is.
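
As a rough illustration of what "passing the data through" means, the sketch below defines a hypothetical three-way converter interface (it is not Store's Converter) and an implementation whose conversions are all the identity function.

```kotlin
// Hypothetical three-way converter, loosely modeled on the Network/Local/Output split.
interface ThreeWayConverter<Network, Local, Output> {
    fun fromNetworkToLocal(network: Network): Local

    fun fromOutputToLocal(output: Output): Local
}

// When all three types are the same, both conversions collapse to the identity function.
class PassThroughConverter<T> : ThreeWayConverter<T, T, T> {
    override fun fromNetworkToLocal(network: T): T = network

    override fun fromOutputToLocal(output: T): T = output
}

fun main() {
    val converter = PassThroughConverter<List<String>>()
    val posts = listOf("first", "second")
    println(converter.fromNetworkToLocal(posts) === posts) // true: same instance, no copy
}
```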

Organizing our code into separate classes and factories makes it more maintainable and easier to understand. Each component has a clear responsibility.

By implementing these components, we’ve built a Store that supports complex CRUD operations, enabling efficient data synchronization between local storage and remote sources. This modular approach allows for easier testing, maintenance, and potential reuse in other parts of your app.