create GameStateRepositoryInRedis
This commit is contained in:
@@ -0,0 +1,63 @@
|
||||
package eventDemo.adapter.infrastructureLayer.event.projection
|
||||
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.event.GameEventBus
|
||||
import eventDemo.business.event.GameEventStore
|
||||
import eventDemo.business.event.event.GameEvent
|
||||
import eventDemo.business.event.projection.GameProjectionBus
|
||||
import eventDemo.business.event.projection.gameState.GameState
|
||||
import eventDemo.business.event.projection.gameState.GameStateRepository
|
||||
import eventDemo.business.event.projection.gameState.apply
|
||||
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
|
||||
import eventDemo.libs.event.projection.SnapshotConfig
|
||||
import kotlinx.serialization.json.Json
|
||||
import redis.clients.jedis.UnifiedJedis
|
||||
|
||||
/**
|
||||
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
|
||||
*/
|
||||
class GameStateRepositoryInRedis(
|
||||
eventStore: GameEventStore,
|
||||
projectionBus: GameProjectionBus,
|
||||
eventBus: GameEventBus,
|
||||
jedis: UnifiedJedis,
|
||||
snapshotConfig: SnapshotConfig = SnapshotConfig(),
|
||||
) : GameStateRepository {
|
||||
private val projectionsSnapshot =
|
||||
ProjectionSnapshotRepositoryInRedis(
|
||||
eventStore = eventStore,
|
||||
snapshotCacheConfig = snapshotConfig,
|
||||
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
|
||||
projectionClass = GameState::class,
|
||||
projectionToJson = { Json.encodeToString(GameState.serializer(), it) },
|
||||
jsonToProjection = { Json.decodeFromString(GameState.serializer(), it) },
|
||||
applyToProjection = GameState::apply,
|
||||
jedis = jedis,
|
||||
)
|
||||
|
||||
init {
|
||||
// On new event was received, build snapshot and publish it to the projection bus
|
||||
eventBus.subscribe { event ->
|
||||
projectionsSnapshot
|
||||
.applyAndPutToCache(event)
|
||||
.also { projectionBus.publish(it) }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last version of the [GameState] from the all eventStream.
|
||||
*
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
override fun getLast(gameId: GameId): GameState =
|
||||
projectionsSnapshot.getLast(gameId)
|
||||
|
||||
/**
|
||||
* Get the [GameState] to the specific [event][GameEvent].
|
||||
* It does not contain the [events][GameEvent] it after this one.
|
||||
*
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
override fun getUntil(event: GameEvent): GameState =
|
||||
projectionsSnapshot.getUntil(event)
|
||||
}
|
||||
@@ -3,13 +3,16 @@ package eventDemo.business.event.event
|
||||
import eventDemo.business.entity.Card
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* An [GameEvent] to represent a played card.
|
||||
*/
|
||||
@Serializable
|
||||
data class CardIsPlayedEvent(
|
||||
override val aggregateId: GameId,
|
||||
val card: Card,
|
||||
@@ -17,6 +20,7 @@ data class CardIsPlayedEvent(
|
||||
override val version: Int,
|
||||
) : GameEvent,
|
||||
PlayerActionEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -4,19 +4,23 @@ import eventDemo.business.entity.Deck
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.business.entity.initHands
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* This [GameEvent] is sent when all players are ready.
|
||||
*/
|
||||
@Serializable
|
||||
data class GameStartedEvent(
|
||||
override val aggregateId: GameId,
|
||||
val firstPlayer: Player,
|
||||
val deck: Deck,
|
||||
override val version: Int,
|
||||
) : GameEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
|
||||
|
||||
@@ -2,18 +2,22 @@ package eventDemo.business.event.event
|
||||
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* An [GameEvent] to represent a new player joining the game.
|
||||
*/
|
||||
@Serializable
|
||||
data class NewPlayerEvent(
|
||||
override val aggregateId: GameId,
|
||||
val player: Player,
|
||||
override val version: Int,
|
||||
) : GameEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package eventDemo.business.event.event
|
||||
|
||||
import eventDemo.business.entity.Player
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
sealed interface PlayerActionEvent : GameEvent {
|
||||
val player: Player
|
||||
}
|
||||
|
||||
@@ -3,13 +3,16 @@ package eventDemo.business.event.event
|
||||
import eventDemo.business.entity.Card
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* This [GameEvent] is sent when a player chose a color.
|
||||
*/
|
||||
@Serializable
|
||||
data class PlayerChoseColorEvent(
|
||||
override val aggregateId: GameId,
|
||||
override val player: Player,
|
||||
@@ -17,6 +20,7 @@ data class PlayerChoseColorEvent(
|
||||
override val version: Int,
|
||||
) : GameEvent,
|
||||
PlayerActionEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -3,13 +3,16 @@ package eventDemo.business.event.event
|
||||
import eventDemo.business.entity.Card
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* This [GameEvent] is sent when a player can play.
|
||||
*/
|
||||
@Serializable
|
||||
data class PlayerHavePassEvent(
|
||||
override val aggregateId: GameId,
|
||||
override val player: Player,
|
||||
@@ -17,6 +20,7 @@ data class PlayerHavePassEvent(
|
||||
override val version: Int,
|
||||
) : GameEvent,
|
||||
PlayerActionEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -2,18 +2,22 @@ package eventDemo.business.event.event
|
||||
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* This [GameEvent] is sent when a player is ready.
|
||||
*/
|
||||
@Serializable
|
||||
data class PlayerReadyEvent(
|
||||
override val aggregateId: GameId,
|
||||
val player: Player,
|
||||
override val version: Int,
|
||||
) : GameEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -2,18 +2,22 @@ package eventDemo.business.event.event
|
||||
|
||||
import eventDemo.business.entity.GameId
|
||||
import eventDemo.business.entity.Player
|
||||
import eventDemo.configuration.serializer.UUIDSerializer
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* This [GameEvent] is sent when a player is ready.
|
||||
*/
|
||||
@Serializable
|
||||
data class PlayerWinEvent(
|
||||
override val aggregateId: GameId,
|
||||
val player: Player,
|
||||
override val version: Int,
|
||||
) : GameEvent {
|
||||
@Serializable(with = UUIDSerializer::class)
|
||||
override val eventId: UUID = UUID.randomUUID()
|
||||
override val createdAt: Instant = Clock.System.now()
|
||||
}
|
||||
|
||||
@@ -4,41 +4,22 @@ import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory
|
||||
import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInMemory
|
||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInMemory
|
||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory
|
||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory
|
||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis
|
||||
import eventDemo.business.event.GameEventBus
|
||||
import eventDemo.business.event.GameEventStore
|
||||
import eventDemo.business.event.projection.GameProjectionBus
|
||||
import eventDemo.business.event.projection.gameList.GameListRepository
|
||||
import eventDemo.business.event.projection.gameState.GameStateRepository
|
||||
import eventDemo.libs.event.projection.SnapshotConfig
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.serializer
|
||||
import org.koin.core.module.Module
|
||||
import org.koin.core.module.dsl.singleOf
|
||||
import org.koin.dsl.bind
|
||||
import redis.clients.jedis.JedisPooled
|
||||
import redis.clients.jedis.UnifiedJedis
|
||||
import redis.clients.jedis.json.JsonObjectMapper
|
||||
|
||||
fun Module.configureDIInfrastructure(redisUrl: String) {
|
||||
factory {
|
||||
JedisPooled(redisUrl).apply {
|
||||
setJsonObjectMapper(
|
||||
object : JsonObjectMapper {
|
||||
override fun <T> fromJson(
|
||||
value: String,
|
||||
valueType: Class<T>,
|
||||
): T {
|
||||
val s: KSerializer<T> = serializer(valueType) as KSerializer<T>
|
||||
return Json.decodeFromString(s, value)
|
||||
}
|
||||
|
||||
override fun toJson(value: Any): String =
|
||||
Json.encodeToString(value)
|
||||
},
|
||||
)
|
||||
}
|
||||
JedisPooled(redisUrl)
|
||||
} bind UnifiedJedis::class
|
||||
|
||||
singleOf(::GameEventBusInMemory) bind GameEventBus::class
|
||||
@@ -46,7 +27,7 @@ fun Module.configureDIInfrastructure(redisUrl: String) {
|
||||
singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class
|
||||
|
||||
single {
|
||||
GameStateRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig())
|
||||
GameStateRepositoryInRedis(get(), get(), get(), get(), snapshotConfig = SnapshotConfig())
|
||||
} bind GameStateRepository::class
|
||||
|
||||
single {
|
||||
|
||||
@@ -61,7 +61,8 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
|
||||
override fun getLast(aggregateId: ID): P =
|
||||
jedis
|
||||
.get(projectionClass.redisKeyLatest(aggregateId))
|
||||
.let(jsonToProjection)
|
||||
?.let(jsonToProjection)
|
||||
?: initialStateBuilder(aggregateId)
|
||||
|
||||
/**
|
||||
* Build the [Projection] to the specific [event][Event].
|
||||
|
||||
@@ -22,12 +22,12 @@ import eventDemo.business.notification.PlayerAsPlayACardNotification
|
||||
import eventDemo.business.notification.PlayerWasReadyNotification
|
||||
import eventDemo.business.notification.TheGameWasStartedNotification
|
||||
import eventDemo.business.notification.WelcomeToTheGameNotification
|
||||
import eventDemo.configuration.injection.Configuration
|
||||
import eventDemo.configuration.injection.appKoinModule
|
||||
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
|
||||
import io.kotest.core.spec.style.FunSpec
|
||||
import io.kotest.matchers.collections.shouldHaveSize
|
||||
import io.kotest.matchers.equals.shouldBeEqual
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
@@ -171,7 +171,7 @@ class GameSimulationTest :
|
||||
}
|
||||
}
|
||||
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val commandHandler by inject<GameCommandHandler>()
|
||||
val eventStore by inject<GameEventStore>()
|
||||
val playerNotificationListener by inject<PlayerNotificationListener>()
|
||||
|
||||
@@ -9,53 +9,57 @@ import eventDemo.business.event.projection.projectionListener.ReactionListener
|
||||
import eventDemo.business.notification.CommandSuccessNotification
|
||||
import eventDemo.business.notification.Notification
|
||||
import eventDemo.business.notification.WelcomeToTheGameNotification
|
||||
import eventDemo.configuration.injection.Configuration
|
||||
import eventDemo.configuration.injection.appKoinModule
|
||||
import io.kotest.core.spec.style.FunSpec
|
||||
import io.kotest.matchers.collections.shouldContain
|
||||
import io.kotest.matchers.equals.shouldBeEqual
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import org.koin.dsl.koinApplication
|
||||
import kotlin.test.assertIs
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
class GameCommandHandlerTest :
|
||||
FunSpec({
|
||||
test("handle a command should execute the command") {
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
val commandHandler by inject<GameCommandHandler>()
|
||||
val notificationListener by inject<PlayerNotificationListener>()
|
||||
val gameId = GameId()
|
||||
val player = Player("Tesla")
|
||||
val channelCommand = Channel<GameCommand>(Channel.BUFFERED)
|
||||
val channelNotification = Channel<Notification>(Channel.BUFFERED)
|
||||
ReactionListener(get(), get()).init()
|
||||
notificationListener.startListening(
|
||||
player,
|
||||
gameId,
|
||||
) { channelNotification.trySendBlocking(it) }
|
||||
|
||||
GlobalScope.launch {
|
||||
commandHandler.handle(
|
||||
withTimeout(1.seconds) {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val commandHandler by inject<GameCommandHandler>()
|
||||
val notificationListener by inject<PlayerNotificationListener>()
|
||||
val gameId = GameId()
|
||||
val player = Player("Tesla")
|
||||
val channelCommand = Channel<GameCommand>(Channel.BUFFERED)
|
||||
val channelNotification = Channel<Notification>(Channel.BUFFERED)
|
||||
ReactionListener(get(), get()).init()
|
||||
notificationListener.startListening(
|
||||
player,
|
||||
gameId,
|
||||
channelCommand,
|
||||
channelNotification,
|
||||
)
|
||||
}
|
||||
) { channelNotification.trySendBlocking(it) }
|
||||
|
||||
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand ->
|
||||
channelCommand.send(sendCommand)
|
||||
channelNotification.receive().let {
|
||||
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
|
||||
GlobalScope.launch {
|
||||
commandHandler.handle(
|
||||
player,
|
||||
gameId,
|
||||
channelCommand,
|
||||
channelNotification,
|
||||
)
|
||||
}
|
||||
|
||||
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand ->
|
||||
channelCommand.send(sendCommand)
|
||||
channelNotification.receive().let {
|
||||
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
|
||||
}
|
||||
}
|
||||
assertIs<WelcomeToTheGameNotification>(channelNotification.receive()).let {
|
||||
it.players shouldContain player
|
||||
}
|
||||
}
|
||||
assertIs<WelcomeToTheGameNotification>(channelNotification.receive()).let {
|
||||
it.players shouldContain player
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ import eventDemo.business.entity.Player
|
||||
import eventDemo.business.event.GameEventHandler
|
||||
import eventDemo.business.event.event.NewPlayerEvent
|
||||
import eventDemo.business.event.projection.gameState.GameStateRepository
|
||||
import eventDemo.configuration.injection.Configuration
|
||||
import eventDemo.configuration.injection.appKoinModule
|
||||
import io.kotest.core.spec.style.FunSpec
|
||||
import io.kotest.matchers.collections.shouldHaveSize
|
||||
import io.kotest.matchers.equals.shouldBeEqual
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.joinAll
|
||||
@@ -26,7 +26,7 @@ class GameStateRepositoryTest :
|
||||
|
||||
test("GameStateRepository should build the projection when a new event occurs") {
|
||||
val aggregateId = GameId()
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val repo = get<GameStateRepository>()
|
||||
val eventHandler = get<GameEventHandler>()
|
||||
eventHandler
|
||||
@@ -45,7 +45,7 @@ class GameStateRepositoryTest :
|
||||
|
||||
test("get should build the last version of the state") {
|
||||
val aggregateId = GameId()
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val repo = get<GameStateRepository>()
|
||||
val eventHandler = get<GameEventHandler>()
|
||||
|
||||
@@ -70,7 +70,7 @@ class GameStateRepositoryTest :
|
||||
test("getUntil should build the state until the event") {
|
||||
repeat(10) {
|
||||
val aggregateId = GameId()
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val repo = get<GameStateRepository>()
|
||||
val eventHandler = get<GameEventHandler>()
|
||||
|
||||
@@ -99,7 +99,7 @@ class GameStateRepositoryTest :
|
||||
|
||||
test("getUntil should be concurrently secure") {
|
||||
val aggregateId = GameId()
|
||||
koinApplication { modules(appKoinModule(mockk(relaxed = true))) }.koin.apply {
|
||||
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
|
||||
val repo = get<GameStateRepository>()
|
||||
val eventHandler = get<GameEventHandler>()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user