create EventStreamInPostgresql

This commit is contained in:
2025-03-29 02:42:26 +01:00
parent a427bf88c7
commit d9e05e6d9a
10 changed files with 214 additions and 63 deletions

View File

@@ -0,0 +1,7 @@
create table event_stream (
id uuid not null primary key,
aggregate_id uuid not null,
version int not null,
data jsonb not null,
unique(aggregate_id, version)
);

View File

@@ -3,7 +3,7 @@ package eventDemo.libs.event
import io.github.oshai.kotlinlogging.withLoggingContext import io.github.oshai.kotlinlogging.withLoggingContext
interface EventStore<E : Event<ID>, ID : AggregateId> { interface EventStore<E : Event<ID>, ID : AggregateId> {
fun getStream(aggregateId: ID): EventStream<E> fun getStream(aggregateId: ID): EventStream<E, ID>
fun publish(event: E) = fun publish(event: E) =
withLoggingContext("event" to event.toString()) { withLoggingContext("event" to event.toString()) {

View File

@@ -4,8 +4,8 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap import java.util.concurrent.ConcurrentMap
class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> { class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> {
private val streams: ConcurrentMap<ID, EventStream<E>> = ConcurrentHashMap() private val streams: ConcurrentMap<ID, EventStream<E, ID>> = ConcurrentHashMap()
override fun getStream(aggregateId: ID): EventStream<E> = override fun getStream(aggregateId: ID): EventStream<E, ID> =
streams.computeIfAbsent(aggregateId) { EventStreamInMemory(aggregateId) } streams.computeIfAbsent(aggregateId) { EventStreamInMemory(aggregateId) }
} }

View File

@@ -0,0 +1,12 @@
package eventDemo.libs.event
import javax.sql.DataSource
class EventStoreInPostgresql<E : Event<ID>, ID : AggregateId>(
private val dataSource: DataSource,
private val objectToString: (E) -> String,
private val stringToObject: (String) -> E,
) : EventStore<E, ID> {
override fun getStream(aggregateId: ID): EventStream<E, ID> =
EventStreamInPostgresql(aggregateId, dataSource, objectToString, stringToObject)
}

View File

@@ -6,7 +6,9 @@ import io.github.oshai.kotlinlogging.withLoggingContext
/** /**
* Interface representing an event stream for publishing and reading domain events * Interface representing an event stream for publishing and reading domain events
*/ */
interface EventStream<E : Event<*>> { interface EventStream<E : Event<ID>, ID : AggregateId> {
val aggregateId: ID
/** Publishes a single event to the event stream */ /** Publishes a single event to the event stream */
fun publish(event: E) fun publish(event: E)

View File

@@ -10,8 +10,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
* All methods are implemented. * All methods are implemented.
*/ */
class EventStreamInMemory<E : Event<ID>, ID : AggregateId>( class EventStreamInMemory<E : Event<ID>, ID : AggregateId>(
val aggregateId: ID, override val aggregateId: ID,
) : EventStream<E> { ) : EventStream<E, ID> {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private val events: Queue<E> = ConcurrentLinkedQueue() private val events: Queue<E> = ConcurrentLinkedQueue()

View File

@@ -0,0 +1,105 @@
package eventDemo.libs.event
import io.github.oshai.kotlinlogging.KotlinLogging
import org.postgresql.util.PGobject
import javax.sql.DataSource
/**
* An In-Memory implementation of an event stream.
*
* All methods are implemented.
*/
class EventStreamInPostgresql<E : Event<ID>, ID : AggregateId>(
override val aggregateId: ID,
private val dataSource: DataSource,
private val objectToString: (E) -> String,
private val stringToObject: (String) -> E,
) : EventStream<E, ID> {
private val logger = KotlinLogging.logger {}
override fun publish(event: E) {
if (event.aggregateId != aggregateId) {
throw EventStreamPublishException(
"You cannot publish this event in this stream because it has a different aggregateId!",
)
}
dataSource.connection.use { connection ->
connection
.prepareStatement(
"""
insert into event_stream(id, aggregate_id, version, data)
values (?, ?, ?, ?)
""".trimIndent(),
).use {
it.setObject(1, event.eventId)
it.setObject(2, event.aggregateId.id)
it.setInt(3, event.version)
it.setObject(4, PGJsonb(objectToString(event)))
it.executeUpdate()
}
}
logger.info { "Event published" }
}
override fun readAll(): Set<E> =
dataSource.connection.use { connection ->
connection
.prepareStatement(
"""
select data
from event_stream
where aggregate_id = ?
order by version asc
""".trimIndent(),
).use {
it.setObject(1, aggregateId.id)
it.executeQuery().use { resultSet ->
buildSet {
while (resultSet.next()) {
resultSet
.getString("data")
.let(stringToObject)
.let { add(it) }
}
}
}
}
}
override fun readVersionBetween(version: IntRange): Set<E> =
dataSource.connection.use { connection ->
connection
.prepareStatement(
"""
select data
from event_stream
where version between ? and ?
and aggregate_id = ?
order by version asc
""".trimIndent(),
).use { stmt ->
stmt.setInt(1, version.first)
stmt.setInt(2, version.last)
stmt.setObject(3, aggregateId.id)
stmt.executeQuery().use { resultSet ->
buildSet {
while (resultSet.next()) {
resultSet
.getString("data")
.let(stringToObject)
.let { add(it) }
}
}
}
}
}
}
class PGJsonb(
value: String,
) : PGobject() {
init {
this.value = value
this.type = "jsonb"
}
}

View File

@@ -19,11 +19,8 @@ fun Deck.allCards(): Set<Card> =
stack + discard + playersHands.values.flatten() stack + discard + playersHands.values.flatten()
@KoinApplicationDslMarker @KoinApplicationDslMarker
suspend fun testKoinApplicationWithConfig(block: suspend Koin.() -> Unit) { suspend fun <T> testKoinApplicationWithConfig(block: suspend Koin.() -> T): T =
koinApplication { modules(appKoinModule(ApplicationConfig("application.conf").configuration())) }.koin.apply { koinApplication { modules(appKoinModule(ApplicationConfig("application.conf").configuration())) }.koin.block()
block()
}
}
@KtorDsl @KtorDsl
suspend fun testApplicationWithConfig(block: suspend ApplicationTestBuilder.(koin: Koin) -> Unit) { suspend fun testApplicationWithConfig(block: suspend ApplicationTestBuilder.(koin: Koin) -> Unit) {

View File

@@ -1,11 +1,15 @@
package eventDemo.libs.event package eventDemo.libs.event
import eventDemo.testKoinApplicationWithConfig
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.withData
import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import org.junit.jupiter.api.assertNull import org.junit.jupiter.api.assertNull
import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.assertThrows
import kotlin.test.assertNotNull import kotlin.test.assertNotNull
@@ -13,73 +17,96 @@ import kotlin.test.assertNotNull
@DelicateCoroutinesApi @DelicateCoroutinesApi
class EventStreamInMemoryTest : class EventStreamInMemoryTest :
FunSpec({ FunSpec({
fun streamWith3Events(block: EventStream<Event<IdTest>>.(id: IdTest) -> Unit): EventStream<Event<IdTest>> = fun EventStream<EventXTest, IdTest>.with3Events(block: EventStream<EventXTest, IdTest>.(id: IdTest) -> Unit) =
EventStreamInMemory(IdTest()).apply { also {
publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1)) publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1))
publish(EventXTest(aggregateId = aggregateId, version = 2, num = 2)) publish(EventXTest(aggregateId = aggregateId, version = 2, num = 2))
publish(EventXTest(aggregateId = aggregateId, version = 3, num = 3)) publish(EventXTest(aggregateId = aggregateId, version = 3, num = 3))
block(aggregateId) block(aggregateId)
} }
test("readVersionBetween should only return the event of aggregate") { suspend fun eventStreams(): List<EventStream<EventXTest, IdTest>> =
streamWith3Events { testKoinApplicationWithConfig {
readVersionBetween(1..2) shouldHaveSize 2 listOf(
readVersionBetween(1..1) shouldHaveSize 1 EventStreamInMemory(IdTest()),
readVersionBetween(2..20) shouldHaveSize 2 EventStreamInPostgresql(
readVersionBetween(4..20) shouldHaveSize 0 IdTest(),
dataSource = get(),
objectToString = { Json.encodeToString(it) },
stringToObject = { Json.decodeFromString(it) },
),
)
}
context("readVersionBetween should only return the event of aggregate") {
withData(eventStreams()) { stream ->
stream.with3Events {
readVersionBetween(1..2) shouldHaveSize 2
readVersionBetween(1..1) shouldHaveSize 1
readVersionBetween(2..20) shouldHaveSize 2
readVersionBetween(4..20) shouldHaveSize 0
}
} }
} }
test("readAll should only return the event of aggregate") { context("readAll should only return the event of aggregate") {
streamWith3Events { withData(eventStreams()) { stream ->
readAll() shouldHaveSize 3 stream.with3Events {
readAll().also { readAll() shouldHaveSize 3
it.forEachIndexed { i, event -> readAll().also {
event.version shouldBeEqual i + 1 it.forEachIndexed { i, event ->
event.version shouldBeEqual i + 1
}
} }
} }
} }
} }
test("getByVersion should only return the event with this version") { context("getByVersion should only return the event with this version") {
streamWith3Events { withData(eventStreams()) { stream ->
assertNotNull(getByVersion(1)).version shouldBeEqual 1 stream.with3Events {
assertNotNull(getByVersion(2)).version shouldBeEqual 2 assertNotNull(getByVersion(1)).version shouldBeEqual 1
assertNotNull(getByVersion(3)).version shouldBeEqual 3 assertNotNull(getByVersion(2)).version shouldBeEqual 2
assertNull(getByVersion(4)) assertNotNull(getByVersion(3)).version shouldBeEqual 3
} assertNull(getByVersion(4))
}
test("readGreaterOfVersion should only return the events with greater version") {
streamWith3Events {
assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2
assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1
assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0
assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0
}
}
test("publish should be throw error when publish another aggregate event") {
EventStreamInMemory(IdTest()).apply {
assertThrows<EventStreamPublishException> { publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) }
}
}
test("publish should be concurrently secure") {
val id = IdTest()
val stream = EventStreamInMemory(id)
(1..10).forEach { i1 ->
GlobalScope.launch {
(1..10).forEach { i2 ->
stream.publish(
EventXTest(
aggregateId = id,
version = (i1 * 10) + i2,
num = (i1 * 10) + i2,
),
)
}
} }
} }
} }
context("readGreaterOfVersion should only return the events with greater version") {
withData(eventStreams()) {
it.with3Events {
assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2
assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1
assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0
assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0
}
}
}
context("publish should be throw error when publish another aggregate event") {
withData(eventStreams()) {
assertThrows<EventStreamPublishException> { it.publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) }
}
}
context("publish should be concurrently secure") {
withData(eventStreams()) { stream ->
(0..9)
.map { i1 ->
GlobalScope.launch {
(1..10).forEach { i2 ->
stream.publish(
EventXTest(
aggregateId = stream.aggregateId,
version = (i1 * 10) + i2,
num = (i1 * 10) + i2,
),
)
}
}
}.joinAll()
stream.readAll() shouldHaveSize 100
}
}
}) })

View File

@@ -13,6 +13,7 @@ value class IdTest(
override val id: UUID = UUID.randomUUID(), override val id: UUID = UUID.randomUUID(),
) : AggregateId ) : AggregateId
@Serializable
sealed interface TestEvents : Event<IdTest> sealed interface TestEvents : Event<IdTest>
@Serializable @Serializable