From a427bf88c709e921a1c4b0b6f868739289b68daa Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Sat, 29 Mar 2025 01:00:34 +0100 Subject: [PATCH] create EventStreamInMemoryTest --- .../libs/event/EventStoreInMemory.kt | 2 +- .../libs/event/EventStreamInMemory.kt | 9 ++- .../libs/event/EventStreamPublishException.kt | 5 ++ .../ProjectionSnapshotRepositoryTest.kt | 2 +- .../libs/event/EventStreamInMemoryTest.kt | 79 +++++++++++++++++-- .../kotlin/eventDemo/libs/event/TestEvents.kt | 26 ++++++ .../libs/event/VersionBuilderLocalTest.kt | 6 -- 7 files changed, 115 insertions(+), 14 deletions(-) create mode 100644 src/main/kotlin/eventDemo/libs/event/EventStreamPublishException.kt create mode 100644 src/test/kotlin/eventDemo/libs/event/TestEvents.kt diff --git a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt index bc4dae9..ba03975 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt @@ -7,5 +7,5 @@ class EventStoreInMemory, ID : AggregateId> : EventStore { private val streams: ConcurrentMap> = ConcurrentHashMap() override fun getStream(aggregateId: ID): EventStream = - streams.computeIfAbsent(aggregateId) { EventStreamInMemory() } + streams.computeIfAbsent(aggregateId) { EventStreamInMemory(aggregateId) } } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index 9e884f9..036a042 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -9,11 +9,18 @@ import java.util.concurrent.ConcurrentLinkedQueue * * All methods are implemented. */ -class EventStreamInMemory> : EventStream { +class EventStreamInMemory, ID : AggregateId>( + val aggregateId: ID, +) : EventStream { private val logger = KotlinLogging.logger {} private val events: Queue = ConcurrentLinkedQueue() override fun publish(event: E) { + if (event.aggregateId != aggregateId) { + throw EventStreamPublishException( + "You cannot publish this event in this stream because it has a different aggregateId!", + ) + } if (events.none { it.eventId == event.eventId }) { events.add(event) logger.info { "Event published" } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamPublishException.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamPublishException.kt new file mode 100644 index 0000000..c2013f7 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamPublishException.kt @@ -0,0 +1,5 @@ +package eventDemo.libs.event + +class EventStreamPublishException( + override val message: String, +) : Exception(message) diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt index c1c800b..2a3ed42 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt @@ -105,7 +105,7 @@ class ProjectionSnapshotRepositoryTest : (0..9) .map { GlobalScope.launch { - (1..10).map { + (1..10).forEach { val eventX = lock.withLock { EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) diff --git a/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt b/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt index f8133ff..c08235e 100644 --- a/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt @@ -1,16 +1,85 @@ package eventDemo.libs.event import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.equals.shouldBeEqual +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.junit.jupiter.api.assertNull +import org.junit.jupiter.api.assertThrows +import kotlin.test.assertNotNull +@DelicateCoroutinesApi class EventStreamInMemoryTest : FunSpec({ + fun streamWith3Events(block: EventStream>.(id: IdTest) -> Unit): EventStream> = + EventStreamInMemory(IdTest()).apply { + publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1)) + publish(EventXTest(aggregateId = aggregateId, version = 2, num = 2)) + publish(EventXTest(aggregateId = aggregateId, version = 3, num = 3)) + block(aggregateId) + } - xtest("publish should be concurrently secure") { } + test("readVersionBetween should only return the event of aggregate") { + streamWith3Events { + readVersionBetween(1..2) shouldHaveSize 2 + readVersionBetween(1..1) shouldHaveSize 1 + readVersionBetween(2..20) shouldHaveSize 2 + readVersionBetween(4..20) shouldHaveSize 0 + } + } - xtest("readLast should only return the event of aggregate") { } - xtest("readLast should return the last event of the aggregate") { } + test("readAll should only return the event of aggregate") { + streamWith3Events { + readAll() shouldHaveSize 3 + readAll().also { + it.forEachIndexed { i, event -> + event.version shouldBeEqual i + 1 + } + } + } + } - xtest("readLastOf should return the last event of the aggregate of the type") { } + test("getByVersion should only return the event with this version") { + streamWith3Events { + assertNotNull(getByVersion(1)).version shouldBeEqual 1 + assertNotNull(getByVersion(2)).version shouldBeEqual 2 + assertNotNull(getByVersion(3)).version shouldBeEqual 3 + assertNull(getByVersion(4)) + } + } - xtest("readAll should only return the event of aggregate") { } + test("readGreaterOfVersion should only return the events with greater version") { + streamWith3Events { + assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2 + assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1 + assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0 + assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0 + } + } + + test("publish should be throw error when publish another aggregate event") { + EventStreamInMemory(IdTest()).apply { + assertThrows { publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) } + } + } + + test("publish should be concurrently secure") { + val id = IdTest() + val stream = EventStreamInMemory(id) + (1..10).forEach { i1 -> + GlobalScope.launch { + (1..10).forEach { i2 -> + stream.publish( + EventXTest( + aggregateId = id, + version = (i1 * 10) + i2, + num = (i1 * 10) + i2, + ), + ) + } + } + } + } }) diff --git a/src/test/kotlin/eventDemo/libs/event/TestEvents.kt b/src/test/kotlin/eventDemo/libs/event/TestEvents.kt new file mode 100644 index 0000000..ec2caa8 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/event/TestEvents.kt @@ -0,0 +1,26 @@ +package eventDemo.libs.event + +import eventDemo.configuration.serializer.UUIDSerializer +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable +import java.util.UUID + +@JvmInline +@Serializable +value class IdTest( + @Serializable(with = UUIDSerializer::class) + override val id: UUID = UUID.randomUUID(), +) : AggregateId + +sealed interface TestEvents : Event + +@Serializable +data class EventXTest( + @Serializable(with = UUIDSerializer::class) + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val num: Int, +) : TestEvents diff --git a/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt index 8636890..927dec1 100644 --- a/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt @@ -6,12 +6,6 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch -import java.util.UUID - -@JvmInline -private value class IdTest( - override val id: UUID = UUID.randomUUID(), -) : AggregateId @OptIn(DelicateCoroutinesApi::class) class VersionBuilderLocalTest :