Split EventStream and EventStore

This commit is contained in:
2025-03-14 02:32:05 +01:00
parent 23cc3e3567
commit 492981bed0
12 changed files with 76 additions and 87 deletions

View File

@@ -13,7 +13,7 @@ import kotlin.concurrent.withLock
*/ */
class GameEventHandler( class GameEventHandler(
private val eventBus: GameEventBus, private val eventBus: GameEventBus,
private val eventStream: GameEventStream, private val eventStore: GameEventStore,
private val versionBuilder: VersionBuilder, private val versionBuilder: VersionBuilder,
) : EventHandler<GameEvent, GameId> { ) : EventHandler<GameEvent, GameId> {
private val projectionsBuilders: ConcurrentLinkedQueue<(GameEvent) -> Unit> = ConcurrentLinkedQueue() private val projectionsBuilders: ConcurrentLinkedQueue<(GameEvent) -> Unit> = ConcurrentLinkedQueue()
@@ -31,7 +31,7 @@ class GameEventHandler(
.computeIfAbsent(aggregateId) { ReentrantLock() } .computeIfAbsent(aggregateId) { ReentrantLock() }
.withLock { .withLock {
buildEvent(versionBuilder.buildNextVersion(aggregateId)) buildEvent(versionBuilder.buildNextVersion(aggregateId))
.also { eventStream.publish(it) } .also { eventStore.publish(it) }
}.also { event -> }.also { event ->
projectionsBuilders.forEach { it(event) } projectionsBuilders.forEach { it(event) }
eventBus.publish(event) eventBus.publish(event)

View File

@@ -0,0 +1,12 @@
package eventDemo.app.event
import eventDemo.app.entity.GameId
import eventDemo.app.event.event.GameEvent
import eventDemo.libs.event.EventStore
/**
* A stream to publish and read the played card event.
*/
class GameEventStore(
private val eventStore: EventStore<GameEvent, GameId>,
) : EventStore<GameEvent, GameId> by eventStore

View File

@@ -1,6 +1,5 @@
package eventDemo.app.event package eventDemo.app.event
import eventDemo.app.entity.GameId
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
import eventDemo.libs.event.EventStream import eventDemo.libs.event.EventStream
@@ -8,8 +7,8 @@ import eventDemo.libs.event.EventStream
* A stream to publish and read the played card event. * A stream to publish and read the played card event.
*/ */
class GameEventStream( class GameEventStream(
private val eventStream: EventStream<GameEvent, GameId>, private val eventStream: EventStream<GameEvent>,
) : EventStream<GameEvent, GameId> by eventStream { ) : EventStream<GameEvent> by eventStream {
override fun publish(event: GameEvent) { override fun publish(event: GameEvent) {
eventStream.publish(event) eventStream.publish(event)
} }

View File

@@ -2,17 +2,17 @@ package eventDemo.app.event.projection
import eventDemo.app.entity.GameId import eventDemo.app.entity.GameId
import eventDemo.app.event.GameEventHandler import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.GameEventStream import eventDemo.app.event.GameEventStore
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
class GameStateRepository( class GameStateRepository(
eventStream: GameEventStream, eventStore: GameEventStore,
eventHandler: GameEventHandler, eventHandler: GameEventHandler,
snapshotConfig: SnapshotConfig = SnapshotConfig(), snapshotConfig: SnapshotConfig = SnapshotConfig(),
) { ) {
private val projectionsSnapshot = private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory( ProjectionSnapshotRepositoryInMemory(
eventStream = eventStream, eventStore = eventStore,
snapshotCacheConfig = snapshotConfig, snapshotCacheConfig = snapshotConfig,
applyToProjection = GameState::apply, applyToProjection = GameState::apply,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },

View File

@@ -2,6 +2,7 @@ package eventDemo.app.event.projection
import eventDemo.libs.event.AggregateId import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStream import eventDemo.libs.event.EventStream
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
@@ -21,7 +22,7 @@ data class SnapshotConfig(
) )
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>( class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val eventStream: EventStream<E, ID>, private val eventStore: EventStore<E, ID>,
private val initialStateBuilder: (ID) -> P, private val initialStateBuilder: (ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val applyToProjection: P.(event: E) -> P, private val applyToProjection: P.(event: E) -> P,
@@ -77,10 +78,9 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
} }
val missingEventOfSnapshot = val missingEventOfSnapshot =
eventStream.readVersionBetween( eventStore
event.aggregateId, .getStream(event.aggregateId)
(lastSnapshot?.lastEventVersion ?: 1)..event.version, .readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version)
)
return if (lastSnapshot?.lastEventVersion == event.version) { return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot lastSnapshot
@@ -164,8 +164,9 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
private fun getEventAfterTheSnapshot( private fun getEventAfterTheSnapshot(
aggregateId: ID, aggregateId: ID,
snapshot: P?, snapshot: P?,
) = eventStream ) = eventStore
.readGreaterOfVersion(aggregateId, snapshot?.lastEventVersion ?: 0) .getStream(aggregateId)
.readGreaterOfVersion(snapshot?.lastEventVersion ?: 0)
/** /**
* Apply events to the projection. * Apply events to the projection.

View File

@@ -5,13 +5,13 @@ import eventDemo.app.command.GameCommandRunner
import eventDemo.app.command.command.GameCommand import eventDemo.app.command.command.GameCommand
import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventBus
import eventDemo.app.event.GameEventHandler import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.GameEventStream import eventDemo.app.event.GameEventStore
import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.GameStateRepository
import eventDemo.app.event.projection.SnapshotConfig import eventDemo.app.event.projection.SnapshotConfig
import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.libs.command.CommandStreamChannelBuilder import eventDemo.libs.command.CommandStreamChannelBuilder
import eventDemo.libs.event.EventBusInMemory import eventDemo.libs.event.EventBusInMemory
import eventDemo.libs.event.EventStreamInMemory import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilder import eventDemo.libs.event.VersionBuilder
import eventDemo.libs.event.VersionBuilderLocal import eventDemo.libs.event.VersionBuilderLocal
import io.ktor.server.application.Application import io.ktor.server.application.Application
@@ -35,7 +35,7 @@ val appKoinModule =
GameEventBus(EventBusInMemory()) GameEventBus(EventBusInMemory())
} }
single { single {
GameEventStream(EventStreamInMemory()) GameEventStore(EventStoreInMemory())
} }
single { single {
GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig()) GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig())

View File

@@ -0,0 +1,7 @@
package eventDemo.libs.event
interface EventStore<E : Event<ID>, ID : AggregateId> {
fun getStream(aggregateId: ID): EventStream<E>
fun publish(event: E)
}

View File

@@ -0,0 +1,12 @@
package eventDemo.libs.event
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> {
private val streams: ConcurrentMap<ID, EventStream<E>> = ConcurrentHashMap()
override fun getStream(aggregateId: ID): EventStream<E> = streams.computeIfAbsent(aggregateId) { EventStreamInMemory() }
override fun publish(event: E) = getStream(event.aggregateId).publish(event)
}

View File

@@ -1,36 +1,19 @@
package eventDemo.libs.event package eventDemo.libs.event
import kotlin.reflect.KClass
/** /**
* Interface representing an event stream for publishing and reading domain events * Interface representing an event stream for publishing and reading domain events
*/ */
interface EventStream<E : Event<ID>, ID : AggregateId> { interface EventStream<E : Event<*>> {
/** Publishes a single event to the event stream */ /** Publishes a single event to the event stream */
fun publish(event: E) fun publish(event: E)
/** Publishes multiple events to the event stream */ /** Publishes multiple events to the event stream */
fun publish(vararg events: E) fun publish(vararg events: E)
/** Reads the last event associated with a given aggregate ID */ /** Reads all events */
fun readLast(aggregateId: ID): E? fun readAll(): Set<E>
/** Reads the last event of a specific type associated with a given aggregate ID */ fun readGreaterOfVersion(version: Int): Set<E>
fun <R : E> readLastOf(
aggregateId: ID,
eventType: KClass<out R>,
): R?
/** Reads all events associated with a given aggregate ID */ fun readVersionBetween(version: IntRange): Set<E>
fun readAll(aggregateId: ID): Set<E>
fun readGreaterOfVersion(
aggregateId: ID,
version: Int,
): Set<E>
fun readVersionBetween(
aggregateId: ID,
version: IntRange,
): Set<E>
} }

View File

@@ -3,14 +3,13 @@ package eventDemo.libs.event
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Queue import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
/** /**
* An In-Memory implementation of an event stream. * An In-Memory implementation of an event stream.
* *
* All methods are implemented. * All methods are implemented.
*/ */
class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID> { class EventStreamInMemory<E : Event<*>> : EventStream<E> {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private val events: Queue<E> = ConcurrentLinkedQueue() private val events: Queue<E> = ConcurrentLinkedQueue()
@@ -28,39 +27,15 @@ class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID>
events.forEach { publish(it) } events.forEach { publish(it) }
} }
override fun readLast(aggregateId: ID): E? = events.lastOrNull() override fun readAll(): Set<E> = events.toSet()
override fun <R : E> readLastOf( override fun readGreaterOfVersion(version: Int): Set<E> =
aggregateId: ID,
eventType: KClass<out R>,
): R? =
events events
.filterIsInstance(eventType.java)
.lastOrNull { it.aggregateId == aggregateId }
override fun readAll(aggregateId: ID): Set<E> =
events
.filter { it.aggregateId == aggregateId }
.toSet()
override fun readGreaterOfVersion(
aggregateId: ID,
version: Int,
): Set<E> =
events
.filter { it.aggregateId == aggregateId }
.filter { it.version > version } .filter { it.version > version }
.toSet() .toSet()
override fun readVersionBetween( override fun readVersionBetween(version: IntRange): Set<E> =
aggregateId: ID,
version: IntRange,
): Set<E> =
events events
.filter { it.aggregateId == aggregateId }
.filter { version.contains(it.version) } .filter { version.contains(it.version) }
.toSet() .toSet()
} }
inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? =
readLastOf(aggregateId, R::class)

View File

@@ -2,8 +2,8 @@ package eventDemo.app.event.projection
import eventDemo.libs.event.AggregateId import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStream import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStreamInMemory import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal import eventDemo.libs.event.VersionBuilderLocal
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
@@ -23,19 +23,19 @@ class ProjectionSnapshotRepositoryInMemoryTest :
FunSpec({ FunSpec({
test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") {
val eventStream: EventStream<TestEvents, IdTest> = EventStreamInMemory() val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStream) val repo = getSnapshotRepoTest(eventStore)
val aggregateId = IdTest() val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
eventStream.publish(eventOther) eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther) repo.applyAndPutToCache(eventOther)
assertNotNull(repo.getUntil(eventOther)).also { assertNotNull(repo.getUntil(eventOther)).also {
assertNotNull(it.value) shouldBeEqual "valOther" assertNotNull(it.value) shouldBeEqual "valOther"
} }
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStream.publish(event1) eventStore.publish(event1)
repo.applyAndPutToCache(event1) repo.applyAndPutToCache(event1)
assertNotNull(repo.getLast(event1.aggregateId)).also { assertNotNull(repo.getLast(event1.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1" assertNotNull(it.value) shouldBeEqual "val1"
@@ -45,7 +45,7 @@ class ProjectionSnapshotRepositoryInMemoryTest :
} }
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStream.publish(event2) eventStore.publish(event2)
repo.applyAndPutToCache(event2) repo.applyAndPutToCache(event2)
assertNotNull(repo.getLast(event2.aggregateId)).also { assertNotNull(repo.getLast(event2.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1val2" assertNotNull(it.value) shouldBeEqual "val1val2"
@@ -59,8 +59,8 @@ class ProjectionSnapshotRepositoryInMemoryTest :
} }
test("ProjectionSnapshotRepositoryInMemory should be thread safe") { test("ProjectionSnapshotRepositoryInMemory should be thread safe") {
val eventStream: EventStream<TestEvents, IdTest> = EventStreamInMemory() val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStream) val repo = getSnapshotRepoTest(eventStore)
val aggregateId = IdTest() val aggregateId = IdTest()
val versionBuilder = VersionBuilderLocal() val versionBuilder = VersionBuilderLocal()
val lock = ReentrantLock() val lock = ReentrantLock()
@@ -71,7 +71,7 @@ class ProjectionSnapshotRepositoryInMemoryTest :
val eventX = val eventX =
lock.withLock { lock.withLock {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStream.publish(it) } .also { eventStore.publish(it) }
} }
repo.applyAndPutToCache(eventX) repo.applyAndPutToCache(eventX)
} }
@@ -82,13 +82,13 @@ class ProjectionSnapshotRepositoryInMemoryTest :
test("removeOldSnapshot") { test("removeOldSnapshot") {
val versionBuilder = VersionBuilderLocal() val versionBuilder = VersionBuilderLocal()
val eventStream: EventStream<TestEvents, IdTest> = EventStreamInMemory() val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStream, SnapshotConfig(2)) val repo = getSnapshotRepoTest(eventStore, SnapshotConfig(2))
val aggregateId = IdTest() val aggregateId = IdTest()
fun buildEndSendEventX() { fun buildEndSendEventX() {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStream.publish(it) } .also { eventStore.publish(it) }
.also { repo.applyAndPutToCache(it) } .also { repo.applyAndPutToCache(it) }
} }
@@ -142,11 +142,11 @@ private data class EventXTest(
) : TestEvents ) : TestEvents
private fun getSnapshotRepoTest( private fun getSnapshotRepoTest(
eventStream: EventStream<TestEvents, IdTest>, eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000), snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
): ProjectionSnapshotRepositoryInMemory<TestEvents, ProjectionTest, IdTest> = ): ProjectionSnapshotRepositoryInMemory<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory( ProjectionSnapshotRepositoryInMemory(
eventStream = eventStream, eventStore = eventStore,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig, snapshotCacheConfig = snapshotConfig,
) { event -> ) { event ->

View File

@@ -8,7 +8,7 @@ import eventDemo.app.command.command.IamReadyToPlayCommand
import eventDemo.app.entity.Card import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventStream import eventDemo.app.event.GameEventStore
import eventDemo.app.event.event.disableShuffleDeck import eventDemo.app.event.event.disableShuffleDeck
import eventDemo.app.event.projection.GameState import eventDemo.app.event.projection.GameState
import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory
@@ -134,7 +134,7 @@ class GameStateTest :
koinApplication { modules(appKoinModule) }.koin.apply { koinApplication { modules(appKoinModule) }.koin.apply {
val commandHandler by inject<GameCommandHandler>() val commandHandler by inject<GameCommandHandler>()
val eventStream by inject<GameEventStream>() val eventStore by inject<GameEventStore>()
val playerNotificationListener by inject<PlayerNotificationEventListener>() val playerNotificationListener by inject<PlayerNotificationEventListener>()
ReactionEventListener(get(), get(), get()).init() ReactionEventListener(get(), get(), get()).init()
playerNotificationListener.startListening(channelNotification1, player1) playerNotificationListener.startListening(channelNotification1, player1)
@@ -151,7 +151,7 @@ class GameStateTest :
val state = val state =
ProjectionSnapshotRepositoryInMemory( ProjectionSnapshotRepositoryInMemory(
eventStream = eventStream, eventStore = eventStore,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
applyToProjection = GameState::apply, applyToProjection = GameState::apply,
).getLast(id) ).getLast(id)