move EventStream method implementation into the interface
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
package eventDemo.libs.event
|
package eventDemo.libs.event
|
||||||
|
|
||||||
|
import io.github.oshai.kotlinlogging.withLoggingContext
|
||||||
|
|
||||||
interface EventStore<E : Event<ID>, ID : AggregateId> {
|
interface EventStore<E : Event<ID>, ID : AggregateId> {
|
||||||
fun getStream(aggregateId: ID): EventStream<E>
|
fun getStream(aggregateId: ID): EventStream<E>
|
||||||
|
|
||||||
fun publish(event: E)
|
fun publish(event: E) =
|
||||||
|
withLoggingContext("event" to event.toString()) {
|
||||||
|
getStream(event.aggregateId).publish(event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package eventDemo.libs.event
|
package eventDemo.libs.event
|
||||||
|
|
||||||
import io.github.oshai.kotlinlogging.withLoggingContext
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.ConcurrentMap
|
import java.util.concurrent.ConcurrentMap
|
||||||
|
|
||||||
@@ -9,9 +8,4 @@ class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> {
|
|||||||
|
|
||||||
override fun getStream(aggregateId: ID): EventStream<E> =
|
override fun getStream(aggregateId: ID): EventStream<E> =
|
||||||
streams.computeIfAbsent(aggregateId) { EventStreamInMemory() }
|
streams.computeIfAbsent(aggregateId) { EventStreamInMemory() }
|
||||||
|
|
||||||
override fun publish(event: E) =
|
|
||||||
withLoggingContext("event" to event.toString()) {
|
|
||||||
getStream(event.aggregateId).publish(event)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package eventDemo.libs.event
|
package eventDemo.libs.event
|
||||||
|
|
||||||
import eventDemo.libs.event.projection.Projection
|
import eventDemo.libs.event.projection.Projection
|
||||||
|
import io.github.oshai.kotlinlogging.withLoggingContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface representing an event stream for publishing and reading domain events
|
* Interface representing an event stream for publishing and reading domain events
|
||||||
@@ -10,12 +11,19 @@ interface EventStream<E : Event<*>> {
|
|||||||
fun publish(event: E)
|
fun publish(event: E)
|
||||||
|
|
||||||
/** Publishes multiple events to the event stream */
|
/** Publishes multiple events to the event stream */
|
||||||
fun publish(vararg events: E)
|
fun publish(vararg events: E) {
|
||||||
|
events.forEach {
|
||||||
|
withLoggingContext("event" to it.toString()) {
|
||||||
|
publish(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Reads all events */
|
/** Reads all events */
|
||||||
fun readAll(): Set<E>
|
fun readAll(): Set<E>
|
||||||
|
|
||||||
fun readGreaterOfVersion(version: Int): Set<E>
|
fun readGreaterOfVersion(version: Int): Set<E> =
|
||||||
|
readVersionBetween(version + 1..Int.MAX_VALUE)
|
||||||
|
|
||||||
fun readVersionBetween(version: IntRange): Set<E>
|
fun readVersionBetween(version: IntRange): Set<E>
|
||||||
|
|
||||||
@@ -25,5 +33,6 @@ interface EventStream<E : Event<*>> {
|
|||||||
): Set<E> =
|
): Set<E> =
|
||||||
readVersionBetween(((projection?.lastEventVersion ?: 0) + 1)..event.version)
|
readVersionBetween(((projection?.lastEventVersion ?: 0) + 1)..event.version)
|
||||||
|
|
||||||
fun getByVersion(version: Int): E?
|
fun getByVersion(version: Int): E? =
|
||||||
|
readVersionBetween(version..version).firstOrNull()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package eventDemo.libs.event
|
package eventDemo.libs.event
|
||||||
|
|
||||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||||
import io.github.oshai.kotlinlogging.withLoggingContext
|
|
||||||
import java.util.Queue
|
import java.util.Queue
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
|
|
||||||
@@ -21,27 +20,11 @@ class EventStreamInMemory<E : Event<*>> : EventStream<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun publish(vararg events: E) {
|
|
||||||
events.forEach {
|
|
||||||
withLoggingContext("event" to it.toString()) {
|
|
||||||
publish(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun readAll(): Set<E> =
|
override fun readAll(): Set<E> =
|
||||||
events.toSet()
|
events.toSet()
|
||||||
|
|
||||||
override fun readGreaterOfVersion(version: Int): Set<E> =
|
|
||||||
events
|
|
||||||
.filter { it.version > version }
|
|
||||||
.toSet()
|
|
||||||
|
|
||||||
override fun readVersionBetween(version: IntRange): Set<E> =
|
override fun readVersionBetween(version: IntRange): Set<E> =
|
||||||
events
|
events
|
||||||
.filter { version.contains(it.version) }
|
.filter { version.contains(it.version) }
|
||||||
.toSet()
|
.toSet()
|
||||||
|
|
||||||
override fun getByVersion(version: Int): E? =
|
|
||||||
events.find { version == it.version }
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user