feat: remove snapshot on ProjectionRepository
Some checks failed
Tests / build (push) Failing after 6m58s
Tests / lint (push) Has been skipped
Tests / test (push) Has been skipped

This commit is contained in:
2025-04-17 01:24:25 +02:00
parent b6e8a2f347
commit cba9971ca1
37 changed files with 478 additions and 871 deletions

View File

@@ -2,28 +2,20 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.ProjectionRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
/** /**
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameListRepositoryInMemory( class GameListRepositoryInMemory : GameListRepository {
eventStore: GameEventStore, private val projectionsRepository =
snapshotConfig: SnapshotConfig = SnapshotConfig(), ProjectionRepositoryInMemory(
) : GameListRepository {
private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory(
name = GameListRepositoryInMemory::class,
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
applyToProjection = GameList::apply, applyToProjection = GameList::apply,
initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) },
) )
@@ -32,11 +24,11 @@ class GameListRepositoryInMemory(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
@@ -48,5 +40,5 @@ class GameListRepositoryInMemory(
* It fetches it from the local cache if possible, otherwise it builds it. * It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getList(): List<GameList> = override fun getList(): List<GameList> =
projectionsSnapshot.getList() projectionsRepository.getList()
} }

View File

@@ -2,14 +2,12 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.ProjectionRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import redis.clients.jedis.UnifiedJedis import redis.clients.jedis.UnifiedJedis
@@ -18,14 +16,10 @@ import redis.clients.jedis.UnifiedJedis
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameListRepositoryInRedis( class GameListRepositoryInRedis(
eventStore: GameEventStore,
jedis: UnifiedJedis, jedis: UnifiedJedis,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameListRepository { ) : GameListRepository {
private val projectionsSnapshot = private val projectionsRepository =
ProjectionSnapshotRepositoryInRedis( ProjectionRepositoryInRedis(
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) },
projectionClass = GameList::class, projectionClass = GameList::class,
projectionToJson = { Json.encodeToString(GameList.serializer(), it) }, projectionToJson = { Json.encodeToString(GameList.serializer(), it) },
@@ -40,8 +34,8 @@ class GameListRepositoryInRedis(
) { ) {
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
@@ -53,5 +47,5 @@ class GameListRepositoryInRedis(
* It fetches it from the local cache if possible, otherwise it builds it. * It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getList(): List<GameList> = override fun getList(): List<GameList> =
projectionsSnapshot.getList() projectionsRepository.getList()
} }

View File

@@ -2,28 +2,19 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.ProjectionRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
/** /**
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameStateRepositoryInMemory( class GameStateRepositoryInMemory : GameStateRepository {
eventStore: GameEventStore, private val projectionsRepository =
snapshotConfig: SnapshotConfig = SnapshotConfig(), ProjectionRepositoryInMemory(
) : GameStateRepository {
private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory(
name = GameStateRepositoryInMemory::class,
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
applyToProjection = GameState::apply, applyToProjection = GameState::apply,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
) )
@@ -32,33 +23,19 @@ class GameStateRepositoryInMemory(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
} }
/** /**
* Get the last version of the [GameState] from the all eventStream. * Get the [GameState].
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getLast(gameId: GameId): GameState = override fun get(gameId: GameId): GameState =
projectionsSnapshot.getLast(gameId) projectionsRepository.get(gameId)
/**
* Get the [GameState] to the specific [event][GameEvent].
* It does not contain the [events][GameEvent] it after this one.
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
} }

View File

@@ -2,14 +2,11 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.ProjectionRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import redis.clients.jedis.UnifiedJedis import redis.clients.jedis.UnifiedJedis
@@ -18,14 +15,10 @@ import redis.clients.jedis.UnifiedJedis
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameStateRepositoryInRedis( class GameStateRepositoryInRedis(
eventStore: GameEventStore,
jedis: UnifiedJedis, jedis: UnifiedJedis,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameStateRepository { ) : GameStateRepository {
private val projectionsSnapshot = private val projectionsRepository =
ProjectionSnapshotRepositoryInRedis( ProjectionRepositoryInRedis(
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
projectionClass = GameState::class, projectionClass = GameState::class,
projectionToJson = { Json.encodeToString(GameState.serializer(), it) }, projectionToJson = { Json.encodeToString(GameState.serializer(), it) },
@@ -38,33 +31,19 @@ class GameStateRepositoryInRedis(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
} }
/** /**
* Get the last version of the [GameState] from the all eventStream. * Get the [GameState].
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getLast(gameId: GameId): GameState = override fun get(gameId: GameId): GameState =
projectionsSnapshot.getLast(gameId) projectionsRepository.get(gameId)
/**
* Get the [GameState] to the specific [event][GameEvent].
* It does not contain the [events][GameEvent] it after this one.
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
} }

View File

@@ -38,7 +38,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game. // Read the last played card on the game.
get<Game.Card> { body -> get<Game.Card> { body ->
gameStateRepository gameStateRepository
.getLast(body.game.id) .get(body.game.id)
.cardOnCurrentStack .cardOnCurrentStack
?.let { call.respond(it) } ?.let { call.respond(it) }
?: call.response.status(HttpStatusCode.BadRequest) ?: call.response.status(HttpStatusCode.BadRequest)
@@ -46,7 +46,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game. // Read the last played card on the game.
get<Game.State> { body -> get<Game.State> { body ->
val state = gameStateRepository.getLast(body.game.id) val state = gameStateRepository.get(body.game.id)
call.respond(state) call.respond(state)
} }
} }

View File

@@ -86,12 +86,12 @@ class GameCommandHandler(
* If the command fail, send an [error notification][CommandErrorNotification], * If the command fail, send an [error notification][CommandErrorNotification],
* if success, send a [success notification][CommandSuccessNotification] * if success, send a [success notification][CommandSuccessNotification]
*/ */
suspend fun handle( fun handle(
player: Player, player: Player,
gameId: GameId, gameId: GameId,
command: GameCommand, command: GameCommand,
sendSuccess: suspend () -> Unit, sendSuccess: () -> Unit,
sendError: suspend (message: String) -> Unit, sendError: (message: String) -> Unit,
) { ) {
if (command.payload.aggregateId.id != gameId.id) { if (command.payload.aggregateId.id != gameId.id) {
logger.warn { "Handle command Refuse, the gameId of the command is not the same" } logger.warn { "Handle command Refuse, the gameId of the command is not the same" }
@@ -114,26 +114,26 @@ class GameCommandHandler(
} }
} }
private fun SendChannel<Notification>.sendSuccess(command: GameCommand): suspend () -> Unit = private fun SendChannel<Notification>.sendSuccess(command: GameCommand): () -> Unit =
{ {
val logger = KotlinLogging.logger { } val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = command.id) CommandSuccessNotification(commandId = command.id)
.also { notification -> .also { notification ->
withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) { withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) {
logger.debug { "Notification SUCCESS sent" } logger.debug { "Notification SUCCESS sent" }
send(notification) trySend(notification)
} }
} }
} }
private fun SendChannel<Notification>.sendError(command: GameCommand): suspend (message: String) -> Unit = private fun SendChannel<Notification>.sendError(command: GameCommand): (message: String) -> Unit =
{ {
val logger = KotlinLogging.logger { } val logger = KotlinLogging.logger { }
CommandErrorNotification(message = it, command = command) CommandErrorNotification(message = it, command = command)
.also { notification -> .also { notification ->
withLoggingContext("notification" to notification.toString(), "command" to command.toString()) { withLoggingContext("notification" to notification.toString(), "command" to command.toString()) {
logger.warn { "Notification ERROR sent: ${notification.message}" } logger.warn { "Notification ERROR sent: ${notification.message}" }
send(notification) trySend(notification)
} }
} }
} }

View File

@@ -12,7 +12,7 @@ data class ICantPlay(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<ICantPlayCommand, PlayerHavePassEvent> { ) : CommandAction<ICantPlayCommand, PlayerHavePassEvent> {
override fun run(command: ICantPlayCommand): (version: Int) -> PlayerHavePassEvent { override fun run(command: ICantPlayCommand): (version: Int) -> PlayerHavePassEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (state.currentPlayerTurn != command.payload.player) { if (state.currentPlayerTurn != command.payload.player) {
throw CommandException("Its not your turn!") throw CommandException("Its not your turn!")

View File

@@ -12,7 +12,7 @@ data class IWantToJoinTheGame(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToJoinTheGameCommand, NewPlayerEvent> { ) : CommandAction<IWantToJoinTheGameCommand, NewPlayerEvent> {
override fun run(command: IWantToJoinTheGameCommand): (version: Int) -> NewPlayerEvent { override fun run(command: IWantToJoinTheGameCommand): (version: Int) -> NewPlayerEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (!state.isStarted) { if (!state.isStarted) {
return { return {
NewPlayerEvent( NewPlayerEvent(

View File

@@ -12,7 +12,7 @@ data class IWantToPlayCard(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToPlayCardCommand, CardIsPlayedEvent> { ) : CommandAction<IWantToPlayCardCommand, CardIsPlayedEvent> {
override fun run(command: IWantToPlayCardCommand): (version: Int) -> CardIsPlayedEvent { override fun run(command: IWantToPlayCardCommand): (version: Int) -> CardIsPlayedEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (!state.isStarted) { if (!state.isStarted) {
throw CommandException("The game is Not started") throw CommandException("The game is Not started")

View File

@@ -13,7 +13,7 @@ class IamReadyToPlay(
) : CommandAction<IamReadyToPlayCommand, PlayerReadyEvent> { ) : CommandAction<IamReadyToPlayCommand, PlayerReadyEvent> {
@Throws(CommandException::class) @Throws(CommandException::class)
override fun run(command: IamReadyToPlayCommand): (version: Int) -> PlayerReadyEvent { override fun run(command: IamReadyToPlayCommand): (version: Int) -> PlayerReadyEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
val playerExist: Boolean = state.players.contains(command.payload.player) val playerExist: Boolean = state.players.contains(command.payload.player)
val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player) val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player)

View File

@@ -1,12 +1,7 @@
package eventDemo.business.event.projection package eventDemo.business.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.event.GameEvent
interface GameStateRepository { interface GameStateRepository {
fun getLast(gameId: GameId): GameState fun get(gameId: GameId): GameState
fun getUntil(event: GameEvent): GameState
fun count(gameId: GameId): Int
} }

View File

@@ -37,7 +37,7 @@ class ReactionListener(
} }
} }
private suspend fun sendStartGameEvent(state: GameState) { private fun sendStartGameEvent(state: GameState) {
if (state.isReady && !state.isStarted) { if (state.isReady && !state.isStarted) {
val reactionEvent = val reactionEvent =
eventHandler.handle(state.aggregateId) { eventHandler.handle(state.aggregateId) {
@@ -54,7 +54,7 @@ class ReactionListener(
} }
} }
private suspend fun sendWinnerEvent(state: GameState) { private fun sendWinnerEvent(state: GameState) {
val winner = state.playerHasNoCardLeft().firstOrNull() val winner = state.playerHasNoCardLeft().firstOrNull()
if (winner != null) { if (winner != null) {
val reactionEvent = val reactionEvent =

View File

@@ -13,7 +13,6 @@ import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.libs.event.projection.SnapshotConfig
import org.koin.core.module.Module import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf import org.koin.core.module.dsl.singleOf
import org.koin.core.scope.Scope import org.koin.core.scope.Scope
@@ -65,10 +64,10 @@ fun Module.configureDIInfrastructure(config: Configuration) {
singleOf(::GameProjectionBusInRabbitMQ) bind GameProjectionBus::class singleOf(::GameProjectionBusInRabbitMQ) bind GameProjectionBus::class
single { single {
GameStateRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) GameStateRepositoryInRedis(get())
} bind GameStateRepository::class } bind GameStateRepository::class
single { single {
GameListRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) GameListRepositoryInRedis(get())
} bind GameListRepository::class } bind GameListRepository::class
} }

View File

@@ -1,13 +1,22 @@
package eventDemo.libs.bus package eventDemo.libs.bus
interface Bus<T> { interface Bus<T> {
suspend fun publish(item: T) /**
* Publish a new [message][item] to the bus.
*/
fun publish(item: T)
/** /**
* @param priority The higher the priority, the more it will be called first * Subscribe a [lambda][block] to the bus.
*
* When a message is sent to the bus, the [block] is executed.
*/ */
fun subscribe(block: suspend (T) -> Unit): Subscription fun subscribe(block: (T) -> Unit): Subscription
/**
* The returns of the [subscribe] method.
* It can be called to [cancel][close] the subscription.
*/
interface Subscription : AutoCloseable { interface Subscription : AutoCloseable {
override fun close() override fun close()
} }

View File

@@ -9,21 +9,19 @@ class BusInMemory<E>(
val name: KClass<*> = BusInMemory::class, val name: KClass<*> = BusInMemory::class,
) : Bus<E> { ) : Bus<E> {
private val logger = KotlinLogging.logger(name.qualifiedName.toString()) private val logger = KotlinLogging.logger(name.qualifiedName.toString())
private val subscribers: MutableList<suspend (E) -> Unit> = mutableListOf() private val subscribers: MutableList<(E) -> Unit> = mutableListOf()
override suspend fun publish(item: E) { override fun publish(item: E) {
withLoggingContext("busItem" to item.toString()) { withLoggingContext("busItem" to item.toString()) {
logger.info { "Item sent to the bus" } logger.info { "Item sent to the bus" }
subscribers subscribers
.forEach { .forEach {
coroutineScope { it(item)
it(item)
}
} }
} }
} }
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { override fun subscribe(block: (E) -> Unit): Bus.Subscription {
subscribers.add(block) subscribers.add(block)
return object : Bus.Subscription { return object : Bus.Subscription {
override fun close() { override fun close() {

View File

@@ -42,21 +42,19 @@ class BusInRabbitMQ<E>(
} }
} }
override suspend fun publish(item: E) { override fun publish(item: E) {
connection connection
.createChannel() .createChannel()
.use { .basicPublish(
it.basicPublish( exchangeName,
exchangeName, routingKey,
routingKey, AMQP.BasicProperties(),
AMQP.BasicProperties(), objectToString(item).toByteArray(),
objectToString(item).toByteArray(), )
)
}
logger.info { "Item sent to the bus" } logger.info { "Item sent to the bus" }
} }
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { override fun subscribe(block: (E) -> Unit): Bus.Subscription {
connection connection
.createChannel() .createChannel()
.also { channel -> .also { channel ->

View File

@@ -44,7 +44,7 @@ class CommandHandler<B : Bus<E>, E : Event<ID>, ID : AggregateId, C : Command>(
* *
* It restricts to run only once the [command]. * It restricts to run only once the [command].
*/ */
suspend fun handle( fun handle(
aggregateId: ID, aggregateId: ID,
command: C, command: C,
callback: CommandCallback<C>, callback: CommandCallback<C>,
@@ -99,10 +99,10 @@ private class EventCommandMap<C : Command, E : Event<*>>(
val event: E, val event: E,
val date: Instant, val date: Instant,
) { ) {
suspend operator fun invoke(error: CommandException? = null) { operator fun invoke(error: CommandException? = null) {
callback(command, error) callback(command, error)
} }
} }
} }
typealias CommandCallback<C> = suspend (command: C, error: CommandException?) -> Unit typealias CommandCallback<C> = (command: C, error: CommandException?) -> Unit

View File

@@ -14,7 +14,7 @@ class CommandRunnerController<C : Command>(
) { ) {
private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap() private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap()
suspend fun runOnlyOnce( fun runOnlyOnce(
command: C, command: C,
action: CommandBlock<C>, action: CommandBlock<C>,
) { ) {

View File

@@ -34,7 +34,7 @@ class CommandStreamChannel<C : Command>(
} }
} }
private suspend fun runAndLogStatus( private fun runAndLogStatus(
command: C, command: C,
action: CommandBlock<C>, action: CommandBlock<C>,
) { ) {
@@ -47,4 +47,4 @@ class CommandStreamChannel<C : Command>(
} }
} }
typealias CommandBlock<C> = suspend (C) -> Unit typealias CommandBlock<C> = (C) -> Unit

View File

@@ -4,7 +4,7 @@ package eventDemo.libs.event
* A stream to publish and read the played card event. * A stream to publish and read the played card event.
*/ */
interface EventHandler<E : Event<ID>, ID : AggregateId> { interface EventHandler<E : Event<ID>, ID : AggregateId> {
suspend fun handle( fun handle(
aggregateId: ID, aggregateId: ID,
buildEvent: (version: Int) -> E, buildEvent: (version: Int) -> E,
): E ): E

View File

@@ -19,7 +19,7 @@ class EventHandlerImpl<E : Event<ID>, ID : AggregateId>(
/** /**
* Build Event then send it to the event store and bus. * Build Event then send it to the event store and bus.
*/ */
override suspend fun handle( override fun handle(
aggregateId: ID, aggregateId: ID,
buildEvent: (version: Int) -> E, buildEvent: (version: Int) -> E,
): E = ): E =
@@ -34,13 +34,9 @@ class EventHandlerImpl<E : Event<ID>, ID : AggregateId>(
.also { .also {
withLoggingContext("event" to it.toString()) { withLoggingContext("event" to it.toString()) {
eventStore.publish(it) eventStore.publish(it)
eventBus.publish(it)
} }
} }
}.also { event ->
withLoggingContext("event" to event.toString()) {
// Publish to the bus
eventBus.publish(event)
}
} }
} }
} }

View File

@@ -0,0 +1,34 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
interface ProjectionRepository<E : Event<ID>, P : Projection<ID>, ID : AggregateId> {
/**
* Update projection with the event.
*/
fun apply(event: E): P
/**
* Update projection with the event, and save it.
*/
fun applyAndSave(event: E): P
/**
* Save the projection.
*/
fun save(projection: P)
/**
* Build the list of all [Projections][Projection]
*/
fun getList(
limit: Int = 100,
offset: Int = 0,
): List<P>
/**
* Build the last version of the [Projection] from the cache.
*/
fun get(aggregateId: ID): P
}

View File

@@ -0,0 +1,64 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import io.ktor.util.collections.ConcurrentMap
/**
* Repository abstraction to declare common process
*/
abstract class ProjectionRepositoryAbs<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val applyToProjection: P.(event: E) -> P,
) : ProjectionRepository<E, P, ID> {
private val logger = KotlinLogging.logger {}
/**
* Update projection with the event.
*
* 1. get the last projection
* 2. apply the new event to the projection
*/
override fun apply(event: E): P =
get(event.aggregateId).applyToProjectionSecure(event)
/**
* Update projection with the event, and save it.
*
* 1. get the last projection
* 2. apply the new event to projection
* 3. save it
*/
override fun applyAndSave(event: E): P =
apply(event)
.also {
withLoggingContext("projection" to it.toString(), "event" to event.toString()) {
save(it)
}
}
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
protected val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (canBeApply(event)) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
"Event is already in the Projection, skip apply.".let {
logger.warn { it }
error(it)
}
} else {
"The version of the event must follow directly after the version of the projection.".let {
logger.error { it }
error(it)
}
}
}
}
private fun P.canBeApply(event: E): Boolean =
event.version == lastEventVersion + 1
}

View File

@@ -0,0 +1,46 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import java.util.concurrent.ConcurrentHashMap
class ProjectionRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val initialStateBuilder: (aggregateId: ID) -> P,
applyToProjection: P.(event: E) -> P,
) : ProjectionRepositoryAbs<E, P, ID>(applyToProjection),
ProjectionRepository<E, P, ID> {
private val projections: ConcurrentHashMap<ID, P> = ConcurrentHashMap()
/**
* Build the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
projections
.values
.drop(offset)
.take(limit)
/**
* Get the [Projection].
*/
override fun get(aggregateId: ID): P =
projections[aggregateId]
?: initialStateBuilder(aggregateId)
/**
* Save the projection.
*/
override fun save(projection: P) {
projections.compute(projection.aggregateId) { id: ID, proj: P? ->
val currentProjection = proj ?: initialStateBuilder(projection.aggregateId)
if (currentProjection.lastEventVersion < projection.lastEventVersion) {
projection
} else {
currentProjection
}
}
}
}

View File

@@ -0,0 +1,80 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import io.github.oshai.kotlinlogging.KotlinLogging
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.reflect.KClass
class ProjectionRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val jedis: UnifiedJedis,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val projectionClass: KClass<P>,
private val projectionToJson: (P) -> String,
private val jsonToProjection: (String) -> P,
applyToProjection: P.(event: E) -> P,
) : ProjectionRepositoryAbs<E, P, ID>(applyToProjection),
ProjectionRepository<E, P, ID> {
val logger = KotlinLogging.logger { }
private val lock = ReentrantLock()
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
jedis
.hscan(
projectionClass.redisHKey,
offset.toString(),
ScanParams()
.match("*")
.count(limit),
).result
.mapNotNull {
jsonToProjection(it.value)
}
/**
* Get the [Projection].
*/
override fun get(aggregateId: ID): P =
jedis
.hget(
projectionClass.redisHKey,
aggregateId.id.toString(),
).let {
if (it == null || it == "nil") {
initialStateBuilder(aggregateId)
} else {
jsonToProjection(it)
}
}
override fun save(projection: P) {
lock.withLock {
if (get(projection.aggregateId).lastEventVersion < projection.lastEventVersion) {
jedis.hset(
projection.redisHKey,
projection.aggregateId.id.toString(),
projectionToJson(projection),
)
logger.info { "Projection saved" }
} else {
logger.error { "Projection save SKIP (an early version exists)" }
error("Projection save SKIP (an early version exists)")
}
}
}
}
private val <P : Projection<*>> KClass<P>.redisHKey: String get() =
"projection:$simpleName"
private val <P : Projection<*>> P.redisHKey: String get() =
this::class.redisHKey

View File

@@ -1,35 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
interface ProjectionSnapshotRepository<E : Event<ID>, P : Projection<ID>, ID : AggregateId> {
/**
* Create a snapshot for the event
*/
suspend fun applyAndPutToCache(event: E): P
fun count(aggregateId: ID): Int
fun countAll(): Int
/**
* Build the list of all [Projections][Projection]
*/
fun getList(
limit: Int = 100,
offset: Int = 0,
): List<P>
/**
* Build the last version of the [Projection] from the cache.
*/
fun getLast(aggregateId: ID): P
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*/
fun getUntil(event: E): P
}

View File

@@ -1,226 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStream
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
val name: KClass<*> = ProjectionSnapshotRepositoryInMemory::class,
private val eventStore: EventStore<E, ID>,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap()
private val logger = KotlinLogging.logger(name.qualifiedName.toString())
/**
* Create a snapshot for the event
*
* 1. get the last snapshot with a version lower than that of the event
* 2. get the events with a greater version of the snapshot
* 3. apply the event to the snapshot
* 4. apply the new event to the projection
* 5. save it
* 6. remove old one
*/
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext("projection" to it.toString()) {
save(it)
removeOldSnapshot(it.aggregateId)
}
}
override fun count(aggregateId: ID): Int =
projectionsSnapshot[aggregateId]?.count() ?: 0
override fun countAll(): Int =
projectionsSnapshot.mappingCount().toInt()
/**
* Build the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
projectionsSnapshot
.map { (id, b) ->
getLast(id)
}.drop(offset)
.take(limit)
/**
* Build the last version of the [Projection] from the cache.
*
* 1. get the last snapshot
* 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot
*/
override fun getLast(aggregateId: ID): P {
val lastSnapshot = getLastSnapshot(aggregateId)?.first
val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot)
return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot)
}
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*
* 1. get the last snapshot before the event
* 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot
*/
override fun getUntil(event: E): P {
val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
} else {
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
}
}
/**
* Remove the oldest [snapshot][P] of the [queue][projectionsSnapshot].
*
* The rules are pass in the controller.
*/
private fun removeOldSnapshot(aggregateId: ID) {
projectionsSnapshot[aggregateId]?.let { queue ->
if (snapshotCacheConfig.enabled) {
queue
.excludeFirstAndLast()
.excludeTheHeadBySize()
.excludeNewerByDate()
.excludeByModulo()
.forEach { queue.remove(it) }
}
}
}
/**
* Return a new list without the first and last snapshot.
*
* Exclude from deletion the first and the last.
*/
private fun FilteredList<P>.excludeFirstAndLast(): FilteredList<P> =
sortedBy { it.first.lastEventVersion }
.drop(1)
.dropLast(1)
/**
* Return a new list of event filtered by the version modulo.
*
* Exclude from deletion 1 element out of 10 (if modulo 10 in [config][snapshotCacheConfig]).
*/
private fun FilteredList<P>.excludeByModulo(): FilteredList<P> =
filter { (it.first.lastEventVersion % snapshotCacheConfig.modulo) != 1 }
/**
* Return a new list of event filtered by the maximum size.
*
* Exclude from removal all [snapshot][projectionsSnapshot] that in the head of the queue.
*/
private fun FilteredList<P>.excludeTheHeadBySize(): FilteredList<P> {
// filter if size exceeds the limit
return sortedBy { it.first.lastEventVersion }
.dropLast(snapshotCacheConfig.maxSnapshotCacheSize)
}
/**
* Return a new list of event filtered by the maximum date.
*
* Exclude from removal all [snapshot][projectionsSnapshot] that newer of the date (in [config][SnapshotConfig]).
*/
private fun FilteredList<P>.excludeNewerByDate(): FilteredList<P> {
val now = Clock.System.now()
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
return filter { deadLine < it.second }
}
/**
* Save the snapshot.
*/
private fun save(projection: P) {
projectionsSnapshot
.computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() }
.add(Pair(projection, Clock.System.now()))
.also { logger.info { "Projection saved" } }
}
/**
* Get the last snapshot when the version is lower of then event version
*/
private fun getLastSnapshotBeforeOrEqualEvent(event: E) =
projectionsSnapshot[event.aggregateId]
?.sortedByDescending { it.first.lastEventVersion }
?.find { it.first.lastEventVersion <= event.version }
/**
* Get the last snapshot (with the higher version).
*/
private fun getLastSnapshot(aggregateId: ID) =
projectionsSnapshot[aggregateId]
?.maxByOrNull { it.first.lastEventVersion }
/**
* Get the events from the [event stream][EventStream] when the version is higher of the snapshot.
*
* If the snapshot is null, it takes all events from the event [event stream][EventStream]
*/
private fun getEventAfterTheSnapshot(
aggregateId: ID,
snapshot: P?,
) =
eventStore
.getStream(aggregateId)
.readGreaterOfVersion(snapshot?.lastEventVersion ?: 0)
/**
* Apply events to the projection.
*/
private fun P?.applyEvents(
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
}
private typealias FilteredList<P> = Collection<Pair<P, Instant>>

View File

@@ -1,240 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.toRanges
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SortingParams
import kotlin.reflect.KClass
class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val eventStore: EventStore<E, ID>,
private val jedis: UnifiedJedis,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val projectionClass: KClass<P>,
private val projectionToJson: (P) -> String,
private val jsonToProjection: (String) -> P,
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
val logger = KotlinLogging.logger { }
/**
* Create a snapshot for the event
*
* 1. get the last snapshot with a version lower than that of the event
* 2. get the events with a greater version of the snapshot
* 3. apply the event to the snapshot
* 4. apply the new event to the projection
* 5. save it
* 6. remove old one
*/
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) {
save(it)
removeOldSnapshot(it.aggregateId, event.version)
}
}
override fun count(aggregateId: ID): Int =
jedis.zcount(projectionClass.redisKey(aggregateId), Double.MIN_VALUE, Double.MAX_VALUE).toInt()
override fun countAll(): Int =
jedis.zcount(projectionClass.redisKey, Double.MIN_VALUE, Double.MAX_VALUE).toInt()
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
jedis
.scan(
offset.toString(),
ScanParams()
.match(projectionClass.redisKeySearchList)
.count(limit),
).result
.mapNotNull { key ->
getLastByKey(key)
?.let(jsonToProjection)
}
/**
* Get the last version of the [Projection] from the cache.
*
* 1. get the last snapshot
* 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot
*/
override fun getLast(aggregateId: ID): P =
getLastByKey(projectionClass.redisKey(aggregateId))
?.let(jsonToProjection)
?: initialStateBuilder(aggregateId)
private fun getLastByKey(key: String): String? =
jedis
.sort(
key,
SortingParams()
.desc()
.by("score")
.limit(0, 1),
).firstOrNull()
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*
* 1. get the last snapshot before the event
* 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot
*/
override fun getUntil(event: E): P {
val lastSnapshot =
jedis
.zrangeByScore(
projectionClass.redisKey(event.aggregateId),
1.0,
event.version.toDouble(),
0,
1,
).firstOrNull()
?.let(jsonToProjection)
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
if (lastSnapshot != null && lastSnapshot.lastEventVersion > event.version) {
logger.error { "Cannot be apply event on more recent snapshot" }
error("Cannot be apply event on more recent snapshot")
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
} else {
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
}
}
private fun save(projection: P) {
val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection))
if (added < 1) {
logger.error { "Projection NOT saved (already exists)" }
} else {
logger.info { "Projection saved" }
if (snapshotCacheConfig.maxSnapshotCacheTtl.isFinite()) {
jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
}
}
}
/**
* Apply events to the projection.
*/
private fun P?.applyEvents(
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
fun removeOldSnapshot(
aggregateId: AggregateId,
lastVersion: Int,
) {
if (snapshotCacheConfig.enabled) {
removeByModulo(aggregateId, lastVersion)
removeTheHeadBySize(aggregateId, lastVersion)
}
}
private fun removeByModulo(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.let { if (it < 2) 2 else it }
.let { IntRange(it, lastVersion - 1) }
.filter { (it % snapshotCacheConfig.modulo) != 1 }
.toRanges()
.map {
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
it.first.toDouble(),
it.last.toDouble(),
).also { removedCount ->
if (removedCount > 0) {
logger.debug {
"$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]"
}
}
}
}
}
private fun removeTheHeadBySize(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.toDouble()
.let {
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
2.0,
it,
).also { removedCount ->
if (removedCount > 0) {
logger.info {
"$removedCount snapshot removed Size(${snapshotCacheConfig.maxSnapshotCacheSize}) (1.0 to $it) [lastVersion=$lastVersion]"
}
}
}
}
}
}
val <P : Projection<*>> KClass<P>.redisKeySearchList: String get() {
return "projection:$simpleName:*"
}
val <P : Projection<*>> P.redisKey: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}"
}
fun <P : Projection<*>, A : AggregateId> KClass<P>.redisKey(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}"
val <P : Projection<*>> KClass<P>.redisKey: String get() =
"projection:$simpleName"

View File

@@ -1,26 +0,0 @@
package eventDemo.libs.event.projection
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
data class SnapshotConfig(
/**
* Keep snapshot when is on the head of the queue cache
*/
val maxSnapshotCacheSize: Int = 20,
/**
* Keep snapshot when is newer of
*
* snapshot.date > now + maxSnapshotCacheTtl
*/
val maxSnapshotCacheTtl: Duration = 10.minutes,
/**
* Keep snapshot when version is this modulo
*
* snapshot.lastVersion % modulo == 1
*/
val modulo: Int = 10,
val enabled: Boolean = true,
)
val DISABLED_CONFIG = SnapshotConfig(Int.MAX_VALUE, Duration.INFINITE, Int.MAX_VALUE, enabled = false)

View File

@@ -6,11 +6,17 @@ import eventDemo.business.entity.Deck
import eventDemo.configuration.business.configureGameListener import eventDemo.configuration.business.configureGameListener
import eventDemo.configuration.injection.appKoinModule import eventDemo.configuration.injection.appKoinModule
import eventDemo.configuration.ktor.configuration import eventDemo.configuration.ktor.configuration
import io.github.oshai.kotlinlogging.KotlinLogging
import io.kotest.engine.runBlocking
import io.ktor.server.config.ApplicationConfig import io.ktor.server.config.ApplicationConfig
import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.server.testing.testApplication import io.ktor.server.testing.testApplication
import io.ktor.utils.io.KtorDsl import io.ktor.utils.io.KtorDsl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import org.koin.core.Koin import org.koin.core.Koin
import org.koin.core.context.startKoin
import org.koin.core.module.KoinApplicationDslMarker import org.koin.core.module.KoinApplicationDslMarker
import org.koin.dsl.koinApplication import org.koin.dsl.koinApplication
import org.koin.ktor.ext.getKoin import org.koin.ktor.ext.getKoin
@@ -39,6 +45,7 @@ fun testApplicationWithConfig(
configBuilder: Koin.() -> Unit = {}, configBuilder: Koin.() -> Unit = {},
block: suspend ApplicationTestBuilder.() -> Unit, block: suspend ApplicationTestBuilder.() -> Unit,
) { ) {
val logger = KotlinLogging.logger {}
testApplication { testApplication {
val conf = ApplicationConfig("application.conf") val conf = ApplicationConfig("application.conf")
environment { environment {
@@ -46,11 +53,19 @@ fun testApplicationWithConfig(
} }
application { application {
logger.info { "Config App" }
val koin = getKoin() val koin = getKoin()
koin.cleanDataTest() koin.cleanDataTest()
configBuilder(koin) runCatching {
logger.info { "Starting A" }
configBuilder(koin)
logger.info { "A finish" }
}
} }
block() logger.info { "Starting B" }
this@testApplication.block()
logger.info { "B finish" }
} }
} }

View File

@@ -8,6 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.event.PlayerReadyEvent
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.testApplicationWithConfig import eventDemo.testApplicationWithConfig
import io.github.oshai.kotlinlogging.KotlinLogging
import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.collections.shouldContain
@@ -19,18 +20,20 @@ import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText import io.ktor.client.statement.bodyAsText
import io.ktor.http.ContentType import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
val logger = KotlinLogging.logger {}
class GameListRouteTest : class GameListRouteTest :
FunSpec({ FunSpec({
test("/games with no game started") { test("/games with no game started") {
testApplicationWithConfig { testApplicationWithConfig {
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
logger.info { "Starting player1" }
httpClient() httpClient()
.get("/games") { .get("/games") {
withAuth(player1) withAuth(player1)
@@ -48,16 +51,14 @@ class GameListRouteTest :
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
testApplicationWithConfig( testApplicationWithConfig(
{ {
runBlocking { get<GameEventHandler>()
get<GameEventHandler>() .handle(gameId) {
.handle(gameId) { NewPlayerEvent(gameId, player1, it)
NewPlayerEvent(gameId, player1, it) }
}
}
}, },
) { ) {
// Wait until the projection is created // Wait until the projection is created
eventually(1.seconds) { eventually(3.seconds) {
httpClient() httpClient()
.get("/games") { .get("/games") {
withAuth(player1) withAuth(player1)
@@ -81,7 +82,6 @@ class GameListRouteTest :
val player2 = Player(name = "Einstein") val player2 = Player(name = "Einstein")
testApplicationWithConfig({ testApplicationWithConfig({
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
runBlocking {
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) }
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
@@ -93,7 +93,6 @@ class GameListRouteTest :
it, it,
shuffleIsDisabled = true, shuffleIsDisabled = true,
) )
}
} }
}) { }) {
eventually(1.seconds) { eventually(1.seconds) {

View File

@@ -1,7 +1,6 @@
package eventDemo.adapter.interfaceLayer.query package eventDemo.adapter.interfaceLayer.query
import eventDemo.Tag import eventDemo.Tag
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory
import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.GameCommandHandler
import eventDemo.business.command.command.GameCommand import eventDemo.business.command.command.GameCommand
import eventDemo.business.command.command.IWantToJoinTheGameCommand import eventDemo.business.command.command.IWantToJoinTheGameCommand
@@ -10,9 +9,9 @@ import eventDemo.business.command.command.IamReadyToPlayCommand
import eventDemo.business.entity.Card import eventDemo.business.entity.Card
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.event.disableShuffleDeck
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener
import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.CommandSuccessNotification
import eventDemo.business.notification.ItsTheTurnOfNotification import eventDemo.business.notification.ItsTheTurnOfNotification
@@ -61,9 +60,9 @@ class GameSimulationTest :
var player1HasJoin = false var player1HasJoin = false
testKoinApplicationWithConfig { testKoinApplicationWithConfig {
val commandHandler by inject<GameCommandHandler>() val commandHandler = get<GameCommandHandler>()
val eventStore by inject<GameEventStore>() val playerNotificationListener = get<PlayerNotificationListener>()
val playerNotificationListener by inject<PlayerNotificationListener>() val gameStateRepository = get<GameStateRepository>()
// Run command handler // Run command handler
// In the normal process, these handlers is invoque players connect to the websocket // In the normal process, these handlers is invoque players connect to the websocket
@@ -76,8 +75,8 @@ class GameSimulationTest :
} }
} }
// Consume etch notification of players, and put theses in list. // Consume etch notification of players, and put theses in a list.
// is used later to control when other players can be executing the next action // Is used later to control when other players can execute the next action
val player1Notifications = mutableListOf<Notification>() val player1Notifications = mutableListOf<Notification>()
val player2Notifications = mutableListOf<Notification>() val player2Notifications = mutableListOf<Notification>()
run { run {
@@ -94,7 +93,7 @@ class GameSimulationTest :
} }
} }
// The player 1 actions // Player 1 actions
val player1Job = val player1Job =
launch { launch {
playerNotificationListener.startListening(player1, gameId) { playerNotificationListener.startListening(player1, gameId) {
@@ -132,11 +131,11 @@ class GameSimulationTest :
} }
} }
// The player 2 actions // Player 2 actions
val player2Job = val player2Job =
launch { launch {
// wait the player 1 has join the game // wait player 1 has joined the game
until(1.seconds) { player1HasJoin } until(3.seconds) { player1HasJoin }
playerNotificationListener.startListening(player2, gameId) { playerNotificationListener.startListening(player2, gameId) {
channelNotification2.trySendBlocking(it) channelNotification2.trySendBlocking(it)
@@ -176,7 +175,7 @@ class GameSimulationTest :
joinAll(player1Job, player2Job) joinAll(player1Job, player2Job)
// Build the last state from the event store // Build the last state from the event store
val state = GameStateRepositoryInMemory(eventStore = eventStore).getLast(gameId) val state = gameStateRepository.get(gameId)
// Check if the state is correct // Check if the state is correct
state.aggregateId shouldBeEqual gameId state.aggregateId shouldBeEqual gameId
@@ -192,6 +191,6 @@ class GameSimulationTest :
}) })
private suspend inline fun <reified T : Notification> MutableList<Notification>.waitNotification(crossinline block: T.() -> Boolean): T = private suspend inline fun <reified T : Notification> MutableList<Notification>.waitNotification(crossinline block: T.() -> Boolean): T =
eventually(1.seconds) { eventually(3.seconds) {
filterIsInstance<T>().first { block(it) } filterIsInstance<T>().first { block(it) }
} }

View File

@@ -11,7 +11,9 @@ import eventDemo.business.event.event.disableShuffleDeck
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.testApplicationWithConfig import eventDemo.testApplicationWithConfig
import io.github.oshai.kotlinlogging.KotlinLogging
import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventually
import io.kotest.assertions.nondeterministic.until
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
@@ -53,7 +55,6 @@ class GameStateRouteTest :
val gameId = GameId() val gameId = GameId()
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
val player2 = Player(name = "Einstein") val player2 = Player(name = "Einstein")
var lastPlayedCard: Card? = null
testApplicationWithConfig({ testApplicationWithConfig({
disableShuffleDeck() disableShuffleDeck()
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
@@ -64,9 +65,8 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) }
lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } val lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() }
assertNotNull(lastPlayedCard) assertIs<Card.NumericCard>(lastPlayedCard)
.let { assertIs<Card.NumericCard>(lastPlayedCard) }
.let { .let {
it.number shouldBeEqual 0 it.number shouldBeEqual 0
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red
@@ -74,32 +74,36 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { eventHandler.handle(gameId) {
CardIsPlayedEvent( CardIsPlayedEvent(
gameId, gameId,
assertNotNull(lastPlayedCard), lastPlayedCard,
player1, player1,
it, it,
) )
} }
until(3.seconds) {
stateRepo
.get(gameId)
.deck.discard
.last() == lastPlayedCard
}
} }
}) { }) {
eventually(1.seconds) { httpClient()
httpClient() .get("/games/$gameId/state") {
.get("/games/$gameId/state") { withAuth(player1)
withAuth(player1) accept(ContentType.Application.Json)
accept(ContentType.Application.Json) }.apply {
}.apply { assertEquals(HttpStatusCode.OK, status, message = bodyAsText())
assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) call.body<GameState>().apply {
call.body<GameState>().apply { aggregateId shouldBeEqual gameId
aggregateId shouldBeEqual gameId players shouldHaveSize 2
players shouldHaveSize 2 isStarted shouldBeEqual true
isStarted shouldBeEqual true assertIs<CardIsPlayedEvent>(lastEvent)
assertIs<CardIsPlayedEvent>(lastEvent) readyPlayers shouldBeEqual setOf(player1, player2)
readyPlayers shouldBeEqual setOf(player1, player2) direction shouldBeEqual GameState.Direction.CLOCKWISE
direction shouldBeEqual GameState.Direction.CLOCKWISE assertNotNull(lastCardPlayer) shouldBeEqual player1
assertNotNull(lastCardPlayer) shouldBeEqual player1 assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red
assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red
}
} }
} }
} }
} }
@@ -118,9 +122,8 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) }
lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() }
assertNotNull(lastPlayedCard) assertIs<Card.NumericCard>(lastPlayedCard)
.let { assertIs<Card.NumericCard>(lastPlayedCard) }
.let { .let {
it.number shouldBeEqual 0 it.number shouldBeEqual 0
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red
@@ -133,18 +136,23 @@ class GameStateRouteTest :
it, it,
) )
} }
until(3.seconds) {
stateRepo
.get(gameId)
.deck.discard
.last() == lastPlayedCard
}
} }
}) { }) {
eventually(1.seconds) { httpClient()
httpClient() .get("/games/$gameId/card/last") {
.get("/games/$gameId/card/last") { withAuth(player1)
withAuth(player1) accept(ContentType.Application.Json)
accept(ContentType.Application.Json) }.apply {
}.apply { assertEquals(HttpStatusCode.OK, status, message = bodyAsText())
assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) assertEquals(assertNotNull(lastPlayedCard), call.body<Card>())
assertEquals(assertNotNull(lastPlayedCard), call.body<Card>()) }
}
}
} }
} }
}) })

View File

@@ -7,6 +7,8 @@ import eventDemo.business.entity.Player
import eventDemo.business.event.event.GameEvent import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.libs.event.VersionBuilderLocal import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.testApplicationWithConfig
import io.kotest.assertions.nondeterministic.until
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
@@ -16,6 +18,7 @@ import io.mockk.spyk
import io.mockk.verify import io.mockk.verify
import kotlin.test.assertIs import kotlin.test.assertIs
import kotlin.test.assertNotNull import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds
class GameEventHandlerTest : class GameEventHandlerTest :
FunSpec({ FunSpec({

View File

@@ -2,7 +2,6 @@ package eventDemo.business.event.projection
import ch.qos.logback.classic.Level import ch.qos.logback.classic.Level
import com.rabbitmq.client.impl.ForgivingExceptionHandler import com.rabbitmq.client.impl.ForgivingExceptionHandler
import com.zaxxer.hikari.pool.ProxyConnection
import eventDemo.Tag import eventDemo.Tag
import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.GameCommandHandler
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
@@ -17,7 +16,6 @@ import io.kotest.common.KotestInternal
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll import kotlinx.coroutines.joinAll
@@ -41,13 +39,10 @@ class GameStateRepositoryTest :
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
eventHandler eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also { event -> .also {
// Wait until the projection is created // Wait until the projection is created
eventually(1.seconds) { eventually(1.seconds) {
assertNotNull(repo.getUntil(event)).also { assertNotNull(repo.get(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
assertNotNull(repo.getLast(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1) assertNotNull(it.players) shouldBeEqual setOf(player1)
} }
} }
@@ -68,9 +63,9 @@ class GameStateRepositoryTest :
var state: GameState? = null var state: GameState? = null
projectionBus.subscribe { projectionBus.subscribe {
repo.getLast(aggregateId).also { repo
state = it .get(aggregateId)
} .also { state = it }
} }
eventHandler eventHandler
@@ -86,7 +81,7 @@ class GameStateRepositoryTest :
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) }
.also { .also {
eventually(1.seconds) { eventually(1.seconds) {
assertNotNull(repo.getLast(aggregateId)).also { assertNotNull(repo.get(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1, player2) assertNotNull(it.players) shouldBeEqual setOf(player1, player2)
} }
} }
@@ -95,44 +90,7 @@ class GameStateRepositoryTest :
} }
} }
test("getUntil should build the state until the event") { test("get should be concurrently secure").config(tags = setOf(Tag.Concurrence)) {
withLogLevel(
GameCommandHandler::class.java.name to Level.ERROR,
ForgivingExceptionHandler::class.java.name to Level.OFF,
ProxyConnection::class.java.name to Level.OFF,
Logger.ROOT_LOGGER_NAME to Level.INFO,
) {
repeat(10) {
val aggregateId = GameId()
testKoinApplicationWithConfig {
val repo = get<GameStateRepository>()
val eventHandler = get<GameEventHandler>()
val event1 =
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also { event1 ->
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
}
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) }
.also { event2 ->
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1, player2)
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
}
}
}
}
}
test("getUntil should be concurrently secure").config(tags = setOf(Tag.Concurrence)) {
withLogLevel( withLogLevel(
Logger.ROOT_LOGGER_NAME to Level.ERROR, Logger.ROOT_LOGGER_NAME to Level.ERROR,
ForgivingExceptionHandler::class.java.name to Level.OFF, ForgivingExceptionHandler::class.java.name to Level.OFF,
@@ -167,17 +125,12 @@ class GameStateRepositoryTest :
includeFirst = false includeFirst = false
}, },
) { ) {
repo.getLast(aggregateId).run { repo.get(aggregateId).run {
lastEventVersion shouldBeEqual 200 lastEventVersion shouldBeEqual 200
players shouldHaveSize 200 players shouldHaveSize 200
} }
repo.count(aggregateId) shouldBe 21
} }
} }
} }
} }
xtest("get should be concurrently secure") {
tags(Tag.Concurrence)
}
}) })

View File

@@ -0,0 +1,38 @@
package eventDemo.libs.event
import eventDemo.libs.bus.Bus
import eventDemo.libs.bus.BusInMemory
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual
class EventHandlerTest :
FunSpec({
test("EventHandler::handle should returns the built event") {
val eventBus: Bus<EventXTest> = BusInMemory()
val eventStore: EventStore<EventXTest, IdTest> = EventStoreInMemory()
val versionBuilder: VersionBuilder = VersionBuilderLocal()
val aggregateId: IdTest = IdTest()
val handler =
EventHandlerImpl(
eventBus,
eventStore,
versionBuilder,
)
// When
val event =
handler.handle(aggregateId) {
EventXTest(aggregateId = aggregateId, version = it, num = 1)
}
// Then
event.aggregateId shouldBeEqual aggregateId
event.version shouldBeEqual 1
}
xtest("EventHandler::handle should publish the event into the store")
xtest("EventHandler::handle should publish the event into the bus")
xtest("EventHandler::handle should publish the event into the bus in incremental order")
})

View File

@@ -1,4 +1,4 @@
package eventDemo.business.event.projection package eventDemo.libs.event.projection
import eventDemo.cleanProjections import eventDemo.cleanProjections
import eventDemo.configuration.serializer.UUIDSerializer import eventDemo.configuration.serializer.UUIDSerializer
@@ -7,12 +7,6 @@ import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.libs.event.projection.DISABLED_CONFIG
import eventDemo.libs.event.projection.Projection
import eventDemo.libs.event.projection.ProjectionSnapshotRepository
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.continually
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.withData import io.kotest.datatest.withData
@@ -22,6 +16,7 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
@@ -34,14 +29,14 @@ import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@OptIn(DelicateCoroutinesApi::class) @OptIn(DelicateCoroutinesApi::class)
class ProjectionSnapshotRepositoryTest : class ProjectionRepositoryTest :
FunSpec({ FunSpec({
data class TestData( data class TestData(
val store: EventStore<TestEvents, IdTest>, val store: EventStore<TestEvents, IdTest>,
val snapshotRepo: ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest>, val repository: ProjectionRepository<TestEvents, ProjectionTest, IdTest>,
) : WithDataTestName { ) : WithDataTestName {
override fun dataTestName(): String = override fun dataTestName(): String =
"${snapshotRepo::class.simpleName} with ${store::class.simpleName}" "${repository::class.simpleName} with ${store::class.simpleName}"
} }
val eventStores = val eventStores =
@@ -50,48 +45,40 @@ class ProjectionSnapshotRepositoryTest :
) )
val projectionRepo = val projectionRepo =
listOf( listOf(
::getSnapshotRepoInMemoryTest, ::getRepoInMemoryTest,
::getSnapshotRepoInRedisTest, ::getRepoInRedisTest,
) )
val list = val list =
eventStores.flatMap { store -> eventStores.flatMap { store ->
projectionRepo.map { repo -> projectionRepo.map { repo ->
store().let { store -> TestData(store, repo(store, DISABLED_CONFIG)) } TestData(store(), repo())
} }
} }
context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { context("when call applyAndSave, the projection should be built and save to the repository") {
withData(list) { (eventStore, repo) -> withData(list) { (eventStore, repo) ->
val aggregateId = IdTest() val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
eventStore.publish(eventOther) // eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther) val p = repo.applyAndSave(eventOther)
assertNotNull(repo.getUntil(eventOther)).also { println(p)
assertNotNull(repo.get(eventOther.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "valOther" assertNotNull(it.value) shouldBeEqual "valOther"
} }
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1) // eventStore.publish(event1)
repo.applyAndPutToCache(event1) repo.applyAndSave(event1)
assertNotNull(repo.getLast(event1.aggregateId)).also { assertNotNull(repo.get(event1.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1" assertNotNull(it.value) shouldBeEqual "val1"
} }
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2) // eventStore.publish(event2)
repo.applyAndPutToCache(event2) repo.applyAndSave(event2)
assertNotNull(repo.getLast(event2.aggregateId)).also { assertNotNull(repo.get(event2.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.value) shouldBeEqual "val1val2" assertNotNull(it.value) shouldBeEqual "val1val2"
} }
} }
@@ -104,18 +91,18 @@ class ProjectionSnapshotRepositoryTest :
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId) val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId)
eventStore.publish(eventOther) eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther) repo.applyAndSave(eventOther)
assertNotNull(repo.getUntil(eventOther)).also { assertNotNull(repo.get(eventOther.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "valOther" assertNotNull(it.value) shouldBeEqual "valOther"
} }
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1) eventStore.publish(event1)
repo.applyAndPutToCache(event1) repo.applyAndSave(event1)
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2) eventStore.publish(event2)
repo.applyAndPutToCache(event2) repo.applyAndSave(event2)
repo.getList().apply { repo.getList().apply {
any { it.aggregateId == otherAggregateId } shouldBeEqual true any { it.aggregateId == otherAggregateId } shouldBeEqual true
@@ -128,7 +115,7 @@ class ProjectionSnapshotRepositoryTest :
} }
} }
context("ProjectionSnapshotRepository should be thread safe") { context("ProjectionRepository should be thread safe") {
continually(1.seconds) { continually(1.seconds) {
withData(list) { (eventStore, repo) -> withData(list) { (eventStore, repo) ->
val aggregateId = IdTest() val aggregateId = IdTest()
@@ -137,43 +124,24 @@ class ProjectionSnapshotRepositoryTest :
(0..9) (0..9)
.map { .map {
GlobalScope.launch { GlobalScope.launch {
(1..10).forEach { repeat(10) {
val eventX = lock.withLock {
lock.withLock { runBlocking {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) EventXTest(
.also { eventStore.publish(it) } num = 1,
version = versionBuilder.buildNextVersion(aggregateId),
aggregateId = aggregateId,
).also { repo.applyAndSave(it) }
} }
repo.applyAndPutToCache(eventX) }
} }
} }
}.joinAll() }.joinAll()
assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 assertNotNull(repo.get(aggregateId)).lastEventVersion shouldBeEqual 100
assertNotNull(repo.count(aggregateId)) shouldBeEqual 100 assertNotNull(repo.get(aggregateId)).num shouldBeEqual 100
} }
} }
} }
context("removeOldSnapshot") {
withData(list) { (eventStore, repo) ->
val versionBuilder = VersionBuilderLocal()
val aggregateId = IdTest()
suspend fun buildEndSendEventX() {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
.also { repo.applyAndPutToCache(it) }
}
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 1
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 2
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 3
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 4
}
}
}) })
@JvmInline @JvmInline
@@ -217,28 +185,18 @@ private data class EventXTest(
val num: Int, val num: Int,
) : TestEvents ) : TestEvents
private fun getSnapshotRepoInMemoryTest( private fun getRepoInMemoryTest(): ProjectionRepository<TestEvents, ProjectionTest, IdTest> =
eventStore: EventStore<TestEvents, IdTest>, ProjectionRepositoryInMemory(
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory(
eventStore = eventStore,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
applyToProjection = apply, applyToProjection = apply,
) )
private fun getSnapshotRepoInRedisTest( private fun getRepoInRedisTest(): ProjectionRepository<TestEvents, ProjectionTest, IdTest> {
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> {
val jedis = JedisPooled("redis://localhost:6379") val jedis = JedisPooled("redis://localhost:6379")
jedis.cleanProjections() jedis.cleanProjections()
return ProjectionSnapshotRepositoryInRedis( return ProjectionRepositoryInRedis(
eventStore = eventStore,
jedis = jedis, jedis = jedis,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
projectionClass = ProjectionTest::class, projectionClass = ProjectionTest::class,
projectionToJson = { Json.encodeToString(it) }, projectionToJson = { Json.encodeToString(it) },
jsonToProjection = { Json.decodeFromString(it) }, jsonToProjection = { Json.decodeFromString(it) },