create EventStreamInMemoryTest

This commit is contained in:
2025-03-29 01:00:34 +01:00
parent cee57ad2a3
commit a427bf88c7
7 changed files with 115 additions and 14 deletions

View File

@@ -7,5 +7,5 @@ class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> {
private val streams: ConcurrentMap<ID, EventStream<E>> = ConcurrentHashMap()
override fun getStream(aggregateId: ID): EventStream<E> =
streams.computeIfAbsent(aggregateId) { EventStreamInMemory() }
streams.computeIfAbsent(aggregateId) { EventStreamInMemory(aggregateId) }
}

View File

@@ -9,11 +9,18 @@ import java.util.concurrent.ConcurrentLinkedQueue
*
* All methods are implemented.
*/
class EventStreamInMemory<E : Event<*>> : EventStream<E> {
class EventStreamInMemory<E : Event<ID>, ID : AggregateId>(
val aggregateId: ID,
) : EventStream<E> {
private val logger = KotlinLogging.logger {}
private val events: Queue<E> = ConcurrentLinkedQueue()
override fun publish(event: E) {
if (event.aggregateId != aggregateId) {
throw EventStreamPublishException(
"You cannot publish this event in this stream because it has a different aggregateId!",
)
}
if (events.none { it.eventId == event.eventId }) {
events.add(event)
logger.info { "Event published" }

View File

@@ -0,0 +1,5 @@
package eventDemo.libs.event
class EventStreamPublishException(
override val message: String,
) : Exception(message)

View File

@@ -105,7 +105,7 @@ class ProjectionSnapshotRepositoryTest :
(0..9)
.map {
GlobalScope.launch {
(1..10).map {
(1..10).forEach {
val eventX =
lock.withLock {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)

View File

@@ -1,16 +1,85 @@
package eventDemo.libs.event
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.junit.jupiter.api.assertNull
import org.junit.jupiter.api.assertThrows
import kotlin.test.assertNotNull
@DelicateCoroutinesApi
class EventStreamInMemoryTest :
FunSpec({
fun streamWith3Events(block: EventStream<Event<IdTest>>.(id: IdTest) -> Unit): EventStream<Event<IdTest>> =
EventStreamInMemory(IdTest()).apply {
publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1))
publish(EventXTest(aggregateId = aggregateId, version = 2, num = 2))
publish(EventXTest(aggregateId = aggregateId, version = 3, num = 3))
block(aggregateId)
}
xtest("publish should be concurrently secure") { }
test("readVersionBetween should only return the event of aggregate") {
streamWith3Events {
readVersionBetween(1..2) shouldHaveSize 2
readVersionBetween(1..1) shouldHaveSize 1
readVersionBetween(2..20) shouldHaveSize 2
readVersionBetween(4..20) shouldHaveSize 0
}
}
xtest("readLast should only return the event of aggregate") { }
xtest("readLast should return the last event of the aggregate") { }
test("readAll should only return the event of aggregate") {
streamWith3Events {
readAll() shouldHaveSize 3
readAll().also {
it.forEachIndexed { i, event ->
event.version shouldBeEqual i + 1
}
}
}
}
xtest("readLastOf should return the last event of the aggregate of the type") { }
test("getByVersion should only return the event with this version") {
streamWith3Events {
assertNotNull(getByVersion(1)).version shouldBeEqual 1
assertNotNull(getByVersion(2)).version shouldBeEqual 2
assertNotNull(getByVersion(3)).version shouldBeEqual 3
assertNull(getByVersion(4))
}
}
xtest("readAll should only return the event of aggregate") { }
test("readGreaterOfVersion should only return the events with greater version") {
streamWith3Events {
assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2
assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1
assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0
assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0
}
}
test("publish should be throw error when publish another aggregate event") {
EventStreamInMemory(IdTest()).apply {
assertThrows<EventStreamPublishException> { publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) }
}
}
test("publish should be concurrently secure") {
val id = IdTest()
val stream = EventStreamInMemory(id)
(1..10).forEach { i1 ->
GlobalScope.launch {
(1..10).forEach { i2 ->
stream.publish(
EventXTest(
aggregateId = id,
version = (i1 * 10) + i2,
num = (i1 * 10) + i2,
),
)
}
}
}
}
})

View File

@@ -0,0 +1,26 @@
package eventDemo.libs.event
import eventDemo.configuration.serializer.UUIDSerializer
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable
import java.util.UUID
@JvmInline
@Serializable
value class IdTest(
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID(),
) : AggregateId
sealed interface TestEvents : Event<IdTest>
@Serializable
data class EventXTest(
@Serializable(with = UUIDSerializer::class)
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val num: Int,
) : TestEvents

View File

@@ -6,12 +6,6 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import java.util.UUID
@JvmInline
private value class IdTest(
override val id: UUID = UUID.randomUUID(),
) : AggregateId
@OptIn(DelicateCoroutinesApi::class)
class VersionBuilderLocalTest :