feat: remove snapshot on ProjectionRepository
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Tests / build (push) Has started running

This commit is contained in:
2025-04-17 01:24:25 +02:00
parent b6e8a2f347
commit 70be95e7ee
36 changed files with 478 additions and 884 deletions

View File

@@ -2,28 +2,20 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.ProjectionRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
/** /**
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameListRepositoryInMemory( class GameListRepositoryInMemory : GameListRepository {
eventStore: GameEventStore, private val projectionsRepository =
snapshotConfig: SnapshotConfig = SnapshotConfig(), ProjectionRepositoryInMemory(
) : GameListRepository {
private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory(
name = GameListRepositoryInMemory::class,
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
applyToProjection = GameList::apply, applyToProjection = GameList::apply,
initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) },
) )
@@ -32,11 +24,11 @@ class GameListRepositoryInMemory(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
@@ -48,5 +40,5 @@ class GameListRepositoryInMemory(
* It fetches it from the local cache if possible, otherwise it builds it. * It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getList(): List<GameList> = override fun getList(): List<GameList> =
projectionsSnapshot.getList() projectionsRepository.getList()
} }

View File

@@ -2,14 +2,12 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.ProjectionRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import redis.clients.jedis.UnifiedJedis import redis.clients.jedis.UnifiedJedis
@@ -18,14 +16,10 @@ import redis.clients.jedis.UnifiedJedis
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameListRepositoryInRedis( class GameListRepositoryInRedis(
eventStore: GameEventStore,
jedis: UnifiedJedis, jedis: UnifiedJedis,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameListRepository { ) : GameListRepository {
private val projectionsSnapshot = private val projectionsRepository =
ProjectionSnapshotRepositoryInRedis( ProjectionRepositoryInRedis(
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) },
projectionClass = GameList::class, projectionClass = GameList::class,
projectionToJson = { Json.encodeToString(GameList.serializer(), it) }, projectionToJson = { Json.encodeToString(GameList.serializer(), it) },
@@ -40,8 +34,8 @@ class GameListRepositoryInRedis(
) { ) {
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
@@ -53,5 +47,5 @@ class GameListRepositoryInRedis(
* It fetches it from the local cache if possible, otherwise it builds it. * It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getList(): List<GameList> = override fun getList(): List<GameList> =
projectionsSnapshot.getList() projectionsRepository.getList()
} }

View File

@@ -2,28 +2,19 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.ProjectionRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
/** /**
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameStateRepositoryInMemory( class GameStateRepositoryInMemory : GameStateRepository {
eventStore: GameEventStore, private val projectionsRepository =
snapshotConfig: SnapshotConfig = SnapshotConfig(), ProjectionRepositoryInMemory(
) : GameStateRepository {
private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory(
name = GameStateRepositoryInMemory::class,
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
applyToProjection = GameState::apply, applyToProjection = GameState::apply,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
) )
@@ -32,33 +23,19 @@ class GameStateRepositoryInMemory(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
} }
/** /**
* Get the last version of the [GameState] from the all eventStream. * Get the [GameState].
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getLast(gameId: GameId): GameState = override fun get(gameId: GameId): GameState =
projectionsSnapshot.getLast(gameId) projectionsRepository.get(gameId)
/**
* Get the [GameState] to the specific [event][GameEvent].
* It does not contain the [events][GameEvent] it after this one.
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
} }

View File

@@ -2,14 +2,11 @@ package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.apply import eventDemo.business.event.projection.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.ProjectionRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import redis.clients.jedis.UnifiedJedis import redis.clients.jedis.UnifiedJedis
@@ -18,14 +15,10 @@ import redis.clients.jedis.UnifiedJedis
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
*/ */
class GameStateRepositoryInRedis( class GameStateRepositoryInRedis(
eventStore: GameEventStore,
jedis: UnifiedJedis, jedis: UnifiedJedis,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameStateRepository { ) : GameStateRepository {
private val projectionsSnapshot = private val projectionsRepository =
ProjectionSnapshotRepositoryInRedis( ProjectionRepositoryInRedis(
eventStore = eventStore,
snapshotCacheConfig = snapshotConfig,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
projectionClass = GameState::class, projectionClass = GameState::class,
projectionToJson = { Json.encodeToString(GameState.serializer(), it) }, projectionToJson = { Json.encodeToString(GameState.serializer(), it) },
@@ -38,33 +31,19 @@ class GameStateRepositoryInRedis(
projectionBus: GameProjectionBus, projectionBus: GameProjectionBus,
eventBus: GameEventBus, eventBus: GameEventBus,
) { ) {
// On new event was received, build snapshot and publish it to the projection bus // On new event was received, build projection and publish it to the projection bus
eventBus.subscribe { event -> eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {
projectionsSnapshot projectionsRepository
.applyAndPutToCache(event) .applyAndSave(event)
.also { projectionBus.publish(it) } .also { projectionBus.publish(it) }
} }
} }
} }
/** /**
* Get the last version of the [GameState] from the all eventStream. * Get the [GameState].
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/ */
override fun getLast(gameId: GameId): GameState = override fun get(gameId: GameId): GameState =
projectionsSnapshot.getLast(gameId) projectionsRepository.get(gameId)
/**
* Get the [GameState] to the specific [event][GameEvent].
* It does not contain the [events][GameEvent] it after this one.
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
} }

View File

@@ -38,7 +38,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game. // Read the last played card on the game.
get<Game.Card> { body -> get<Game.Card> { body ->
gameStateRepository gameStateRepository
.getLast(body.game.id) .get(body.game.id)
.cardOnCurrentStack .cardOnCurrentStack
?.let { call.respond(it) } ?.let { call.respond(it) }
?: call.response.status(HttpStatusCode.BadRequest) ?: call.response.status(HttpStatusCode.BadRequest)
@@ -46,7 +46,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game. // Read the last played card on the game.
get<Game.State> { body -> get<Game.State> { body ->
val state = gameStateRepository.getLast(body.game.id) val state = gameStateRepository.get(body.game.id)
call.respond(state) call.respond(state)
} }
} }

View File

@@ -86,12 +86,12 @@ class GameCommandHandler(
* If the command fail, send an [error notification][CommandErrorNotification], * If the command fail, send an [error notification][CommandErrorNotification],
* if success, send a [success notification][CommandSuccessNotification] * if success, send a [success notification][CommandSuccessNotification]
*/ */
suspend fun handle( fun handle(
player: Player, player: Player,
gameId: GameId, gameId: GameId,
command: GameCommand, command: GameCommand,
sendSuccess: suspend () -> Unit, sendSuccess: () -> Unit,
sendError: suspend (message: String) -> Unit, sendError: (message: String) -> Unit,
) { ) {
if (command.payload.aggregateId.id != gameId.id) { if (command.payload.aggregateId.id != gameId.id) {
logger.warn { "Handle command Refuse, the gameId of the command is not the same" } logger.warn { "Handle command Refuse, the gameId of the command is not the same" }
@@ -114,26 +114,26 @@ class GameCommandHandler(
} }
} }
private fun SendChannel<Notification>.sendSuccess(command: GameCommand): suspend () -> Unit = private fun SendChannel<Notification>.sendSuccess(command: GameCommand): () -> Unit =
{ {
val logger = KotlinLogging.logger { } val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = command.id) CommandSuccessNotification(commandId = command.id)
.also { notification -> .also { notification ->
withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) { withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) {
logger.debug { "Notification SUCCESS sent" } logger.debug { "Notification SUCCESS sent" }
send(notification) trySend(notification)
} }
} }
} }
private fun SendChannel<Notification>.sendError(command: GameCommand): suspend (message: String) -> Unit = private fun SendChannel<Notification>.sendError(command: GameCommand): (message: String) -> Unit =
{ {
val logger = KotlinLogging.logger { } val logger = KotlinLogging.logger { }
CommandErrorNotification(message = it, command = command) CommandErrorNotification(message = it, command = command)
.also { notification -> .also { notification ->
withLoggingContext("notification" to notification.toString(), "command" to command.toString()) { withLoggingContext("notification" to notification.toString(), "command" to command.toString()) {
logger.warn { "Notification ERROR sent: ${notification.message}" } logger.warn { "Notification ERROR sent: ${notification.message}" }
send(notification) trySend(notification)
} }
} }
} }

View File

@@ -12,7 +12,7 @@ data class ICantPlay(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<ICantPlayCommand, PlayerHavePassEvent> { ) : CommandAction<ICantPlayCommand, PlayerHavePassEvent> {
override fun run(command: ICantPlayCommand): (version: Int) -> PlayerHavePassEvent { override fun run(command: ICantPlayCommand): (version: Int) -> PlayerHavePassEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (state.currentPlayerTurn != command.payload.player) { if (state.currentPlayerTurn != command.payload.player) {
throw CommandException("Its not your turn!") throw CommandException("Its not your turn!")

View File

@@ -12,7 +12,7 @@ data class IWantToJoinTheGame(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToJoinTheGameCommand, NewPlayerEvent> { ) : CommandAction<IWantToJoinTheGameCommand, NewPlayerEvent> {
override fun run(command: IWantToJoinTheGameCommand): (version: Int) -> NewPlayerEvent { override fun run(command: IWantToJoinTheGameCommand): (version: Int) -> NewPlayerEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (!state.isStarted) { if (!state.isStarted) {
return { return {
NewPlayerEvent( NewPlayerEvent(

View File

@@ -12,7 +12,7 @@ data class IWantToPlayCard(
private val gameStateRepository: GameStateRepository, private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToPlayCardCommand, CardIsPlayedEvent> { ) : CommandAction<IWantToPlayCardCommand, CardIsPlayedEvent> {
override fun run(command: IWantToPlayCardCommand): (version: Int) -> CardIsPlayedEvent { override fun run(command: IWantToPlayCardCommand): (version: Int) -> CardIsPlayedEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
if (!state.isStarted) { if (!state.isStarted) {
throw CommandException("The game is Not started") throw CommandException("The game is Not started")

View File

@@ -13,7 +13,7 @@ class IamReadyToPlay(
) : CommandAction<IamReadyToPlayCommand, PlayerReadyEvent> { ) : CommandAction<IamReadyToPlayCommand, PlayerReadyEvent> {
@Throws(CommandException::class) @Throws(CommandException::class)
override fun run(command: IamReadyToPlayCommand): (version: Int) -> PlayerReadyEvent { override fun run(command: IamReadyToPlayCommand): (version: Int) -> PlayerReadyEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId) val state = gameStateRepository.get(command.payload.aggregateId)
val playerExist: Boolean = state.players.contains(command.payload.player) val playerExist: Boolean = state.players.contains(command.payload.player)
val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player) val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player)

View File

@@ -1,12 +1,7 @@
package eventDemo.business.event.projection package eventDemo.business.event.projection
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.event.event.GameEvent
interface GameStateRepository { interface GameStateRepository {
fun getLast(gameId: GameId): GameState fun get(gameId: GameId): GameState
fun getUntil(event: GameEvent): GameState
fun count(gameId: GameId): Int
} }

View File

@@ -37,7 +37,7 @@ class ReactionListener(
} }
} }
private suspend fun sendStartGameEvent(state: GameState) { private fun sendStartGameEvent(state: GameState) {
if (state.isReady && !state.isStarted) { if (state.isReady && !state.isStarted) {
val reactionEvent = val reactionEvent =
eventHandler.handle(state.aggregateId) { eventHandler.handle(state.aggregateId) {
@@ -54,7 +54,7 @@ class ReactionListener(
} }
} }
private suspend fun sendWinnerEvent(state: GameState) { private fun sendWinnerEvent(state: GameState) {
val winner = state.playerHasNoCardLeft().firstOrNull() val winner = state.playerHasNoCardLeft().firstOrNull()
if (winner != null) { if (winner != null) {
val reactionEvent = val reactionEvent =

View File

@@ -13,7 +13,6 @@ import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameListRepository
import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.libs.event.projection.SnapshotConfig
import org.koin.core.module.Module import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf import org.koin.core.module.dsl.singleOf
import org.koin.core.scope.Scope import org.koin.core.scope.Scope
@@ -65,10 +64,10 @@ fun Module.configureDIInfrastructure(config: Configuration) {
singleOf(::GameProjectionBusInRabbitMQ) bind GameProjectionBus::class singleOf(::GameProjectionBusInRabbitMQ) bind GameProjectionBus::class
single { single {
GameStateRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) GameStateRepositoryInRedis(get())
} bind GameStateRepository::class } bind GameStateRepository::class
single { single {
GameListRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) GameListRepositoryInRedis(get())
} bind GameListRepository::class } bind GameListRepository::class
} }

View File

@@ -1,13 +1,22 @@
package eventDemo.libs.bus package eventDemo.libs.bus
interface Bus<T> { interface Bus<T> {
suspend fun publish(item: T) /**
* Publish a new [message][item] to the bus.
*/
fun publish(item: T)
/** /**
* @param priority The higher the priority, the more it will be called first * Subscribe a [lambda][block] to the bus.
*
* When a message is sent to the bus, the [block] is executed.
*/ */
fun subscribe(block: suspend (T) -> Unit): Subscription fun subscribe(block: (T) -> Unit): Subscription
/**
* The returns of the [subscribe] method.
* It can be called to [cancel][close] the subscription.
*/
interface Subscription : AutoCloseable { interface Subscription : AutoCloseable {
override fun close() override fun close()
} }

View File

@@ -2,28 +2,25 @@ package eventDemo.libs.bus
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.coroutineScope
import kotlin.reflect.KClass import kotlin.reflect.KClass
class BusInMemory<E>( class BusInMemory<E>(
val name: KClass<*> = BusInMemory::class, val name: KClass<*> = BusInMemory::class,
) : Bus<E> { ) : Bus<E> {
private val logger = KotlinLogging.logger(name.qualifiedName.toString()) private val logger = KotlinLogging.logger(name.qualifiedName.toString())
private val subscribers: MutableList<suspend (E) -> Unit> = mutableListOf() private val subscribers: MutableList<(E) -> Unit> = mutableListOf()
override suspend fun publish(item: E) { override fun publish(item: E) {
withLoggingContext("busItem" to item.toString()) { withLoggingContext("busItem" to item.toString()) {
logger.info { "Item sent to the bus" } logger.info { "Item sent to the bus" }
subscribers subscribers
.forEach { .forEach {
coroutineScope { it(item)
it(item)
}
} }
} }
} }
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { override fun subscribe(block: (E) -> Unit): Bus.Subscription {
subscribers.add(block) subscribers.add(block)
return object : Bus.Subscription { return object : Bus.Subscription {
override fun close() { override fun close() {

View File

@@ -42,21 +42,19 @@ class BusInRabbitMQ<E>(
} }
} }
override suspend fun publish(item: E) { override fun publish(item: E) {
connection connection
.createChannel() .createChannel()
.use { .basicPublish(
it.basicPublish( exchangeName,
exchangeName, routingKey,
routingKey, AMQP.BasicProperties(),
AMQP.BasicProperties(), objectToString(item).toByteArray(),
objectToString(item).toByteArray(), )
)
}
logger.info { "Item sent to the bus" } logger.info { "Item sent to the bus" }
} }
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { override fun subscribe(block: (E) -> Unit): Bus.Subscription {
connection connection
.createChannel() .createChannel()
.also { channel -> .also { channel ->

View File

@@ -44,7 +44,7 @@ class CommandHandler<B : Bus<E>, E : Event<ID>, ID : AggregateId, C : Command>(
* *
* It restricts to run only once the [command]. * It restricts to run only once the [command].
*/ */
suspend fun handle( fun handle(
aggregateId: ID, aggregateId: ID,
command: C, command: C,
callback: CommandCallback<C>, callback: CommandCallback<C>,
@@ -99,10 +99,10 @@ private class EventCommandMap<C : Command, E : Event<*>>(
val event: E, val event: E,
val date: Instant, val date: Instant,
) { ) {
suspend operator fun invoke(error: CommandException? = null) { operator fun invoke(error: CommandException? = null) {
callback(command, error) callback(command, error)
} }
} }
} }
typealias CommandCallback<C> = suspend (command: C, error: CommandException?) -> Unit typealias CommandCallback<C> = (command: C, error: CommandException?) -> Unit

View File

@@ -14,7 +14,7 @@ class CommandRunnerController<C : Command>(
) { ) {
private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap() private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap()
suspend fun runOnlyOnce( fun runOnlyOnce(
command: C, command: C,
action: CommandBlock<C>, action: CommandBlock<C>,
) { ) {

View File

@@ -34,7 +34,7 @@ class CommandStreamChannel<C : Command>(
} }
} }
private suspend fun runAndLogStatus( private fun runAndLogStatus(
command: C, command: C,
action: CommandBlock<C>, action: CommandBlock<C>,
) { ) {
@@ -47,4 +47,4 @@ class CommandStreamChannel<C : Command>(
} }
} }
typealias CommandBlock<C> = suspend (C) -> Unit typealias CommandBlock<C> = (C) -> Unit

View File

@@ -4,7 +4,7 @@ package eventDemo.libs.event
* A stream to publish and read the played card event. * A stream to publish and read the played card event.
*/ */
interface EventHandler<E : Event<ID>, ID : AggregateId> { interface EventHandler<E : Event<ID>, ID : AggregateId> {
suspend fun handle( fun handle(
aggregateId: ID, aggregateId: ID,
buildEvent: (version: Int) -> E, buildEvent: (version: Int) -> E,
): E ): E

View File

@@ -19,7 +19,7 @@ class EventHandlerImpl<E : Event<ID>, ID : AggregateId>(
/** /**
* Build Event then send it to the event store and bus. * Build Event then send it to the event store and bus.
*/ */
override suspend fun handle( override fun handle(
aggregateId: ID, aggregateId: ID,
buildEvent: (version: Int) -> E, buildEvent: (version: Int) -> E,
): E = ): E =
@@ -34,13 +34,9 @@ class EventHandlerImpl<E : Event<ID>, ID : AggregateId>(
.also { .also {
withLoggingContext("event" to it.toString()) { withLoggingContext("event" to it.toString()) {
eventStore.publish(it) eventStore.publish(it)
eventBus.publish(it)
} }
} }
}.also { event ->
withLoggingContext("event" to event.toString()) {
// Publish to the bus
eventBus.publish(event)
}
} }
} }
} }

View File

@@ -0,0 +1,34 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
interface ProjectionRepository<E : Event<ID>, P : Projection<ID>, ID : AggregateId> {
/**
* Update projection with the event.
*/
fun apply(event: E): P
/**
* Update projection with the event, and save it.
*/
fun applyAndSave(event: E): P
/**
* Save the projection.
*/
fun save(projection: P)
/**
* Build the list of all [Projections][Projection]
*/
fun getList(
limit: Int = 100,
offset: Int = 0,
): List<P>
/**
* Build the last version of the [Projection] from the cache.
*/
fun get(aggregateId: ID): P
}

View File

@@ -0,0 +1,63 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
/**
* Repository abstraction to declare common process
*/
abstract class ProjectionRepositoryAbs<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val applyToProjection: P.(event: E) -> P,
) : ProjectionRepository<E, P, ID> {
private val logger = KotlinLogging.logger {}
/**
* Update projection with the event.
*
* 1. get the last projection
* 2. apply the new event to the projection
*/
override fun apply(event: E): P =
get(event.aggregateId).applyToProjectionSecure(event)
/**
* Update projection with the event, and save it.
*
* 1. get the last projection
* 2. apply the new event to projection
* 3. save it
*/
override fun applyAndSave(event: E): P =
apply(event)
.also {
withLoggingContext("projection" to it.toString(), "event" to event.toString()) {
save(it)
}
}
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
protected val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (canBeApply(event)) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
"Event is already in the Projection, skip apply.".let {
logger.warn { it }
error(it)
}
} else {
"The version of the event must follow directly after the version of the projection.".let {
logger.error { it }
error(it)
}
}
}
}
private fun P.canBeApply(event: E): Boolean =
event.version == lastEventVersion + 1
}

View File

@@ -0,0 +1,46 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import java.util.concurrent.ConcurrentHashMap
class ProjectionRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val initialStateBuilder: (aggregateId: ID) -> P,
applyToProjection: P.(event: E) -> P,
) : ProjectionRepositoryAbs<E, P, ID>(applyToProjection),
ProjectionRepository<E, P, ID> {
private val projections: ConcurrentHashMap<ID, P> = ConcurrentHashMap()
/**
* Build the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
projections
.values
.drop(offset)
.take(limit)
/**
* Get the [Projection].
*/
override fun get(aggregateId: ID): P =
projections[aggregateId]
?: initialStateBuilder(aggregateId)
/**
* Save the projection.
*/
override fun save(projection: P) {
projections.compute(projection.aggregateId) { id: ID, proj: P? ->
val currentProjection = proj ?: initialStateBuilder(projection.aggregateId)
if (currentProjection.lastEventVersion < projection.lastEventVersion) {
projection
} else {
currentProjection
}
}
}
}

View File

@@ -0,0 +1,80 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import io.github.oshai.kotlinlogging.KotlinLogging
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.reflect.KClass
class ProjectionRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val jedis: UnifiedJedis,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val projectionClass: KClass<P>,
private val projectionToJson: (P) -> String,
private val jsonToProjection: (String) -> P,
applyToProjection: P.(event: E) -> P,
) : ProjectionRepositoryAbs<E, P, ID>(applyToProjection),
ProjectionRepository<E, P, ID> {
val logger = KotlinLogging.logger { }
private val lock = ReentrantLock()
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
jedis
.hscan(
projectionClass.redisHKey,
offset.toString(),
ScanParams()
.match("*")
.count(limit),
).result
.mapNotNull {
jsonToProjection(it.value)
}
/**
* Get the [Projection].
*/
override fun get(aggregateId: ID): P =
jedis
.hget(
projectionClass.redisHKey,
aggregateId.id.toString(),
).let {
if (it == null || it == "nil") {
initialStateBuilder(aggregateId)
} else {
jsonToProjection(it)
}
}
override fun save(projection: P) {
lock.withLock {
if (get(projection.aggregateId).lastEventVersion < projection.lastEventVersion) {
jedis.hset(
projection.redisHKey,
projection.aggregateId.id.toString(),
projectionToJson(projection),
)
logger.info { "Projection saved" }
} else {
logger.error { "Projection save SKIP (an early version exists)" }
error("Projection save SKIP (an early version exists)")
}
}
}
}
private val <P : Projection<*>> KClass<P>.redisHKey: String get() =
"projection:$simpleName"
private val <P : Projection<*>> P.redisHKey: String get() =
this::class.redisHKey

View File

@@ -1,35 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
interface ProjectionSnapshotRepository<E : Event<ID>, P : Projection<ID>, ID : AggregateId> {
/**
* Create a snapshot for the event
*/
suspend fun applyAndPutToCache(event: E): P
fun count(aggregateId: ID): Int
fun countAll(): Int
/**
* Build the list of all [Projections][Projection]
*/
fun getList(
limit: Int = 100,
offset: Int = 0,
): List<P>
/**
* Build the last version of the [Projection] from the cache.
*/
fun getLast(aggregateId: ID): P
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*/
fun getUntil(event: E): P
}

View File

@@ -1,226 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStream
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
val name: KClass<*> = ProjectionSnapshotRepositoryInMemory::class,
private val eventStore: EventStore<E, ID>,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap()
private val logger = KotlinLogging.logger(name.qualifiedName.toString())
/**
* Create a snapshot for the event
*
* 1. get the last snapshot with a version lower than that of the event
* 2. get the events with a greater version of the snapshot
* 3. apply the event to the snapshot
* 4. apply the new event to the projection
* 5. save it
* 6. remove old one
*/
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext("projection" to it.toString()) {
save(it)
removeOldSnapshot(it.aggregateId)
}
}
override fun count(aggregateId: ID): Int =
projectionsSnapshot[aggregateId]?.count() ?: 0
override fun countAll(): Int =
projectionsSnapshot.mappingCount().toInt()
/**
* Build the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
projectionsSnapshot
.map { (id, b) ->
getLast(id)
}.drop(offset)
.take(limit)
/**
* Build the last version of the [Projection] from the cache.
*
* 1. get the last snapshot
* 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot
*/
override fun getLast(aggregateId: ID): P {
val lastSnapshot = getLastSnapshot(aggregateId)?.first
val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot)
return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot)
}
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*
* 1. get the last snapshot before the event
* 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot
*/
override fun getUntil(event: E): P {
val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
} else {
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
}
}
/**
* Remove the oldest [snapshot][P] of the [queue][projectionsSnapshot].
*
* The rules are pass in the controller.
*/
private fun removeOldSnapshot(aggregateId: ID) {
projectionsSnapshot[aggregateId]?.let { queue ->
if (snapshotCacheConfig.enabled) {
queue
.excludeFirstAndLast()
.excludeTheHeadBySize()
.excludeNewerByDate()
.excludeByModulo()
.forEach { queue.remove(it) }
}
}
}
/**
* Return a new list without the first and last snapshot.
*
* Exclude from deletion the first and the last.
*/
private fun FilteredList<P>.excludeFirstAndLast(): FilteredList<P> =
sortedBy { it.first.lastEventVersion }
.drop(1)
.dropLast(1)
/**
* Return a new list of event filtered by the version modulo.
*
* Exclude from deletion 1 element out of 10 (if modulo 10 in [config][snapshotCacheConfig]).
*/
private fun FilteredList<P>.excludeByModulo(): FilteredList<P> =
filter { (it.first.lastEventVersion % snapshotCacheConfig.modulo) != 1 }
/**
* Return a new list of event filtered by the maximum size.
*
* Exclude from removal all [snapshot][projectionsSnapshot] that in the head of the queue.
*/
private fun FilteredList<P>.excludeTheHeadBySize(): FilteredList<P> {
// filter if size exceeds the limit
return sortedBy { it.first.lastEventVersion }
.dropLast(snapshotCacheConfig.maxSnapshotCacheSize)
}
/**
* Return a new list of event filtered by the maximum date.
*
* Exclude from removal all [snapshot][projectionsSnapshot] that newer of the date (in [config][SnapshotConfig]).
*/
private fun FilteredList<P>.excludeNewerByDate(): FilteredList<P> {
val now = Clock.System.now()
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
return filter { deadLine < it.second }
}
/**
* Save the snapshot.
*/
private fun save(projection: P) {
projectionsSnapshot
.computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() }
.add(Pair(projection, Clock.System.now()))
.also { logger.info { "Projection saved" } }
}
/**
* Get the last snapshot when the version is lower of then event version
*/
private fun getLastSnapshotBeforeOrEqualEvent(event: E) =
projectionsSnapshot[event.aggregateId]
?.sortedByDescending { it.first.lastEventVersion }
?.find { it.first.lastEventVersion <= event.version }
/**
* Get the last snapshot (with the higher version).
*/
private fun getLastSnapshot(aggregateId: ID) =
projectionsSnapshot[aggregateId]
?.maxByOrNull { it.first.lastEventVersion }
/**
* Get the events from the [event stream][EventStream] when the version is higher of the snapshot.
*
* If the snapshot is null, it takes all events from the event [event stream][EventStream]
*/
private fun getEventAfterTheSnapshot(
aggregateId: ID,
snapshot: P?,
) =
eventStore
.getStream(aggregateId)
.readGreaterOfVersion(snapshot?.lastEventVersion ?: 0)
/**
* Apply events to the projection.
*/
private fun P?.applyEvents(
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
}
private typealias FilteredList<P> = Collection<Pair<P, Instant>>

View File

@@ -1,240 +0,0 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.toRanges
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SortingParams
import kotlin.reflect.KClass
class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val eventStore: EventStore<E, ID>,
private val jedis: UnifiedJedis,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val projectionClass: KClass<P>,
private val projectionToJson: (P) -> String,
private val jsonToProjection: (String) -> P,
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
val logger = KotlinLogging.logger { }
/**
* Create a snapshot for the event
*
* 1. get the last snapshot with a version lower than that of the event
* 2. get the events with a greater version of the snapshot
* 3. apply the event to the snapshot
* 4. apply the new event to the projection
* 5. save it
* 6. remove old one
*/
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) {
save(it)
removeOldSnapshot(it.aggregateId, event.version)
}
}
override fun count(aggregateId: ID): Int =
jedis.zcount(projectionClass.redisKey(aggregateId), Double.MIN_VALUE, Double.MAX_VALUE).toInt()
override fun countAll(): Int =
jedis.zcount(projectionClass.redisKey, Double.MIN_VALUE, Double.MAX_VALUE).toInt()
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(
limit: Int,
offset: Int,
): List<P> =
jedis
.scan(
offset.toString(),
ScanParams()
.match(projectionClass.redisKeySearchList)
.count(limit),
).result
.mapNotNull { key ->
getLastByKey(key)
?.let(jsonToProjection)
}
/**
* Get the last version of the [Projection] from the cache.
*
* 1. get the last snapshot
* 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot
*/
override fun getLast(aggregateId: ID): P =
getLastByKey(projectionClass.redisKey(aggregateId))
?.let(jsonToProjection)
?: initialStateBuilder(aggregateId)
private fun getLastByKey(key: String): String? =
jedis
.sort(
key,
SortingParams()
.desc()
.by("score")
.limit(0, 1),
).firstOrNull()
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*
* 1. get the last snapshot before the event
* 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot
*/
override fun getUntil(event: E): P {
val lastSnapshot =
jedis
.zrangeByScore(
projectionClass.redisKey(event.aggregateId),
1.0,
event.version.toDouble(),
0,
1,
).firstOrNull()
?.let(jsonToProjection)
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
if (lastSnapshot != null && lastSnapshot.lastEventVersion > event.version) {
logger.error { "Cannot be apply event on more recent snapshot" }
error("Cannot be apply event on more recent snapshot")
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
} else {
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
}
}
private fun save(projection: P) {
val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection))
if (added < 1) {
logger.error { "Projection NOT saved (already exists)" }
} else {
logger.info { "Projection saved" }
if (snapshotCacheConfig.maxSnapshotCacheTtl.isFinite()) {
jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
}
}
}
/**
* Apply events to the projection.
*/
private fun P?.applyEvents(
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
fun removeOldSnapshot(
aggregateId: AggregateId,
lastVersion: Int,
) {
if (snapshotCacheConfig.enabled) {
removeByModulo(aggregateId, lastVersion)
removeTheHeadBySize(aggregateId, lastVersion)
}
}
private fun removeByModulo(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.let { if (it < 2) 2 else it }
.let { IntRange(it, lastVersion - 1) }
.filter { (it % snapshotCacheConfig.modulo) != 1 }
.toRanges()
.map {
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
it.first.toDouble(),
it.last.toDouble(),
).also { removedCount ->
if (removedCount > 0) {
logger.debug {
"$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]"
}
}
}
}
}
private fun removeTheHeadBySize(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.toDouble()
.let {
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
2.0,
it,
).also { removedCount ->
if (removedCount > 0) {
logger.info {
"$removedCount snapshot removed Size(${snapshotCacheConfig.maxSnapshotCacheSize}) (1.0 to $it) [lastVersion=$lastVersion]"
}
}
}
}
}
}
val <P : Projection<*>> KClass<P>.redisKeySearchList: String get() {
return "projection:$simpleName:*"
}
val <P : Projection<*>> P.redisKey: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}"
}
fun <P : Projection<*>, A : AggregateId> KClass<P>.redisKey(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}"
val <P : Projection<*>> KClass<P>.redisKey: String get() =
"projection:$simpleName"

View File

@@ -1,26 +0,0 @@
package eventDemo.libs.event.projection
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
data class SnapshotConfig(
/**
* Keep snapshot when is on the head of the queue cache
*/
val maxSnapshotCacheSize: Int = 20,
/**
* Keep snapshot when is newer of
*
* snapshot.date > now + maxSnapshotCacheTtl
*/
val maxSnapshotCacheTtl: Duration = 10.minutes,
/**
* Keep snapshot when version is this modulo
*
* snapshot.lastVersion % modulo == 1
*/
val modulo: Int = 10,
val enabled: Boolean = true,
)
val DISABLED_CONFIG = SnapshotConfig(Int.MAX_VALUE, Duration.INFINITE, Int.MAX_VALUE, enabled = false)

View File

@@ -6,6 +6,7 @@ import eventDemo.business.entity.Deck
import eventDemo.configuration.business.configureGameListener import eventDemo.configuration.business.configureGameListener
import eventDemo.configuration.injection.appKoinModule import eventDemo.configuration.injection.appKoinModule
import eventDemo.configuration.ktor.configuration import eventDemo.configuration.ktor.configuration
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.server.config.ApplicationConfig import io.ktor.server.config.ApplicationConfig
import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.server.testing.testApplication import io.ktor.server.testing.testApplication
@@ -39,6 +40,7 @@ fun testApplicationWithConfig(
configBuilder: Koin.() -> Unit = {}, configBuilder: Koin.() -> Unit = {},
block: suspend ApplicationTestBuilder.() -> Unit, block: suspend ApplicationTestBuilder.() -> Unit,
) { ) {
val logger = KotlinLogging.logger {}
testApplication { testApplication {
val conf = ApplicationConfig("application.conf") val conf = ApplicationConfig("application.conf")
environment { environment {
@@ -46,11 +48,18 @@ fun testApplicationWithConfig(
} }
application { application {
logger.info { "Config App" }
val koin = getKoin() val koin = getKoin()
koin.cleanDataTest() koin.cleanDataTest()
configBuilder(koin) runCatching {
logger.info { "Starting A" }
configBuilder(koin)
logger.info { "A finish" }
}
} }
block() logger.info { "Starting B" }
this@testApplication.block()
logger.info { "B finish" }
} }
} }

View File

@@ -8,6 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.event.PlayerReadyEvent
import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameList
import eventDemo.testApplicationWithConfig import eventDemo.testApplicationWithConfig
import io.github.oshai.kotlinlogging.KotlinLogging
import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.collections.shouldContain
@@ -19,18 +20,19 @@ import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText import io.ktor.client.statement.bodyAsText
import io.ktor.http.ContentType import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.runBlocking
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
val logger = KotlinLogging.logger {}
class GameListRouteTest : class GameListRouteTest :
FunSpec({ FunSpec({
test("/games with no game started") { test("/games with no game started") {
testApplicationWithConfig { testApplicationWithConfig {
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
logger.info { "Starting player1" }
httpClient() httpClient()
.get("/games") { .get("/games") {
withAuth(player1) withAuth(player1)
@@ -48,16 +50,14 @@ class GameListRouteTest :
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
testApplicationWithConfig( testApplicationWithConfig(
{ {
runBlocking { get<GameEventHandler>()
get<GameEventHandler>() .handle(gameId) {
.handle(gameId) { NewPlayerEvent(gameId, player1, it)
NewPlayerEvent(gameId, player1, it) }
}
}
}, },
) { ) {
// Wait until the projection is created // Wait until the projection is created
eventually(1.seconds) { eventually(3.seconds) {
httpClient() httpClient()
.get("/games") { .get("/games") {
withAuth(player1) withAuth(player1)
@@ -81,19 +81,17 @@ class GameListRouteTest :
val player2 = Player(name = "Einstein") val player2 = Player(name = "Einstein")
testApplicationWithConfig({ testApplicationWithConfig({
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
runBlocking { eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) }
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } eventHandler.handle(gameId) {
eventHandler.handle(gameId) { GameStartedEvent.new(
GameStartedEvent.new( gameId,
gameId, setOf(player1, player2),
setOf(player1, player2), it,
it, shuffleIsDisabled = true,
shuffleIsDisabled = true, )
)
}
} }
}) { }) {
eventually(1.seconds) { eventually(1.seconds) {

View File

@@ -1,7 +1,6 @@
package eventDemo.adapter.interfaceLayer.query package eventDemo.adapter.interfaceLayer.query
import eventDemo.Tag import eventDemo.Tag
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory
import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.GameCommandHandler
import eventDemo.business.command.command.GameCommand import eventDemo.business.command.command.GameCommand
import eventDemo.business.command.command.IWantToJoinTheGameCommand import eventDemo.business.command.command.IWantToJoinTheGameCommand
@@ -10,9 +9,9 @@ import eventDemo.business.command.command.IamReadyToPlayCommand
import eventDemo.business.entity.Card import eventDemo.business.entity.Card
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.event.disableShuffleDeck
import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository
import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener
import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.CommandSuccessNotification
import eventDemo.business.notification.ItsTheTurnOfNotification import eventDemo.business.notification.ItsTheTurnOfNotification
@@ -61,9 +60,9 @@ class GameSimulationTest :
var player1HasJoin = false var player1HasJoin = false
testKoinApplicationWithConfig { testKoinApplicationWithConfig {
val commandHandler by inject<GameCommandHandler>() val commandHandler = get<GameCommandHandler>()
val eventStore by inject<GameEventStore>() val playerNotificationListener = get<PlayerNotificationListener>()
val playerNotificationListener by inject<PlayerNotificationListener>() val gameStateRepository = get<GameStateRepository>()
// Run command handler // Run command handler
// In the normal process, these handlers is invoque players connect to the websocket // In the normal process, these handlers is invoque players connect to the websocket
@@ -76,8 +75,8 @@ class GameSimulationTest :
} }
} }
// Consume etch notification of players, and put theses in list. // Consume etch notification of players, and put theses in a list.
// is used later to control when other players can be executing the next action // Is used later to control when other players can execute the next action
val player1Notifications = mutableListOf<Notification>() val player1Notifications = mutableListOf<Notification>()
val player2Notifications = mutableListOf<Notification>() val player2Notifications = mutableListOf<Notification>()
run { run {
@@ -94,7 +93,7 @@ class GameSimulationTest :
} }
} }
// The player 1 actions // Player 1 actions
val player1Job = val player1Job =
launch { launch {
playerNotificationListener.startListening(player1, gameId) { playerNotificationListener.startListening(player1, gameId) {
@@ -132,11 +131,11 @@ class GameSimulationTest :
} }
} }
// The player 2 actions // Player 2 actions
val player2Job = val player2Job =
launch { launch {
// wait the player 1 has join the game // wait player 1 has joined the game
until(1.seconds) { player1HasJoin } until(3.seconds) { player1HasJoin }
playerNotificationListener.startListening(player2, gameId) { playerNotificationListener.startListening(player2, gameId) {
channelNotification2.trySendBlocking(it) channelNotification2.trySendBlocking(it)
@@ -176,7 +175,7 @@ class GameSimulationTest :
joinAll(player1Job, player2Job) joinAll(player1Job, player2Job)
// Build the last state from the event store // Build the last state from the event store
val state = GameStateRepositoryInMemory(eventStore = eventStore).getLast(gameId) val state = gameStateRepository.get(gameId)
// Check if the state is correct // Check if the state is correct
state.aggregateId shouldBeEqual gameId state.aggregateId shouldBeEqual gameId
@@ -192,6 +191,6 @@ class GameSimulationTest :
}) })
private suspend inline fun <reified T : Notification> MutableList<Notification>.waitNotification(crossinline block: T.() -> Boolean): T = private suspend inline fun <reified T : Notification> MutableList<Notification>.waitNotification(crossinline block: T.() -> Boolean): T =
eventually(1.seconds) { eventually(3.seconds) {
filterIsInstance<T>().first { block(it) } filterIsInstance<T>().first { block(it) }
} }

View File

@@ -12,6 +12,7 @@ import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.GameStateRepository
import eventDemo.testApplicationWithConfig import eventDemo.testApplicationWithConfig
import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventually
import io.kotest.assertions.nondeterministic.until
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
@@ -53,7 +54,6 @@ class GameStateRouteTest :
val gameId = GameId() val gameId = GameId()
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
val player2 = Player(name = "Einstein") val player2 = Player(name = "Einstein")
var lastPlayedCard: Card? = null
testApplicationWithConfig({ testApplicationWithConfig({
disableShuffleDeck() disableShuffleDeck()
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
@@ -64,9 +64,8 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) }
lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } val lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() }
assertNotNull(lastPlayedCard) assertIs<Card.NumericCard>(lastPlayedCard)
.let { assertIs<Card.NumericCard>(lastPlayedCard) }
.let { .let {
it.number shouldBeEqual 0 it.number shouldBeEqual 0
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red
@@ -74,32 +73,36 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { eventHandler.handle(gameId) {
CardIsPlayedEvent( CardIsPlayedEvent(
gameId, gameId,
assertNotNull(lastPlayedCard), lastPlayedCard,
player1, player1,
it, it,
) )
} }
until(3.seconds) {
stateRepo
.get(gameId)
.deck.discard
.last() == lastPlayedCard
}
} }
}) { }) {
eventually(1.seconds) { httpClient()
httpClient() .get("/games/$gameId/state") {
.get("/games/$gameId/state") { withAuth(player1)
withAuth(player1) accept(ContentType.Application.Json)
accept(ContentType.Application.Json) }.apply {
}.apply { assertEquals(HttpStatusCode.OK, status, message = bodyAsText())
assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) call.body<GameState>().apply {
call.body<GameState>().apply { aggregateId shouldBeEqual gameId
aggregateId shouldBeEqual gameId players shouldHaveSize 2
players shouldHaveSize 2 isStarted shouldBeEqual true
isStarted shouldBeEqual true assertIs<CardIsPlayedEvent>(lastEvent)
assertIs<CardIsPlayedEvent>(lastEvent) readyPlayers shouldBeEqual setOf(player1, player2)
readyPlayers shouldBeEqual setOf(player1, player2) direction shouldBeEqual GameState.Direction.CLOCKWISE
direction shouldBeEqual GameState.Direction.CLOCKWISE assertNotNull(lastCardPlayer) shouldBeEqual player1
assertNotNull(lastCardPlayer) shouldBeEqual player1 assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red
assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red
}
} }
} }
} }
} }
@@ -118,9 +121,8 @@ class GameStateRouteTest :
eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) }
eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) }
lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() }
assertNotNull(lastPlayedCard) assertIs<Card.NumericCard>(lastPlayedCard)
.let { assertIs<Card.NumericCard>(lastPlayedCard) }
.let { .let {
it.number shouldBeEqual 0 it.number shouldBeEqual 0
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red
@@ -133,18 +135,23 @@ class GameStateRouteTest :
it, it,
) )
} }
until(3.seconds) {
stateRepo
.get(gameId)
.deck.discard
.last() == lastPlayedCard
}
} }
}) { }) {
eventually(1.seconds) { httpClient()
httpClient() .get("/games/$gameId/card/last") {
.get("/games/$gameId/card/last") { withAuth(player1)
withAuth(player1) accept(ContentType.Application.Json)
accept(ContentType.Application.Json) }.apply {
}.apply { assertEquals(HttpStatusCode.OK, status, message = bodyAsText())
assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) assertEquals(assertNotNull(lastPlayedCard), call.body<Card>())
assertEquals(assertNotNull(lastPlayedCard), call.body<Card>()) }
}
}
} }
} }
}) })

View File

@@ -2,7 +2,6 @@ package eventDemo.business.event.projection
import ch.qos.logback.classic.Level import ch.qos.logback.classic.Level
import com.rabbitmq.client.impl.ForgivingExceptionHandler import com.rabbitmq.client.impl.ForgivingExceptionHandler
import com.zaxxer.hikari.pool.ProxyConnection
import eventDemo.Tag import eventDemo.Tag
import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.GameCommandHandler
import eventDemo.business.entity.GameId import eventDemo.business.entity.GameId
@@ -17,7 +16,6 @@ import io.kotest.common.KotestInternal
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll import kotlinx.coroutines.joinAll
@@ -41,13 +39,10 @@ class GameStateRepositoryTest :
val eventHandler = get<GameEventHandler>() val eventHandler = get<GameEventHandler>()
eventHandler eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also { event -> .also {
// Wait until the projection is created // Wait until the projection is created
eventually(1.seconds) { eventually(1.seconds) {
assertNotNull(repo.getUntil(event)).also { assertNotNull(repo.get(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
assertNotNull(repo.getLast(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1) assertNotNull(it.players) shouldBeEqual setOf(player1)
} }
} }
@@ -68,9 +63,9 @@ class GameStateRepositoryTest :
var state: GameState? = null var state: GameState? = null
projectionBus.subscribe { projectionBus.subscribe {
repo.getLast(aggregateId).also { repo
state = it .get(aggregateId)
} .also { state = it }
} }
eventHandler eventHandler
@@ -86,7 +81,7 @@ class GameStateRepositoryTest :
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) }
.also { .also {
eventually(1.seconds) { eventually(1.seconds) {
assertNotNull(repo.getLast(aggregateId)).also { assertNotNull(repo.get(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1, player2) assertNotNull(it.players) shouldBeEqual setOf(player1, player2)
} }
} }
@@ -95,44 +90,7 @@ class GameStateRepositoryTest :
} }
} }
test("getUntil should build the state until the event") { test("get should be concurrently secure").config(tags = setOf(Tag.Concurrence)) {
withLogLevel(
GameCommandHandler::class.java.name to Level.ERROR,
ForgivingExceptionHandler::class.java.name to Level.OFF,
ProxyConnection::class.java.name to Level.OFF,
Logger.ROOT_LOGGER_NAME to Level.INFO,
) {
repeat(10) {
val aggregateId = GameId()
testKoinApplicationWithConfig {
val repo = get<GameStateRepository>()
val eventHandler = get<GameEventHandler>()
val event1 =
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also { event1 ->
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
}
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) }
.also { event2 ->
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1, player2)
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
}
}
}
}
}
test("getUntil should be concurrently secure").config(tags = setOf(Tag.Concurrence)) {
withLogLevel( withLogLevel(
Logger.ROOT_LOGGER_NAME to Level.ERROR, Logger.ROOT_LOGGER_NAME to Level.ERROR,
ForgivingExceptionHandler::class.java.name to Level.OFF, ForgivingExceptionHandler::class.java.name to Level.OFF,
@@ -167,17 +125,12 @@ class GameStateRepositoryTest :
includeFirst = false includeFirst = false
}, },
) { ) {
repo.getLast(aggregateId).run { repo.get(aggregateId).run {
lastEventVersion shouldBeEqual 200 lastEventVersion shouldBeEqual 200
players shouldHaveSize 200 players shouldHaveSize 200
} }
repo.count(aggregateId) shouldBe 21
} }
} }
} }
} }
xtest("get should be concurrently secure") {
tags(Tag.Concurrence)
}
}) })

View File

@@ -0,0 +1,38 @@
package eventDemo.libs.event
import eventDemo.libs.bus.Bus
import eventDemo.libs.bus.BusInMemory
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual
class EventHandlerTest :
FunSpec({
test("EventHandler::handle should returns the built event") {
val eventBus: Bus<EventXTest> = BusInMemory()
val eventStore: EventStore<EventXTest, IdTest> = EventStoreInMemory()
val versionBuilder: VersionBuilder = VersionBuilderLocal()
val aggregateId: IdTest = IdTest()
val handler =
EventHandlerImpl(
eventBus,
eventStore,
versionBuilder,
)
// When
val event =
handler.handle(aggregateId) {
EventXTest(aggregateId = aggregateId, version = it, num = 1)
}
// Then
event.aggregateId shouldBeEqual aggregateId
event.version shouldBeEqual 1
}
xtest("EventHandler::handle should publish the event into the store")
xtest("EventHandler::handle should publish the event into the bus")
xtest("EventHandler::handle should publish the event into the bus in incremental order")
})

View File

@@ -1,4 +1,4 @@
package eventDemo.business.event.projection package eventDemo.libs.event.projection
import eventDemo.cleanProjections import eventDemo.cleanProjections
import eventDemo.configuration.serializer.UUIDSerializer import eventDemo.configuration.serializer.UUIDSerializer
@@ -7,12 +7,6 @@ import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.libs.event.projection.DISABLED_CONFIG
import eventDemo.libs.event.projection.Projection
import eventDemo.libs.event.projection.ProjectionSnapshotRepository
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.continually
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.withData import io.kotest.datatest.withData
@@ -22,6 +16,7 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
@@ -34,14 +29,14 @@ import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@OptIn(DelicateCoroutinesApi::class) @OptIn(DelicateCoroutinesApi::class)
class ProjectionSnapshotRepositoryTest : class ProjectionRepositoryTest :
FunSpec({ FunSpec({
data class TestData( data class TestData(
val store: EventStore<TestEvents, IdTest>, val store: EventStore<TestEvents, IdTest>,
val snapshotRepo: ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest>, val repository: ProjectionRepository<TestEvents, ProjectionTest, IdTest>,
) : WithDataTestName { ) : WithDataTestName {
override fun dataTestName(): String = override fun dataTestName(): String =
"${snapshotRepo::class.simpleName} with ${store::class.simpleName}" "${repository::class.simpleName} with ${store::class.simpleName}"
} }
val eventStores = val eventStores =
@@ -50,48 +45,40 @@ class ProjectionSnapshotRepositoryTest :
) )
val projectionRepo = val projectionRepo =
listOf( listOf(
::getSnapshotRepoInMemoryTest, ::getRepoInMemoryTest,
::getSnapshotRepoInRedisTest, ::getRepoInRedisTest,
) )
val list = val list =
eventStores.flatMap { store -> eventStores.flatMap { store ->
projectionRepo.map { repo -> projectionRepo.map { repo ->
store().let { store -> TestData(store, repo(store, DISABLED_CONFIG)) } TestData(store(), repo())
} }
} }
context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { context("when call applyAndSave, the projection should be built and save to the repository") {
withData(list) { (eventStore, repo) -> withData(list) { (eventStore, repo) ->
val aggregateId = IdTest() val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
eventStore.publish(eventOther) // eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther) val p = repo.applyAndSave(eventOther)
assertNotNull(repo.getUntil(eventOther)).also { println(p)
assertNotNull(repo.get(eventOther.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "valOther" assertNotNull(it.value) shouldBeEqual "valOther"
} }
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1) // eventStore.publish(event1)
repo.applyAndPutToCache(event1) repo.applyAndSave(event1)
assertNotNull(repo.getLast(event1.aggregateId)).also { assertNotNull(repo.get(event1.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1" assertNotNull(it.value) shouldBeEqual "val1"
} }
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2) // eventStore.publish(event2)
repo.applyAndPutToCache(event2) repo.applyAndSave(event2)
assertNotNull(repo.getLast(event2.aggregateId)).also { assertNotNull(repo.get(event2.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.value) shouldBeEqual "val1val2" assertNotNull(it.value) shouldBeEqual "val1val2"
} }
} }
@@ -104,18 +91,18 @@ class ProjectionSnapshotRepositoryTest :
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId) val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId)
eventStore.publish(eventOther) eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther) repo.applyAndSave(eventOther)
assertNotNull(repo.getUntil(eventOther)).also { assertNotNull(repo.get(eventOther.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "valOther" assertNotNull(it.value) shouldBeEqual "valOther"
} }
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1) eventStore.publish(event1)
repo.applyAndPutToCache(event1) repo.applyAndSave(event1)
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2) eventStore.publish(event2)
repo.applyAndPutToCache(event2) repo.applyAndSave(event2)
repo.getList().apply { repo.getList().apply {
any { it.aggregateId == otherAggregateId } shouldBeEqual true any { it.aggregateId == otherAggregateId } shouldBeEqual true
@@ -128,7 +115,7 @@ class ProjectionSnapshotRepositoryTest :
} }
} }
context("ProjectionSnapshotRepository should be thread safe") { context("ProjectionRepository should be thread safe") {
continually(1.seconds) { continually(1.seconds) {
withData(list) { (eventStore, repo) -> withData(list) { (eventStore, repo) ->
val aggregateId = IdTest() val aggregateId = IdTest()
@@ -137,43 +124,24 @@ class ProjectionSnapshotRepositoryTest :
(0..9) (0..9)
.map { .map {
GlobalScope.launch { GlobalScope.launch {
(1..10).forEach { repeat(10) {
val eventX = lock.withLock {
lock.withLock { runBlocking {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) EventXTest(
.also { eventStore.publish(it) } num = 1,
version = versionBuilder.buildNextVersion(aggregateId),
aggregateId = aggregateId,
).also { repo.applyAndSave(it) }
} }
repo.applyAndPutToCache(eventX) }
} }
} }
}.joinAll() }.joinAll()
assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 assertNotNull(repo.get(aggregateId)).lastEventVersion shouldBeEqual 100
assertNotNull(repo.count(aggregateId)) shouldBeEqual 100 assertNotNull(repo.get(aggregateId)).num shouldBeEqual 100
} }
} }
} }
context("removeOldSnapshot") {
withData(list) { (eventStore, repo) ->
val versionBuilder = VersionBuilderLocal()
val aggregateId = IdTest()
suspend fun buildEndSendEventX() {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
.also { repo.applyAndPutToCache(it) }
}
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 1
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 2
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 3
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 4
}
}
}) })
@JvmInline @JvmInline
@@ -217,28 +185,18 @@ private data class EventXTest(
val num: Int, val num: Int,
) : TestEvents ) : TestEvents
private fun getSnapshotRepoInMemoryTest( private fun getRepoInMemoryTest(): ProjectionRepository<TestEvents, ProjectionTest, IdTest> =
eventStore: EventStore<TestEvents, IdTest>, ProjectionRepositoryInMemory(
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory(
eventStore = eventStore,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
applyToProjection = apply, applyToProjection = apply,
) )
private fun getSnapshotRepoInRedisTest( private fun getRepoInRedisTest(): ProjectionRepository<TestEvents, ProjectionTest, IdTest> {
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> {
val jedis = JedisPooled("redis://localhost:6379") val jedis = JedisPooled("redis://localhost:6379")
jedis.cleanProjections() jedis.cleanProjections()
return ProjectionSnapshotRepositoryInRedis( return ProjectionRepositoryInRedis(
eventStore = eventStore,
jedis = jedis, jedis = jedis,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
projectionClass = ProjectionTest::class, projectionClass = ProjectionTest::class,
projectionToJson = { Json.encodeToString(it) }, projectionToJson = { Json.encodeToString(it) },
jsonToProjection = { Json.decodeFromString(it) }, jsonToProjection = { Json.decodeFromString(it) },