diff --git a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt index b62dbf9..c7bffdb 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt @@ -13,7 +13,7 @@ import kotlin.concurrent.withLock */ class GameEventHandler( private val eventBus: GameEventBus, - private val eventStream: GameEventStream, + private val eventStore: GameEventStore, private val versionBuilder: VersionBuilder, ) : EventHandler { private val projectionsBuilders: ConcurrentLinkedQueue<(GameEvent) -> Unit> = ConcurrentLinkedQueue() @@ -31,7 +31,7 @@ class GameEventHandler( .computeIfAbsent(aggregateId) { ReentrantLock() } .withLock { buildEvent(versionBuilder.buildNextVersion(aggregateId)) - .also { eventStream.publish(it) } + .also { eventStore.publish(it) } }.also { event -> projectionsBuilders.forEach { it(event) } eventBus.publish(event) diff --git a/src/main/kotlin/eventDemo/app/event/GameEventStore.kt b/src/main/kotlin/eventDemo/app/event/GameEventStore.kt new file mode 100644 index 0000000..17bc5a0 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/event/GameEventStore.kt @@ -0,0 +1,12 @@ +package eventDemo.app.event + +import eventDemo.app.entity.GameId +import eventDemo.app.event.event.GameEvent +import eventDemo.libs.event.EventStore + +/** + * A stream to publish and read the played card event. + */ +class GameEventStore( + private val eventStore: EventStore, +) : EventStore by eventStore diff --git a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt index 9a02310..bfeefd7 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt @@ -1,6 +1,5 @@ package eventDemo.app.event -import eventDemo.app.entity.GameId import eventDemo.app.event.event.GameEvent import eventDemo.libs.event.EventStream @@ -8,8 +7,8 @@ import eventDemo.libs.event.EventStream * A stream to publish and read the played card event. */ class GameEventStream( - private val eventStream: EventStream, -) : EventStream by eventStream { + private val eventStream: EventStream, +) : EventStream by eventStream { override fun publish(event: GameEvent) { eventStream.publish(event) } diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt index 6580ee7..762089d 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt @@ -2,17 +2,17 @@ package eventDemo.app.event.projection import eventDemo.app.entity.GameId import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.GameEventStream +import eventDemo.app.event.GameEventStore import eventDemo.app.event.event.GameEvent class GameStateRepository( - eventStream: GameEventStream, + eventStore: GameEventStore, eventHandler: GameEventHandler, snapshotConfig: SnapshotConfig = SnapshotConfig(), ) { private val projectionsSnapshot = ProjectionSnapshotRepositoryInMemory( - eventStream = eventStream, + eventStore = eventStore, snapshotCacheConfig = snapshotConfig, applyToProjection = GameState::apply, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, diff --git a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt index e91e273..3dd1985 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -2,6 +2,7 @@ package eventDemo.app.event.projection import eventDemo.libs.event.AggregateId import eventDemo.libs.event.Event +import eventDemo.libs.event.EventStore import eventDemo.libs.event.EventStream import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.datetime.Clock @@ -21,7 +22,7 @@ data class SnapshotConfig( ) class ProjectionSnapshotRepositoryInMemory, P : Projection, ID : AggregateId>( - private val eventStream: EventStream, + private val eventStore: EventStore, private val initialStateBuilder: (ID) -> P, private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), private val applyToProjection: P.(event: E) -> P, @@ -77,10 +78,9 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID } val missingEventOfSnapshot = - eventStream.readVersionBetween( - event.aggregateId, - (lastSnapshot?.lastEventVersion ?: 1)..event.version, - ) + eventStore + .getStream(event.aggregateId) + .readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version) return if (lastSnapshot?.lastEventVersion == event.version) { lastSnapshot @@ -164,8 +164,9 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID private fun getEventAfterTheSnapshot( aggregateId: ID, snapshot: P?, - ) = eventStream - .readGreaterOfVersion(aggregateId, snapshot?.lastEventVersion ?: 0) + ) = eventStore + .getStream(aggregateId) + .readGreaterOfVersion(snapshot?.lastEventVersion ?: 0) /** * Apply events to the projection. diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt index 56552ef..a33f723 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt @@ -5,13 +5,13 @@ import eventDemo.app.command.GameCommandRunner import eventDemo.app.command.command.GameCommand import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.GameEventStream +import eventDemo.app.event.GameEventStore import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.SnapshotConfig import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.libs.command.CommandStreamChannelBuilder import eventDemo.libs.event.EventBusInMemory -import eventDemo.libs.event.EventStreamInMemory +import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.VersionBuilder import eventDemo.libs.event.VersionBuilderLocal import io.ktor.server.application.Application @@ -35,7 +35,7 @@ val appKoinModule = GameEventBus(EventBusInMemory()) } single { - GameEventStream(EventStreamInMemory()) + GameEventStore(EventStoreInMemory()) } single { GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig()) diff --git a/src/main/kotlin/eventDemo/libs/event/EventStore.kt b/src/main/kotlin/eventDemo/libs/event/EventStore.kt new file mode 100644 index 0000000..8717332 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventStore.kt @@ -0,0 +1,7 @@ +package eventDemo.libs.event + +interface EventStore, ID : AggregateId> { + fun getStream(aggregateId: ID): EventStream + + fun publish(event: E) +} diff --git a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt new file mode 100644 index 0000000..fd3480a --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt @@ -0,0 +1,12 @@ +package eventDemo.libs.event + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +class EventStoreInMemory, ID : AggregateId> : EventStore { + private val streams: ConcurrentMap> = ConcurrentHashMap() + + override fun getStream(aggregateId: ID): EventStream = streams.computeIfAbsent(aggregateId) { EventStreamInMemory() } + + override fun publish(event: E) = getStream(event.aggregateId).publish(event) +} diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index c9bbcf7..0657bcf 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -1,36 +1,19 @@ package eventDemo.libs.event -import kotlin.reflect.KClass - /** * Interface representing an event stream for publishing and reading domain events */ -interface EventStream, ID : AggregateId> { +interface EventStream> { /** Publishes a single event to the event stream */ fun publish(event: E) /** Publishes multiple events to the event stream */ fun publish(vararg events: E) - /** Reads the last event associated with a given aggregate ID */ - fun readLast(aggregateId: ID): E? + /** Reads all events */ + fun readAll(): Set - /** Reads the last event of a specific type associated with a given aggregate ID */ - fun readLastOf( - aggregateId: ID, - eventType: KClass, - ): R? + fun readGreaterOfVersion(version: Int): Set - /** Reads all events associated with a given aggregate ID */ - fun readAll(aggregateId: ID): Set - - fun readGreaterOfVersion( - aggregateId: ID, - version: Int, - ): Set - - fun readVersionBetween( - aggregateId: ID, - version: IntRange, - ): Set + fun readVersionBetween(version: IntRange): Set } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index f5837a3..8d37f20 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -3,14 +3,13 @@ package eventDemo.libs.event import io.github.oshai.kotlinlogging.KotlinLogging import java.util.Queue import java.util.concurrent.ConcurrentLinkedQueue -import kotlin.reflect.KClass /** * An In-Memory implementation of an event stream. * * All methods are implemented. */ -class EventStreamInMemory, ID : AggregateId> : EventStream { +class EventStreamInMemory> : EventStream { private val logger = KotlinLogging.logger {} private val events: Queue = ConcurrentLinkedQueue() @@ -28,39 +27,15 @@ class EventStreamInMemory, ID : AggregateId> : EventStream events.forEach { publish(it) } } - override fun readLast(aggregateId: ID): E? = events.lastOrNull() + override fun readAll(): Set = events.toSet() - override fun readLastOf( - aggregateId: ID, - eventType: KClass, - ): R? = + override fun readGreaterOfVersion(version: Int): Set = events - .filterIsInstance(eventType.java) - .lastOrNull { it.aggregateId == aggregateId } - - override fun readAll(aggregateId: ID): Set = - events - .filter { it.aggregateId == aggregateId } - .toSet() - - override fun readGreaterOfVersion( - aggregateId: ID, - version: Int, - ): Set = - events - .filter { it.aggregateId == aggregateId } .filter { it.version > version } .toSet() - override fun readVersionBetween( - aggregateId: ID, - version: IntRange, - ): Set = + override fun readVersionBetween(version: IntRange): Set = events - .filter { it.aggregateId == aggregateId } .filter { version.contains(it.version) } .toSet() } - -inline fun , ID : AggregateId> EventStream.readLastOf(aggregateId: ID): R? = - readLastOf(aggregateId, R::class) diff --git a/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt index a72e5f2..2c9ff61 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt @@ -2,8 +2,8 @@ package eventDemo.app.event.projection import eventDemo.libs.event.AggregateId import eventDemo.libs.event.Event -import eventDemo.libs.event.EventStream -import eventDemo.libs.event.EventStreamInMemory +import eventDemo.libs.event.EventStore +import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.VersionBuilderLocal import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.equals.shouldBeEqual @@ -23,19 +23,19 @@ class ProjectionSnapshotRepositoryInMemoryTest : FunSpec({ test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { - val eventStream: EventStream = EventStreamInMemory() - val repo = getSnapshotRepoTest(eventStream) + val eventStore: EventStore = EventStoreInMemory() + val repo = getSnapshotRepoTest(eventStore) val aggregateId = IdTest() val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) - eventStream.publish(eventOther) + eventStore.publish(eventOther) repo.applyAndPutToCache(eventOther) assertNotNull(repo.getUntil(eventOther)).also { assertNotNull(it.value) shouldBeEqual "valOther" } val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) - eventStream.publish(event1) + eventStore.publish(event1) repo.applyAndPutToCache(event1) assertNotNull(repo.getLast(event1.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "val1" @@ -45,7 +45,7 @@ class ProjectionSnapshotRepositoryInMemoryTest : } val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) - eventStream.publish(event2) + eventStore.publish(event2) repo.applyAndPutToCache(event2) assertNotNull(repo.getLast(event2.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "val1val2" @@ -59,8 +59,8 @@ class ProjectionSnapshotRepositoryInMemoryTest : } test("ProjectionSnapshotRepositoryInMemory should be thread safe") { - val eventStream: EventStream = EventStreamInMemory() - val repo = getSnapshotRepoTest(eventStream) + val eventStore: EventStore = EventStoreInMemory() + val repo = getSnapshotRepoTest(eventStore) val aggregateId = IdTest() val versionBuilder = VersionBuilderLocal() val lock = ReentrantLock() @@ -71,7 +71,7 @@ class ProjectionSnapshotRepositoryInMemoryTest : val eventX = lock.withLock { EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStream.publish(it) } + .also { eventStore.publish(it) } } repo.applyAndPutToCache(eventX) } @@ -82,13 +82,13 @@ class ProjectionSnapshotRepositoryInMemoryTest : test("removeOldSnapshot") { val versionBuilder = VersionBuilderLocal() - val eventStream: EventStream = EventStreamInMemory() - val repo = getSnapshotRepoTest(eventStream, SnapshotConfig(2)) + val eventStore: EventStore = EventStoreInMemory() + val repo = getSnapshotRepoTest(eventStore, SnapshotConfig(2)) val aggregateId = IdTest() fun buildEndSendEventX() { EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStream.publish(it) } + .also { eventStore.publish(it) } .also { repo.applyAndPutToCache(it) } } @@ -142,11 +142,11 @@ private data class EventXTest( ) : TestEvents private fun getSnapshotRepoTest( - eventStream: EventStream, + eventStore: EventStore, snapshotConfig: SnapshotConfig = SnapshotConfig(2000), ): ProjectionSnapshotRepositoryInMemory = ProjectionSnapshotRepositoryInMemory( - eventStream = eventStream, + eventStore = eventStore, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, snapshotCacheConfig = snapshotConfig, ) { event -> diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index a9453cf..b0bfb1c 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -8,7 +8,7 @@ import eventDemo.app.command.command.IamReadyToPlayCommand import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventStream +import eventDemo.app.event.GameEventStore import eventDemo.app.event.event.disableShuffleDeck import eventDemo.app.event.projection.GameState import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory @@ -134,7 +134,7 @@ class GameStateTest : koinApplication { modules(appKoinModule) }.koin.apply { val commandHandler by inject() - val eventStream by inject() + val eventStore by inject() val playerNotificationListener by inject() ReactionEventListener(get(), get(), get()).init() playerNotificationListener.startListening(channelNotification1, player1) @@ -151,7 +151,7 @@ class GameStateTest : val state = ProjectionSnapshotRepositoryInMemory( - eventStream = eventStream, + eventStore = eventStore, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, applyToProjection = GameState::apply, ).getLast(id)