diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt new file mode 100644 index 0000000..e2836a3 --- /dev/null +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt @@ -0,0 +1,63 @@ +package eventDemo.adapter.infrastructureLayer.event.projection + +import eventDemo.business.entity.GameId +import eventDemo.business.event.GameEventBus +import eventDemo.business.event.GameEventStore +import eventDemo.business.event.event.GameEvent +import eventDemo.business.event.projection.GameProjectionBus +import eventDemo.business.event.projection.gameState.GameState +import eventDemo.business.event.projection.gameState.GameStateRepository +import eventDemo.business.event.projection.gameState.apply +import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis +import eventDemo.libs.event.projection.SnapshotConfig +import kotlinx.serialization.json.Json +import redis.clients.jedis.UnifiedJedis + +/** + * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. + */ +class GameStateRepositoryInRedis( + eventStore: GameEventStore, + projectionBus: GameProjectionBus, + eventBus: GameEventBus, + jedis: UnifiedJedis, + snapshotConfig: SnapshotConfig = SnapshotConfig(), +) : GameStateRepository { + private val projectionsSnapshot = + ProjectionSnapshotRepositoryInRedis( + eventStore = eventStore, + snapshotCacheConfig = snapshotConfig, + initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, + projectionClass = GameState::class, + projectionToJson = { Json.encodeToString(GameState.serializer(), it) }, + jsonToProjection = { Json.decodeFromString(GameState.serializer(), it) }, + applyToProjection = GameState::apply, + jedis = jedis, + ) + + init { + // On new event was received, build snapshot and publish it to the projection bus + eventBus.subscribe { event -> + projectionsSnapshot + .applyAndPutToCache(event) + .also { projectionBus.publish(it) } + } + } + + /** + * Get the last version of the [GameState] from the all eventStream. + * + * It fetches it from the local cache if possible, otherwise it builds it. + */ + override fun getLast(gameId: GameId): GameState = + projectionsSnapshot.getLast(gameId) + + /** + * Get the [GameState] to the specific [event][GameEvent]. + * It does not contain the [events][GameEvent] it after this one. + * + * It fetches it from the local cache if possible, otherwise it builds it. + */ + override fun getUntil(event: GameEvent): GameState = + projectionsSnapshot.getUntil(event) +} diff --git a/src/main/kotlin/eventDemo/business/event/event/CardIsPlayedEvent.kt b/src/main/kotlin/eventDemo/business/event/event/CardIsPlayedEvent.kt index 1c80a91..206eb67 100644 --- a/src/main/kotlin/eventDemo/business/event/event/CardIsPlayedEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/CardIsPlayedEvent.kt @@ -3,13 +3,16 @@ package eventDemo.business.event.event import eventDemo.business.entity.Card import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * An [GameEvent] to represent a played card. */ +@Serializable data class CardIsPlayedEvent( override val aggregateId: GameId, val card: Card, @@ -17,6 +20,7 @@ data class CardIsPlayedEvent( override val version: Int, ) : GameEvent, PlayerActionEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt b/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt index 83f26e4..7ce22a1 100644 --- a/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt @@ -4,19 +4,23 @@ import eventDemo.business.entity.Deck import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.entity.initHands +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * This [GameEvent] is sent when all players are ready. */ +@Serializable data class GameStartedEvent( override val aggregateId: GameId, val firstPlayer: Player, val deck: Deck, override val version: Int, ) : GameEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() diff --git a/src/main/kotlin/eventDemo/business/event/event/NewPlayerEvent.kt b/src/main/kotlin/eventDemo/business/event/event/NewPlayerEvent.kt index 852d2c9..5b5baac 100644 --- a/src/main/kotlin/eventDemo/business/event/event/NewPlayerEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/NewPlayerEvent.kt @@ -2,18 +2,22 @@ package eventDemo.business.event.event import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * An [GameEvent] to represent a new player joining the game. */ +@Serializable data class NewPlayerEvent( override val aggregateId: GameId, val player: Player, override val version: Int, ) : GameEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/business/event/event/PlayerActionEvent.kt b/src/main/kotlin/eventDemo/business/event/event/PlayerActionEvent.kt index 0c26b54..78f707e 100644 --- a/src/main/kotlin/eventDemo/business/event/event/PlayerActionEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/PlayerActionEvent.kt @@ -1,7 +1,9 @@ package eventDemo.business.event.event import eventDemo.business.entity.Player +import kotlinx.serialization.Serializable +@Serializable sealed interface PlayerActionEvent : GameEvent { val player: Player } diff --git a/src/main/kotlin/eventDemo/business/event/event/PlayerChoseColorEvent.kt b/src/main/kotlin/eventDemo/business/event/event/PlayerChoseColorEvent.kt index 077a7bb..34ada14 100644 --- a/src/main/kotlin/eventDemo/business/event/event/PlayerChoseColorEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/PlayerChoseColorEvent.kt @@ -3,13 +3,16 @@ package eventDemo.business.event.event import eventDemo.business.entity.Card import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * This [GameEvent] is sent when a player chose a color. */ +@Serializable data class PlayerChoseColorEvent( override val aggregateId: GameId, override val player: Player, @@ -17,6 +20,7 @@ data class PlayerChoseColorEvent( override val version: Int, ) : GameEvent, PlayerActionEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/business/event/event/PlayerHavePassEvent.kt b/src/main/kotlin/eventDemo/business/event/event/PlayerHavePassEvent.kt index aa9ba75..bcdd50a 100644 --- a/src/main/kotlin/eventDemo/business/event/event/PlayerHavePassEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/PlayerHavePassEvent.kt @@ -3,13 +3,16 @@ package eventDemo.business.event.event import eventDemo.business.entity.Card import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * This [GameEvent] is sent when a player can play. */ +@Serializable data class PlayerHavePassEvent( override val aggregateId: GameId, override val player: Player, @@ -17,6 +20,7 @@ data class PlayerHavePassEvent( override val version: Int, ) : GameEvent, PlayerActionEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/business/event/event/PlayerReadyEvent.kt b/src/main/kotlin/eventDemo/business/event/event/PlayerReadyEvent.kt index b744323..20f451a 100644 --- a/src/main/kotlin/eventDemo/business/event/event/PlayerReadyEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/PlayerReadyEvent.kt @@ -2,18 +2,22 @@ package eventDemo.business.event.event import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * This [GameEvent] is sent when a player is ready. */ +@Serializable data class PlayerReadyEvent( override val aggregateId: GameId, val player: Player, override val version: Int, ) : GameEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/business/event/event/PlayerWinEvent.kt b/src/main/kotlin/eventDemo/business/event/event/PlayerWinEvent.kt index f2dbf36..4540b46 100644 --- a/src/main/kotlin/eventDemo/business/event/event/PlayerWinEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/PlayerWinEvent.kt @@ -2,18 +2,22 @@ package eventDemo.business.event.event import eventDemo.business.entity.GameId import eventDemo.business.entity.Player +import eventDemo.configuration.serializer.UUIDSerializer import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable import java.util.UUID /** * This [GameEvent] is sent when a player is ready. */ +@Serializable data class PlayerWinEvent( override val aggregateId: GameId, val player: Player, override val version: Int, ) : GameEvent { + @Serializable(with = UUIDSerializer::class) override val eventId: UUID = UUID.randomUUID() override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index 46e97fe..c70e9cc 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -4,41 +4,22 @@ import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInMemory import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInMemory import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory -import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory +import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventStore import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.gameList.GameListRepository import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.libs.event.projection.SnapshotConfig -import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer import org.koin.core.module.Module import org.koin.core.module.dsl.singleOf import org.koin.dsl.bind import redis.clients.jedis.JedisPooled import redis.clients.jedis.UnifiedJedis -import redis.clients.jedis.json.JsonObjectMapper fun Module.configureDIInfrastructure(redisUrl: String) { factory { - JedisPooled(redisUrl).apply { - setJsonObjectMapper( - object : JsonObjectMapper { - override fun fromJson( - value: String, - valueType: Class, - ): T { - val s: KSerializer = serializer(valueType) as KSerializer - return Json.decodeFromString(s, value) - } - - override fun toJson(value: Any): String = - Json.encodeToString(value) - }, - ) - } + JedisPooled(redisUrl) } bind UnifiedJedis::class singleOf(::GameEventBusInMemory) bind GameEventBus::class @@ -46,7 +27,7 @@ fun Module.configureDIInfrastructure(redisUrl: String) { singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class single { - GameStateRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig()) + GameStateRepositoryInRedis(get(), get(), get(), get(), snapshotConfig = SnapshotConfig()) } bind GameStateRepository::class single { diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt index ad783dc..928b82e 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt @@ -61,7 +61,8 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID override fun getLast(aggregateId: ID): P = jedis .get(projectionClass.redisKeyLatest(aggregateId)) - .let(jsonToProjection) + ?.let(jsonToProjection) + ?: initialStateBuilder(aggregateId) /** * Build the [Projection] to the specific [event][Event]. diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index 3749b16..5e7f1c6 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -22,12 +22,12 @@ import eventDemo.business.notification.PlayerAsPlayACardNotification import eventDemo.business.notification.PlayerWasReadyNotification import eventDemo.business.notification.TheGameWasStartedNotification import eventDemo.business.notification.WelcomeToTheGameNotification +import eventDemo.configuration.injection.Configuration import eventDemo.configuration.injection.appKoinModule import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual -import io.mockk.mockk import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -171,7 +171,7 @@ class GameSimulationTest : } } - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val commandHandler by inject() val eventStore by inject() val playerNotificationListener by inject() diff --git a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt index 58105a6..ddd1aa1 100644 --- a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt @@ -9,53 +9,57 @@ import eventDemo.business.event.projection.projectionListener.ReactionListener import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.Notification import eventDemo.business.notification.WelcomeToTheGameNotification +import eventDemo.configuration.injection.Configuration import eventDemo.configuration.injection.appKoinModule import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.equals.shouldBeEqual -import io.mockk.mockk import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout import org.koin.dsl.koinApplication import kotlin.test.assertIs +import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) class GameCommandHandlerTest : FunSpec({ test("handle a command should execute the command") { - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { - val commandHandler by inject() - val notificationListener by inject() - val gameId = GameId() - val player = Player("Tesla") - val channelCommand = Channel(Channel.BUFFERED) - val channelNotification = Channel(Channel.BUFFERED) - ReactionListener(get(), get()).init() - notificationListener.startListening( - player, - gameId, - ) { channelNotification.trySendBlocking(it) } - - GlobalScope.launch { - commandHandler.handle( + withTimeout(1.seconds) { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { + val commandHandler by inject() + val notificationListener by inject() + val gameId = GameId() + val player = Player("Tesla") + val channelCommand = Channel(Channel.BUFFERED) + val channelNotification = Channel(Channel.BUFFERED) + ReactionListener(get(), get()).init() + notificationListener.startListening( player, gameId, - channelCommand, - channelNotification, - ) - } + ) { channelNotification.trySendBlocking(it) } - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand -> - channelCommand.send(sendCommand) - channelNotification.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id + GlobalScope.launch { + commandHandler.handle( + player, + gameId, + channelCommand, + channelNotification, + ) + } + + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand -> + channelCommand.send(sendCommand) + channelNotification.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + assertIs(channelNotification.receive()).let { + it.players shouldContain player } - } - assertIs(channelNotification.receive()).let { - it.players shouldContain player } } } diff --git a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt index 587bac5..5d18995 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt @@ -5,11 +5,11 @@ import eventDemo.business.entity.Player import eventDemo.business.event.GameEventHandler import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.projection.gameState.GameStateRepository +import eventDemo.configuration.injection.Configuration import eventDemo.configuration.injection.appKoinModule import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual -import io.mockk.mockk import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll @@ -26,7 +26,7 @@ class GameStateRepositoryTest : test("GameStateRepository should build the projection when a new event occurs") { val aggregateId = GameId() - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val repo = get() val eventHandler = get() eventHandler @@ -45,7 +45,7 @@ class GameStateRepositoryTest : test("get should build the last version of the state") { val aggregateId = GameId() - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val repo = get() val eventHandler = get() @@ -70,7 +70,7 @@ class GameStateRepositoryTest : test("getUntil should build the state until the event") { repeat(10) { val aggregateId = GameId() - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val repo = get() val eventHandler = get() @@ -99,7 +99,7 @@ class GameStateRepositoryTest : test("getUntil should be concurrently secure") { val aggregateId = GameId() - koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply { + koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val repo = get() val eventHandler = get()