Event sourcing with Kotlin, pure functions and arrow-kt

Abstraction over business applications

In the landscape of modern software development, building robust, scalable, and maintainable systems is paramount. One architectural pattern that has gained significant traction over the last couple of years for achieving these goals is Event Sourcing.
If we look at almost any business application from a certain level of abstraction, we can consider that it can be represented as follows:

Where:

  • Command – an intention to modify the system’s state.
  • Event – a state change itself, a fact. It represents the decision that has already happened.
  • State – the present system state, shaped by preceding events.
  • Command handler/ Application Service – the place where the decision is made whether the command can be applied
  • Domain Logic – a set of business rules

In the above model, the flow is as follows:
On input, we have some kind of API. After initial validation, such a request becomes a command. It goes to a Command Handler, which may have different name, but its primary function is to retrieve the current state of the system, apply business logic to it, and decide whether there will be a change of state in our system caused by a certain business event.

If we look one level lower in our abstraction, we can see that we can distinguish between two types of systems that implement the above model:

  • StateStored systems – State-stored systems are classic systems that are only storing the current state by overwriting the previous state in the storage.
  • EventSourced systems – Event sourced systems operate on a simple principle: every action or transaction within an application generates an event. These events are immutable and represent a fact that has occurred within the system. Rather than updating the current state directly, event sourcing systems append new events to the event log. To reconstruct the current state of an entity, the system replays these events from the beginning, applying each one in sequence to derive the latest state.

Benefits of Event Sourcing

  1. Full Audit Log: With event sourcing, you have a complete history of every action that has affected the state of your application. This audit log is invaluable for debugging, compliance, and forensic analysis.
  2. Temporal Queries: Because events are stored sequentially, it becomes possible to query the state of the system at any point in time. This temporal aspect allows for insights into past states, enabling features such as time-travel debugging and historical reporting.
  3. Scalability and Performance: Event sourcing can offer performance advantages, especially in scenarios with high write throughput. By appending events rather than updating existing records, it minimizes contention and simplifies concurrency management.
  4. Fault Tolerance and Resilience: The append-only nature of event logs makes them inherently resilient to failures. Even if the application crashes or the database becomes corrupted, the event log remains intact, facilitating recovery and replication.

The list above is a list of advantages of event sourcing, which we can find in almost any blog or book. In reality, event sourcing is not always a good solution. As in any other situation in life, the right solution must be chosen appropriately for the problem faced.
Before we implement our system using event sourcing, we must be sure that we know our domain very well. Perhaps this is why event sourcing is often associated with DDD (also with CQRS, but we don’t always need the Query part).

I worked with both systems and nothing comes without price. Event sourcing is great as long as used properly. Don’t do it when you domain is unstable, not well discovered.

Event sourcing options in JVM ecosystem

There are several frameworks and libraries available in the JVM ecosystem to implement event sourcing.
Some available options are:

Axon Framework

Probably the most popular, originating from the Java world, is AxonFramework. I had the opportunity to work with it and I have mixed feelings. Probably, the description of my experience and opinions about it requires a separate post.
Here I will only point out the biggest downside, which reveals itself from the very beginning of working with AxonFramework – it is very invasive.
On the one hand, Axon’s developers put a lot of emphasis on DDD and domain independence from the so-called infrastructure. On the other hand, axon annotations such as @EventHandler, @CommandHandler, etc. are found everywhere in the domain. If you bind your service to Axon once, there is nothing left to do but use it everywhere. I assume that this is an intentional action.
In addition, what is also eye-catching is that while event sourcing is based on immutable events, all processing in Axon is based on mutable data structures.

Akka

The second very popular solution that I had the opportunity to test is the Akka framework originating from Scala. Although it should not be called a framework, because every time the word framework is used in a Scala context, one of the Scala developers dies.
Being more precise it’s Akka Persistence that allows stateful actors (or entities) to persist their state so that it can be recovered in various scenarios. The core concept is that only the events generated by these actors are stored, not the actual state itself. These events are appended to storage, preserving a full history of state changes. When an actor is recovered, it replays the stored events to rebuild its state.
Having that we can build event sourcing system, I must admit that while the solution seems interesting, it has a rather high entry level and a large learning curve.

No framework, event sourcing with only pure functions

My favorite choice is to implement event sourcing without using a framework, with pure functions ( with a little support of a small library). We are talking about JVM environment, so being precise I prefer Kotlin language and arrow-kt library.
So let’s define the basic building blocks and assumptions of such a system.

Domain Function Contract

The contract is very simple. Our domain function takes a list of events as a parameter, as well as a command. They then return a new event (or list of events).

Domain Function is a pure function

The fundamental properties of a pure function in programming are:

  1. Determinism – A pure function always produces the same output for a given set of inputs, regardless of the context in which it is called. There are no side effects or hidden states that can influence the result
  2. No Side Effects: A pure function does not modify any external state or have any observable side effects beyond computing its result.
  3. Idempotence: An idempotent function is one that, when called multiple times with the same input, produces the same result as calling it once. In the context of pure functions, idempotence ensures that repeated calls to the function do not have any unintended effects or change the state of the system.

Practical example – event sourcing account

Domain

Applying event sourcing in the field of finance seems to be a fairly natural solution. Accordingly, the demo application presents a simplified bank account.
In our domain, we can perform basic operations such as opening an account, depositing money, withdrawing money. In kotlin language, we can represent the intention to perform such operations as:

sealed interface AccountCommand {
    val accountId: AccountId
    val transactionId: TransactionId
}

class OpenAccount(override val accountId: AccountId, val ownerId: OwnerId, override val transactionId: TransactionId) :
    AccountCommand

class DepositMoney(override val accountId: AccountId, val deposit: Money, override val transactionId: TransactionId) :
    AccountCommand

class WithdrawMoney(override val accountId: AccountId, val withdraw: Money, override val transactionId: TransactionId) :
    AccountCommand

Equally easy and fun are the events that can be produced:

sealed interface AccountEvent {
    val accountId: AccountId
    val transactionId: TransactionId
    val version: Long
}

data class AccountOpened(
    override val accountId: AccountId,
    override val version: Long,
    override val transactionId: TransactionId,
    val ownerId: OwnerId,
    val initialBalance: Money,
) : AccountEvent

data class MoneyDeposited(
    override val accountId: AccountId,
    override val version: Long,
    override val transactionId: TransactionId,
    val amountDeposited: Money,
    val balance: Money,
) : AccountEvent

data class MoneyWithdrawn(
    override val accountId: AccountId,
    override val version: Long,
    override val transactionId: TransactionId,
    val amountWithdrawn: Money,
    val balance: Money,
) : AccountEvent

A small thing to pay attention to. In our domain we do not use plain types like String, Int. The only exception is the version field, in events, but we will talk about this later.

Finally, our account can be in two states, which we represent as:

sealed interface AccountState {
    data class OpenedAccount(
        val accountId: AccountId,
        val ownerId: OwnerId,
        val balance: Money,
        val version: Long,
    ) : AccountState

    data object NotInitialized : AccountState
}

As we know, the aggregate state, in the case of event sourcing, is generated by replaying all past events. Therefore, a piece of code, handling this requirement, we can implement as follows:

internal class AccountEventHandler {
    fun state(events: Sequence<AccountEvent>): Either<DomainError, AccountState> {
        val notInitialized: AccountState = AccountState.NotInitialized

        return either {
            events.fold(notInitialized) { currentState, event -> evolve(currentState, event).bind() }
        }
    }

    private fun evolve(
        currentState: AccountState,
        event: AccountEvent,
    ): Either<DomainError, AccountState> =
        either {
            when (currentState) {
                is AccountState.OpenedAccount ->
                    when (event) {
                        is MoneyDeposited ->
                            currentState.copy(
                                balance = currentState.balance.add(event.amountDeposited),
                                version = event.version,
                            )
                        is MoneyWithdrawn ->
                            currentState.copy(
                                balance = currentState.balance.subtract(event.amountWithdrawn),
                                version = event.version,
                            )
                    }
            }
            // The rest of the code ... 
        }
}

Ultimately, we need a piece of code to handle command processing:

internal class AccountCommandHandler(
    private val accountEventHandler: AccountEventHandler,
    private val accountRepository: AccountRepository,
) {
    fun handle(cmd: AccountCommand): Either<DomainError, AccountEvent> {
        return either {
            return accountRepository.getEvents(cmd.accountId).flatMap {
                if (it.any { event -> event.transactionId == cmd.transactionId }) {
                    return Either.Left(
                        DomainError.DuplicatedTransactionError("Transaction with id=${cmd.transactionId.value} already exists"),
                    )
                }
                accountEventHandler.state(it).flatMap { state ->
                    when (cmd) {
                        is OpenAccount -> handleOpenAccount(state, cmd)
                        is DepositMoney -> handleDepositMoney(state, cmd)
                        is WithdrawMoney -> handleWithdrawMoney(state, cmd)
                    }
                }.flatMap { accountEvent -> accountRepository.append(accountEvent) }
            }
        }
    }
    // ... 

That’s basically it as far as the essential code snippets needed to implement event sourcing are concerned. As you can see, we avoid throwing exceptions, and instead the returned type is Either. It represents two possible results of the operation, a domain error – DomainError, or a correct result.
Equally importantly, the domain is free of any dependencies related to frameworks or infrastructure. You won’t find any spring-boot or Hibernate annotations there. Therefore, we can say that it follows the rules of hexagonal architecture.

Testing

Testing such an application is extremely easy and enjoyable. Here is an example:

@Nested
    @DisplayName("When account is initialized")
    inner class AccountInitialized {
        @Test
        fun `then deposit event should increase balance `() {
            // given
            val initEvents =
                sequenceOf(
                    AccountOpened(accountId, version, TransactionId(UUID.randomUUID()), ownerId, initBalance),
                    MoneyDeposited(accountId, version.inc(), TransactionId(UUID.randomUUID()), tenUsd, initBalance),
                )
            val expectedState = AccountState.OpenedAccount(accountId, ownerId, initBalance.add(tenUsd), version.inc())

            // when
            val state = eventHandler.state(initEvents)

            // then
            Assertions.assertEquals(expectedState, state.getOrNull())
        }

Event Store

A key component of our application is the event store. There are several challenges and requirements that it must meet, but I’m not going to focus on them in this post. I will only point out that the event store is the key to ensure consistency in a multi-threaded environment. Therefore, we use sequenceNumber/version in our events to avoid duplicates. The database that is very suitable in our case for providing event consistency is Postgresql.
Basically, we expect two basic operations that our application must provide when interacting with an event store:

internal interface AccountRepository {
    fun getEvents(accountId: AccountId): Either<DomainError, Sequence<AccountEvent>>

    fun append(accountEvent: AccountEvent): Either<DomainError, AccountEvent>
}

Summary

This post doesn’t cover every aspect of event sourcing, but that wasn’t the intention. I wished to present one of the possible implementations. The full source code can be found here: https://github.com/zielichowski/event-sourcing-account . However, keep in mind that event sourcing, while a very useful technique, should only be used when you really need it. It is not a silver bullet, and in addition to benefits, it also brings disadvantages.


Leave a Reply

Your email address will not be published. Required fields are marked *