From d9e05e6d9ad9de47e82802a6d7ec335e07cff58b Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Sat, 29 Mar 2025 02:42:26 +0100 Subject: [PATCH] create EventStreamInPostgresql --- migrations/events/V1__init_event_table.sql | 7 + .../kotlin/eventDemo/libs/event/EventStore.kt | 2 +- .../libs/event/EventStoreInMemory.kt | 4 +- .../libs/event/EventStoreInPostgresql.kt | 12 ++ .../eventDemo/libs/event/EventStream.kt | 4 +- .../libs/event/EventStreamInMemory.kt | 4 +- .../libs/event/EventStreamInPostgresql.kt | 105 ++++++++++++++ src/test/kotlin/eventDemo/Helpers.kt | 7 +- .../libs/event/EventStreamInMemoryTest.kt | 131 +++++++++++------- .../kotlin/eventDemo/libs/event/TestEvents.kt | 1 + 10 files changed, 214 insertions(+), 63 deletions(-) create mode 100644 migrations/events/V1__init_event_table.sql create mode 100644 src/main/kotlin/eventDemo/libs/event/EventStoreInPostgresql.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/EventStreamInPostgresql.kt diff --git a/migrations/events/V1__init_event_table.sql b/migrations/events/V1__init_event_table.sql new file mode 100644 index 0000000..6811f49 --- /dev/null +++ b/migrations/events/V1__init_event_table.sql @@ -0,0 +1,7 @@ +create table event_stream ( + id uuid not null primary key, + aggregate_id uuid not null, + version int not null, + data jsonb not null, + unique(aggregate_id, version) +); \ No newline at end of file diff --git a/src/main/kotlin/eventDemo/libs/event/EventStore.kt b/src/main/kotlin/eventDemo/libs/event/EventStore.kt index 9329c9e..bc7a75b 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStore.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStore.kt @@ -3,7 +3,7 @@ package eventDemo.libs.event import io.github.oshai.kotlinlogging.withLoggingContext interface EventStore, ID : AggregateId> { - fun getStream(aggregateId: ID): EventStream + fun getStream(aggregateId: ID): EventStream fun publish(event: E) = withLoggingContext("event" to event.toString()) { diff --git a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt index ba03975..9021a76 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt @@ -4,8 +4,8 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap class EventStoreInMemory, ID : AggregateId> : EventStore { - private val streams: ConcurrentMap> = ConcurrentHashMap() + private val streams: ConcurrentMap> = ConcurrentHashMap() - override fun getStream(aggregateId: ID): EventStream = + override fun getStream(aggregateId: ID): EventStream = streams.computeIfAbsent(aggregateId) { EventStreamInMemory(aggregateId) } } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStoreInPostgresql.kt b/src/main/kotlin/eventDemo/libs/event/EventStoreInPostgresql.kt new file mode 100644 index 0000000..b15859e --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventStoreInPostgresql.kt @@ -0,0 +1,12 @@ +package eventDemo.libs.event + +import javax.sql.DataSource + +class EventStoreInPostgresql, ID : AggregateId>( + private val dataSource: DataSource, + private val objectToString: (E) -> String, + private val stringToObject: (String) -> E, +) : EventStore { + override fun getStream(aggregateId: ID): EventStream = + EventStreamInPostgresql(aggregateId, dataSource, objectToString, stringToObject) +} diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index 0df6991..5cf7a82 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -6,7 +6,9 @@ import io.github.oshai.kotlinlogging.withLoggingContext /** * Interface representing an event stream for publishing and reading domain events */ -interface EventStream> { +interface EventStream, ID : AggregateId> { + val aggregateId: ID + /** Publishes a single event to the event stream */ fun publish(event: E) diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index 036a042..48607ad 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -10,8 +10,8 @@ import java.util.concurrent.ConcurrentLinkedQueue * All methods are implemented. */ class EventStreamInMemory, ID : AggregateId>( - val aggregateId: ID, -) : EventStream { + override val aggregateId: ID, +) : EventStream { private val logger = KotlinLogging.logger {} private val events: Queue = ConcurrentLinkedQueue() diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInPostgresql.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInPostgresql.kt new file mode 100644 index 0000000..fe81fe5 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInPostgresql.kt @@ -0,0 +1,105 @@ +package eventDemo.libs.event + +import io.github.oshai.kotlinlogging.KotlinLogging +import org.postgresql.util.PGobject +import javax.sql.DataSource + +/** + * An In-Memory implementation of an event stream. + * + * All methods are implemented. + */ +class EventStreamInPostgresql, ID : AggregateId>( + override val aggregateId: ID, + private val dataSource: DataSource, + private val objectToString: (E) -> String, + private val stringToObject: (String) -> E, +) : EventStream { + private val logger = KotlinLogging.logger {} + + override fun publish(event: E) { + if (event.aggregateId != aggregateId) { + throw EventStreamPublishException( + "You cannot publish this event in this stream because it has a different aggregateId!", + ) + } + dataSource.connection.use { connection -> + connection + .prepareStatement( + """ + insert into event_stream(id, aggregate_id, version, data) + values (?, ?, ?, ?) + """.trimIndent(), + ).use { + it.setObject(1, event.eventId) + it.setObject(2, event.aggregateId.id) + it.setInt(3, event.version) + it.setObject(4, PGJsonb(objectToString(event))) + it.executeUpdate() + } + } + logger.info { "Event published" } + } + + override fun readAll(): Set = + dataSource.connection.use { connection -> + connection + .prepareStatement( + """ + select data + from event_stream + where aggregate_id = ? + order by version asc + """.trimIndent(), + ).use { + it.setObject(1, aggregateId.id) + it.executeQuery().use { resultSet -> + buildSet { + while (resultSet.next()) { + resultSet + .getString("data") + .let(stringToObject) + .let { add(it) } + } + } + } + } + } + + override fun readVersionBetween(version: IntRange): Set = + dataSource.connection.use { connection -> + connection + .prepareStatement( + """ + select data + from event_stream + where version between ? and ? + and aggregate_id = ? + order by version asc + """.trimIndent(), + ).use { stmt -> + stmt.setInt(1, version.first) + stmt.setInt(2, version.last) + stmt.setObject(3, aggregateId.id) + stmt.executeQuery().use { resultSet -> + buildSet { + while (resultSet.next()) { + resultSet + .getString("data") + .let(stringToObject) + .let { add(it) } + } + } + } + } + } +} + +class PGJsonb( + value: String, +) : PGobject() { + init { + this.value = value + this.type = "jsonb" + } +} diff --git a/src/test/kotlin/eventDemo/Helpers.kt b/src/test/kotlin/eventDemo/Helpers.kt index 2a2a1f1..880360a 100644 --- a/src/test/kotlin/eventDemo/Helpers.kt +++ b/src/test/kotlin/eventDemo/Helpers.kt @@ -19,11 +19,8 @@ fun Deck.allCards(): Set = stack + discard + playersHands.values.flatten() @KoinApplicationDslMarker -suspend fun testKoinApplicationWithConfig(block: suspend Koin.() -> Unit) { - koinApplication { modules(appKoinModule(ApplicationConfig("application.conf").configuration())) }.koin.apply { - block() - } -} +suspend fun testKoinApplicationWithConfig(block: suspend Koin.() -> T): T = + koinApplication { modules(appKoinModule(ApplicationConfig("application.conf").configuration())) }.koin.block() @KtorDsl suspend fun testApplicationWithConfig(block: suspend ApplicationTestBuilder.(koin: Koin) -> Unit) { diff --git a/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt b/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt index c08235e..5702f98 100644 --- a/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/EventStreamInMemoryTest.kt @@ -1,11 +1,15 @@ package eventDemo.libs.event +import eventDemo.testKoinApplicationWithConfig import io.kotest.core.spec.style.FunSpec +import io.kotest.datatest.withData import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.serialization.json.Json import org.junit.jupiter.api.assertNull import org.junit.jupiter.api.assertThrows import kotlin.test.assertNotNull @@ -13,73 +17,96 @@ import kotlin.test.assertNotNull @DelicateCoroutinesApi class EventStreamInMemoryTest : FunSpec({ - fun streamWith3Events(block: EventStream>.(id: IdTest) -> Unit): EventStream> = - EventStreamInMemory(IdTest()).apply { + fun EventStream.with3Events(block: EventStream.(id: IdTest) -> Unit) = + also { publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1)) publish(EventXTest(aggregateId = aggregateId, version = 2, num = 2)) publish(EventXTest(aggregateId = aggregateId, version = 3, num = 3)) block(aggregateId) } - test("readVersionBetween should only return the event of aggregate") { - streamWith3Events { - readVersionBetween(1..2) shouldHaveSize 2 - readVersionBetween(1..1) shouldHaveSize 1 - readVersionBetween(2..20) shouldHaveSize 2 - readVersionBetween(4..20) shouldHaveSize 0 + suspend fun eventStreams(): List> = + testKoinApplicationWithConfig { + listOf( + EventStreamInMemory(IdTest()), + EventStreamInPostgresql( + IdTest(), + dataSource = get(), + objectToString = { Json.encodeToString(it) }, + stringToObject = { Json.decodeFromString(it) }, + ), + ) + } + + context("readVersionBetween should only return the event of aggregate") { + withData(eventStreams()) { stream -> + stream.with3Events { + readVersionBetween(1..2) shouldHaveSize 2 + readVersionBetween(1..1) shouldHaveSize 1 + readVersionBetween(2..20) shouldHaveSize 2 + readVersionBetween(4..20) shouldHaveSize 0 + } } } - test("readAll should only return the event of aggregate") { - streamWith3Events { - readAll() shouldHaveSize 3 - readAll().also { - it.forEachIndexed { i, event -> - event.version shouldBeEqual i + 1 + context("readAll should only return the event of aggregate") { + withData(eventStreams()) { stream -> + stream.with3Events { + readAll() shouldHaveSize 3 + readAll().also { + it.forEachIndexed { i, event -> + event.version shouldBeEqual i + 1 + } } } } } - test("getByVersion should only return the event with this version") { - streamWith3Events { - assertNotNull(getByVersion(1)).version shouldBeEqual 1 - assertNotNull(getByVersion(2)).version shouldBeEqual 2 - assertNotNull(getByVersion(3)).version shouldBeEqual 3 - assertNull(getByVersion(4)) - } - } - - test("readGreaterOfVersion should only return the events with greater version") { - streamWith3Events { - assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2 - assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1 - assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0 - assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0 - } - } - - test("publish should be throw error when publish another aggregate event") { - EventStreamInMemory(IdTest()).apply { - assertThrows { publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) } - } - } - - test("publish should be concurrently secure") { - val id = IdTest() - val stream = EventStreamInMemory(id) - (1..10).forEach { i1 -> - GlobalScope.launch { - (1..10).forEach { i2 -> - stream.publish( - EventXTest( - aggregateId = id, - version = (i1 * 10) + i2, - num = (i1 * 10) + i2, - ), - ) - } + context("getByVersion should only return the event with this version") { + withData(eventStreams()) { stream -> + stream.with3Events { + assertNotNull(getByVersion(1)).version shouldBeEqual 1 + assertNotNull(getByVersion(2)).version shouldBeEqual 2 + assertNotNull(getByVersion(3)).version shouldBeEqual 3 + assertNull(getByVersion(4)) } } } + + context("readGreaterOfVersion should only return the events with greater version") { + withData(eventStreams()) { + it.with3Events { + assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2 + assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1 + assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0 + assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0 + } + } + } + + context("publish should be throw error when publish another aggregate event") { + withData(eventStreams()) { + assertThrows { it.publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) } + } + } + + context("publish should be concurrently secure") { + withData(eventStreams()) { stream -> + (0..9) + .map { i1 -> + GlobalScope.launch { + (1..10).forEach { i2 -> + stream.publish( + EventXTest( + aggregateId = stream.aggregateId, + version = (i1 * 10) + i2, + num = (i1 * 10) + i2, + ), + ) + } + } + }.joinAll() + stream.readAll() shouldHaveSize 100 + } + } }) diff --git a/src/test/kotlin/eventDemo/libs/event/TestEvents.kt b/src/test/kotlin/eventDemo/libs/event/TestEvents.kt index ec2caa8..19c4e8f 100644 --- a/src/test/kotlin/eventDemo/libs/event/TestEvents.kt +++ b/src/test/kotlin/eventDemo/libs/event/TestEvents.kt @@ -13,6 +13,7 @@ value class IdTest( override val id: UUID = UUID.randomUUID(), ) : AggregateId +@Serializable sealed interface TestEvents : Event @Serializable