Projection was now built on listener events

Create ProjectionBus and use it with listeners
add EventStream::getByVersion
This commit is contained in:
2025-03-18 21:48:37 +01:00
parent 908cc888ad
commit 8c1eabb9f5
22 changed files with 126 additions and 89 deletions

View File

@@ -4,7 +4,14 @@ import eventDemo.business.event.GameEventBus
import eventDemo.business.event.event.GameEvent
import eventDemo.libs.bus.Bus
import eventDemo.libs.bus.BusInMemory
import java.util.UUID
class GameEventBusInMemory :
GameEventBus(),
Bus<GameEvent> by BusInMemory()
GameEventBus,
Bus<GameEvent> by BusInMemory(),
Comparable<GameEventBusInMemory> {
private val instanceId: UUID = UUID.randomUUID()
override fun compareTo(other: GameEventBusInMemory): Int =
compareValues(instanceId, other.instanceId)
}

View File

@@ -1,8 +1,9 @@
package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameList.GameList
import eventDemo.business.event.projection.gameList.GameListRepository
import eventDemo.business.event.projection.gameList.apply
@@ -10,9 +11,13 @@ import eventDemo.business.event.projection.gameState.GameState
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
/**
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
*/
class GameListRepositoryInMemory(
eventStore: GameEventStore,
eventHandler: GameEventHandler,
projectionBus: GameProjectionBus,
eventBus: GameEventBus,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameListRepository {
private val projectionsSnapshot =
@@ -24,8 +29,10 @@ class GameListRepositoryInMemory(
)
init {
eventHandler.registerProjectionBuilder { event ->
projectionsSnapshot.applyAndPutToCache(event)
eventBus.subscribe { event ->
projectionsSnapshot
.applyAndPutToCache(event)
.also { projectionBus.publish(it) }
}
}

View File

@@ -0,0 +1,18 @@
package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.libs.bus.Bus
import eventDemo.libs.bus.BusInMemory
import eventDemo.libs.event.projection.Projection
import java.util.UUID
class GameProjectionBusInMemory :
GameProjectionBus,
Bus<Projection<GameId>> by BusInMemory(),
Comparable<GameProjectionBusInMemory> {
private val instanceId: UUID = UUID.randomUUID()
override fun compareTo(other: GameProjectionBusInMemory): Int =
compareValues(instanceId, other.instanceId)
}

View File

@@ -1,18 +1,23 @@
package eventDemo.adapter.infrastructureLayer.event.projection
import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameState.GameState
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.business.event.projection.gameState.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
/**
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
*/
class GameStateRepositoryInMemory(
eventStore: GameEventStore,
eventHandler: GameEventHandler,
projectionBus: GameProjectionBus,
eventBus: GameEventBus,
snapshotConfig: SnapshotConfig = SnapshotConfig(),
) : GameStateRepository {
private val projectionsSnapshot =
@@ -24,8 +29,11 @@ class GameStateRepositoryInMemory(
)
init {
eventHandler.registerProjectionBuilder { event ->
projectionsSnapshot.applyAndPutToCache(event)
// On new event was received, build snapshot and publish it to the projection bus
eventBus.subscribe { event ->
projectionsSnapshot
.applyAndPutToCache(event)
.also { projectionBus.publish(it) }
}
}

View File

@@ -3,7 +3,7 @@ package eventDemo.adapter.interfaceLayer
import eventDemo.business.command.GameCommandHandler
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.eventListener.PlayerNotificationEventListener
import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener
import eventDemo.business.notification.Notification
import eventDemo.libs.fromFrameChannel
import eventDemo.libs.toObjectChannel
@@ -24,7 +24,7 @@ import java.util.UUID
@DelicateCoroutinesApi
fun Route.gameWebSocket(
playerNotificationListener: PlayerNotificationEventListener,
playerNotificationListener: PlayerNotificationListener,
commandHandler: GameCommandHandler,
) {
authenticate {
@@ -43,7 +43,7 @@ fun Route.gameWebSocket(
private fun DefaultWebSocketServerSession.runWebSocket(
gameId: GameId,
commandHandler: GameCommandHandler,
playerNotificationListener: PlayerNotificationEventListener,
playerNotificationListener: PlayerNotificationListener,
) {
val currentPlayer = call.getPlayer()
val outgoingFrameChannel: SendChannel<Notification> = fromFrameChannel(outgoing)

View File

@@ -2,13 +2,5 @@ package eventDemo.business.event
import eventDemo.business.event.event.GameEvent
import eventDemo.libs.bus.Bus
import java.util.UUID
abstract class GameEventBus :
Bus<GameEvent>,
Comparable<GameEventBus> {
private val instanceId: UUID = UUID.randomUUID()
override fun compareTo(other: GameEventBus): Int =
compareValues(instanceId, other.instanceId)
}
interface GameEventBus : Bus<GameEvent>

View File

@@ -25,8 +25,7 @@ class GameEventHandler(
}
/**
* Build Event, and send it to the event store and bus.
* Build also the projections.
* Build Event then send it to the event store and bus.
*/
override fun handle(
aggregateId: GameId,
@@ -47,8 +46,6 @@ class GameEventHandler(
}
}.also { event ->
withLoggingContext("event" to event.toString()) {
// Build the projections
projectionsBuilders.forEach { it(event) }
// Publish to the bus
eventBus.publish(event)
}

View File

@@ -0,0 +1,7 @@
package eventDemo.business.event.projection
import eventDemo.business.entity.GameId
import eventDemo.libs.bus.Bus
import eventDemo.libs.event.projection.Projection
interface GameProjectionBus : Bus<Projection<GameId>>

View File

@@ -5,6 +5,9 @@ import eventDemo.business.entity.Player
import eventDemo.libs.event.projection.Projection
import kotlinx.serialization.Serializable
/**
* This [projection][Projection] is used to list all current games
*/
@Serializable
data class GameList(
override val aggregateId: GameId,

View File

@@ -4,9 +4,13 @@ import eventDemo.business.entity.Card
import eventDemo.business.entity.Deck
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.event.GameEvent
import eventDemo.libs.event.projection.Projection
import kotlinx.serialization.Serializable
/**
* This [projection][Projection] is used for manage a game and theirs [card][Card]
*/
@Serializable
data class GameState(
override val aggregateId: GameId,
@@ -20,6 +24,7 @@ data class GameState(
val deck: Deck = Deck(players),
val isStarted: Boolean = false,
val playerWins: Set<Player> = emptySet(),
val lastEvent: GameEvent? = null,
) : Projection<GameId> {
enum class Direction {
CLOCKWISE,

View File

@@ -111,5 +111,6 @@ fun GameState.apply(event: GameEvent): GameState =
}
}.copy(
lastEventVersion = event.version,
lastEvent = event,
)
}

View File

@@ -1,18 +1,17 @@
package eventDemo.business.event.eventListener
package eventDemo.business.event.projection.projectionListener
import eventDemo.business.entity.Card
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.event.CardIsPlayedEvent
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.event.GameStartedEvent
import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.business.event.event.PlayerChoseColorEvent
import eventDemo.business.event.event.PlayerHavePassEvent
import eventDemo.business.event.event.PlayerReadyEvent
import eventDemo.business.event.event.PlayerWinEvent
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameState.GameState
import eventDemo.business.notification.ItsTheTurnOfNotification
import eventDemo.business.notification.Notification
import eventDemo.business.notification.PlayerAsJoinTheGameNotification
@@ -27,9 +26,8 @@ import eventDemo.business.notification.YourNewCardNotification
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
class PlayerNotificationEventListener(
private val eventBus: GameEventBus,
private val gameStateRepository: GameStateRepository,
class PlayerNotificationListener(
private val projectionBus: GameProjectionBus,
) {
private val logger = KotlinLogging.logger {}
@@ -38,14 +36,10 @@ class PlayerNotificationEventListener(
currentPlayer: Player,
gameId: GameId,
) {
eventBus.subscribe { event: GameEvent ->
withLoggingContext("event" to event.toString()) {
if (event.aggregateId != gameId) {
return@subscribe
}
val currentState = gameStateRepository.getUntil(event)
projectionBus.subscribe { currentState ->
if (currentState !is GameState) return@subscribe
if (currentState.aggregateId != gameId) return@subscribe
withLoggingContext("projection" to currentState.toString()) {
fun Notification.send() {
withLoggingContext("notification" to this.toString()) {
if (currentState.players.contains(currentPlayer)) {
@@ -65,6 +59,10 @@ class PlayerNotificationEventListener(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
val event =
currentState.lastEvent
?: error("No last event in the GameState projection")
when (event) {
is NewPlayerEvent -> {
if (currentPlayer != event.player) {

View File

@@ -1,37 +1,35 @@
package eventDemo.business.event.eventListener
package eventDemo.business.event.projection.projectionListener
import eventDemo.business.event.GameEventBus
import eventDemo.business.entity.GameId
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.event.GameEvent
import eventDemo.business.event.event.GameStartedEvent
import eventDemo.business.event.event.PlayerReadyEvent
import eventDemo.business.event.event.PlayerWinEvent
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameState.GameState
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.libs.event.projection.Projection
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.concurrent.ConcurrentSkipListSet
class ReactionEventListener(
private val eventBus: GameEventBus,
class ReactionListener(
private val projectionBus: GameProjectionBus,
private val eventHandler: GameEventHandler,
private val gameStateRepository: GameStateRepository,
private val priority: Int = DEFAULT_PRIORITY,
) {
companion object Config {
const val DEFAULT_PRIORITY = -1000
val registeredListeners = ConcurrentSkipListSet<GameEventBus>()
val registeredListeners = ConcurrentSkipListSet<GameProjectionBus>()
}
private val logger = KotlinLogging.logger { }
fun init() {
if (registeredListeners.add(eventBus)) {
eventBus.subscribe(priority) { event: GameEvent ->
withLoggingContext("event" to event.toString()) {
val state = gameStateRepository.getUntil(event)
sendStartGameEvent(state, event)
sendWinnerEvent(state)
if (registeredListeners.add(projectionBus)) {
projectionBus.subscribe(priority) { projection: Projection<GameId> ->
if (projection !is GameState) return@subscribe
withLoggingContext("projection" to projection.toString()) {
sendStartGameEvent(projection)
sendWinnerEvent(projection)
}
}
} else {
@@ -39,10 +37,7 @@ class ReactionEventListener(
}
}
private fun sendStartGameEvent(
state: GameState,
event: GameEvent,
) {
private fun sendStartGameEvent(state: GameState) {
if (state.isReady && !state.isStarted) {
val reactionEvent =
eventHandler.handle(state.aggregateId) {
@@ -56,10 +51,6 @@ class ReactionEventListener(
message = "Reaction event was Send"
payload = mapOf("reactionEvent" to reactionEvent)
}
} else {
if (event is PlayerReadyEvent) {
logger.info { "All players was not ready ${state.readyPlayers}" }
}
}
}

View File

@@ -1,10 +1,10 @@
package eventDemo.configuration.business
import eventDemo.business.event.eventListener.ReactionEventListener
import eventDemo.business.event.projection.projectionListener.ReactionListener
import io.ktor.server.application.Application
import org.koin.ktor.ext.get
fun Application.configureGameListener() {
ReactionEventListener(get(), get(), get())
ReactionListener(get(), get())
.init()
}

View File

@@ -3,7 +3,7 @@ package eventDemo.configuration.injection
import eventDemo.business.command.GameCommandActionRunner
import eventDemo.business.command.GameCommandHandler
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.eventListener.PlayerNotificationEventListener
import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener
import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf
@@ -13,5 +13,5 @@ fun Module.configureDIBusiness() {
}
singleOf(::GameEventHandler)
singleOf(::GameCommandActionRunner)
singleOf(::PlayerNotificationEventListener)
singleOf(::PlayerNotificationListener)
}

View File

@@ -3,29 +3,28 @@ package eventDemo.configuration.injection
import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory
import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInMemory
import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInMemory
import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameList.GameListRepository
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.libs.event.projection.SnapshotConfig
import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf
import org.koin.dsl.bind
fun Module.configureDIInfrastructure() {
single {
GameEventBusInMemory()
} bind GameEventBus::class
singleOf(::GameEventBusInMemory) bind GameEventBus::class
singleOf(::GameEventStoreInMemory) bind GameEventStore::class
singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class
single {
GameEventStoreInMemory()
} bind GameEventStore::class
single {
GameStateRepositoryInMemory(get(), get(), snapshotConfig = SnapshotConfig())
GameStateRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig())
} bind GameStateRepository::class
single {
GameListRepositoryInMemory(get(), get(), snapshotConfig = SnapshotConfig())
GameListRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig())
} bind GameListRepository::class
}

View File

@@ -2,14 +2,14 @@ package eventDemo.configuration.route
import eventDemo.adapter.interfaceLayer.gameWebSocket
import eventDemo.business.command.GameCommandHandler
import eventDemo.business.event.eventListener.PlayerNotificationEventListener
import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener
import io.ktor.server.application.Application
import io.ktor.server.routing.routing
import kotlinx.coroutines.DelicateCoroutinesApi
@OptIn(DelicateCoroutinesApi::class)
fun Application.declareWebSocketsGameRoute(
playerNotificationListener: PlayerNotificationEventListener,
playerNotificationListener: PlayerNotificationListener,
commandHandler: GameCommandHandler,
) {
routing {

View File

@@ -16,4 +16,6 @@ interface EventStream<E : Event<*>> {
fun readGreaterOfVersion(version: Int): Set<E>
fun readVersionBetween(version: IntRange): Set<E>
fun getByVersion(version: Int): E?
}

View File

@@ -41,4 +41,7 @@ class EventStreamInMemory<E : Event<*>> : EventStream<E> {
events
.filter { version.contains(it.version) }
.toSet()
override fun getByVersion(version: Int): E? =
events.find { version == it.version }
}

View File

@@ -50,13 +50,12 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
* 5. save it
* 6. remove old one
*/
fun applyAndPutToCache(event: E) {
fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
save(it)
removeOldSnapshot(it.aggregateId)
}
}
/**
* Build the list of all [Projections][Projection]