Improve concurrence of ProjectionSnapshotRepositoryInMemory and GameEventHandler
This commit is contained in:
@@ -9,5 +9,8 @@ import eventDemo.libs.event.Event
|
||||
interface EventHandler<E : Event<ID>, ID : AggregateId> {
|
||||
fun registerProjectionBuilder(builder: (E) -> Unit)
|
||||
|
||||
fun handle(buildEvent: (version: Int) -> E): E
|
||||
fun handle(
|
||||
aggregateId: ID,
|
||||
buildEvent: (version: Int) -> E,
|
||||
): E
|
||||
}
|
||||
|
||||
@@ -3,6 +3,10 @@ package eventDemo.app.event
|
||||
import eventDemo.app.entity.GameId
|
||||
import eventDemo.app.event.event.GameEvent
|
||||
import eventDemo.libs.event.VersionBuilder
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
/**
|
||||
* A stream to publish and read the played card event.
|
||||
@@ -12,18 +16,26 @@ class GameEventHandler(
|
||||
private val eventStream: GameEventStream,
|
||||
private val versionBuilder: VersionBuilder,
|
||||
) : EventHandler<GameEvent, GameId> {
|
||||
private val projectionsBuilders: MutableList<(GameEvent) -> Unit> = mutableListOf()
|
||||
private val projectionsBuilders: ConcurrentLinkedQueue<(GameEvent) -> Unit> = ConcurrentLinkedQueue()
|
||||
private val locks: ConcurrentHashMap<GameId, ReentrantLock> = ConcurrentHashMap()
|
||||
|
||||
override fun registerProjectionBuilder(builder: GameProjectionBuilder) {
|
||||
projectionsBuilders.add(builder)
|
||||
}
|
||||
|
||||
override fun handle(buildEvent: (version: Int) -> GameEvent): GameEvent =
|
||||
buildEvent(versionBuilder.buildNextVersion()).also { event ->
|
||||
eventStream.publish(event)
|
||||
projectionsBuilders.forEach { it(event) }
|
||||
eventBus.publish(event)
|
||||
}
|
||||
override fun handle(
|
||||
aggregateId: GameId,
|
||||
buildEvent: (version: Int) -> GameEvent,
|
||||
): GameEvent =
|
||||
locks
|
||||
.computeIfAbsent(aggregateId) { ReentrantLock() }
|
||||
.withLock {
|
||||
buildEvent(versionBuilder.buildNextVersion(aggregateId))
|
||||
.also { eventStream.publish(it) }
|
||||
}.also { event ->
|
||||
projectionsBuilders.forEach { it(event) }
|
||||
eventBus.publish(event)
|
||||
}
|
||||
}
|
||||
|
||||
typealias GameProjectionBuilder = (GameEvent) -> Unit
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package eventDemo.app.event.projection
|
||||
|
||||
import eventDemo.app.entity.Card
|
||||
import eventDemo.app.entity.GameId
|
||||
import eventDemo.app.event.GameEventStream
|
||||
import eventDemo.app.event.event.CardIsPlayedEvent
|
||||
import eventDemo.app.event.event.GameEvent
|
||||
import eventDemo.app.event.event.GameStartedEvent
|
||||
@@ -14,23 +12,8 @@ import eventDemo.app.event.event.PlayerReadyEvent
|
||||
import eventDemo.app.event.event.PlayerWinEvent
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
|
||||
fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState {
|
||||
val events = eventStream.readAll(this)
|
||||
if (events.isEmpty()) return GameState(this)
|
||||
return events.buildStateFromEvents().also {
|
||||
KotlinLogging.logger {}.warn { "state is build from scratch for game: $this " }
|
||||
}
|
||||
}
|
||||
|
||||
fun Collection<GameEvent>.buildStateFromEvents(): GameState {
|
||||
val gameId = this.firstOrNull()?.aggregateId ?: error("Cannot build GameState from an empty list")
|
||||
return fold(GameState(gameId)) { state, event ->
|
||||
state.apply(event)
|
||||
}
|
||||
}
|
||||
|
||||
fun GameState?.apply(event: GameEvent): GameState =
|
||||
(this ?: GameState(event.aggregateId)).let { state ->
|
||||
fun GameState.apply(event: GameEvent): GameState =
|
||||
this.let { state ->
|
||||
val logger = KotlinLogging.logger { }
|
||||
if (event is PlayerActionEvent) {
|
||||
if (state.currentPlayerTurn != event.player) {
|
||||
|
||||
@@ -6,14 +6,16 @@ import eventDemo.app.event.GameEventStream
|
||||
import eventDemo.app.event.event.GameEvent
|
||||
|
||||
class GameStateRepository(
|
||||
private val eventStream: GameEventStream,
|
||||
eventStream: GameEventStream,
|
||||
eventHandler: GameEventHandler,
|
||||
maxSnapshotCacheSize: Int = 20,
|
||||
snapshotConfig: SnapshotConfig = SnapshotConfig(),
|
||||
) {
|
||||
private val projectionsSnapshot =
|
||||
ProjectionSnapshotRepositoryInMemory(
|
||||
applyToProjection = GameState?::apply,
|
||||
maxSnapshotCacheSize = maxSnapshotCacheSize,
|
||||
eventStream = eventStream,
|
||||
snapshotCacheConfig = snapshotConfig,
|
||||
applyToProjection = GameState::apply,
|
||||
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
|
||||
)
|
||||
|
||||
init {
|
||||
@@ -27,9 +29,7 @@ class GameStateRepository(
|
||||
*
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
fun getLast(gameId: GameId): GameState =
|
||||
projectionsSnapshot.getLast(gameId)
|
||||
?: gameId.buildStateFromEventStream(eventStream)
|
||||
fun getLast(gameId: GameId): GameState = projectionsSnapshot.getLast(gameId)
|
||||
|
||||
/**
|
||||
* Get the [GameState] to the specific [event][GameEvent].
|
||||
@@ -37,8 +37,5 @@ class GameStateRepository(
|
||||
*
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
fun getUntil(event: GameEvent): GameState =
|
||||
projectionsSnapshot.getUntil(event)
|
||||
?: (eventStream.readAll(event.aggregateId).takeWhile { it != event } + event)
|
||||
.buildStateFromEvents()
|
||||
fun getUntil(event: GameEvent): GameState = projectionsSnapshot.getUntil(event)
|
||||
}
|
||||
|
||||
@@ -2,53 +2,172 @@ package eventDemo.app.event.projection
|
||||
|
||||
import eventDemo.libs.event.AggregateId
|
||||
import eventDemo.libs.event.Event
|
||||
import eventDemo.libs.event.EventStream
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
data class SnapshotConfig(
|
||||
val maxSnapshotCacheSize: Int = 20,
|
||||
val maxSnapshotCacheTtl: Duration = 10.minutes,
|
||||
)
|
||||
|
||||
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
|
||||
private val maxSnapshotCacheSize: Int = 20,
|
||||
private val applyToProjection: P?.(event: E) -> P,
|
||||
private val eventStream: EventStream<E, ID>,
|
||||
private val initialStateBuilder: (ID) -> P,
|
||||
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
|
||||
private val applyToProjection: P.(event: E) -> P,
|
||||
) {
|
||||
private val projectionsSnapshot: ConcurrentHashMap<E, P> = ConcurrentHashMap()
|
||||
private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap()
|
||||
|
||||
fun applyAndPutToCache(event: E): P {
|
||||
// lock here
|
||||
return projectionsSnapshot
|
||||
.filterKeys { it.aggregateId == event.aggregateId }
|
||||
.toList()
|
||||
.find { (e, _) -> e.version == (event.version - 1) }
|
||||
?.second
|
||||
.applyToProjection(event)
|
||||
.also { projectionsSnapshot.put(event, it) }
|
||||
.also { removeOldSnapshot() }
|
||||
// Unlock here
|
||||
/**
|
||||
* Create a snapshot for the event
|
||||
*
|
||||
* 1. get the last snapshot with a version lower than that of the event
|
||||
* 2. get the events with a greater version of the snapshot
|
||||
* 3. apply the event to the snapshot
|
||||
* 4. apply the new event to the projection
|
||||
* 5. save it
|
||||
* 6. remove old one
|
||||
*/
|
||||
fun applyAndPutToCache(event: E): P =
|
||||
getUntil(event)
|
||||
.also {
|
||||
save(it)
|
||||
removeOldSnapshot(it.aggregateId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the last version of the [Projection] from the cache.
|
||||
*
|
||||
* 1. get the last snapshot
|
||||
* 2. get the missing event to the snapshot
|
||||
* 3. apply the missing events to the snapshot
|
||||
*/
|
||||
fun getLast(aggregateId: ID): P {
|
||||
val lastSnapshot = getLastSnapshot(aggregateId)?.first
|
||||
val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot)
|
||||
return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot)
|
||||
}
|
||||
|
||||
private fun removeOldSnapshot() {
|
||||
if (projectionsSnapshot.size > maxSnapshotCacheSize) {
|
||||
val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize
|
||||
/**
|
||||
* Build the [Projection] to the specific [event][Event].
|
||||
*
|
||||
* It does not contain the [events][Event] it after this one.
|
||||
*
|
||||
* 1. get the last snapshot before the event
|
||||
* 2. get the events with a greater version of the snapshot but lower of passed event
|
||||
* 3. apply the events to the snapshot
|
||||
*/
|
||||
fun getUntil(event: E): P {
|
||||
val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first
|
||||
if (lastSnapshot?.lastEventVersion == event.version) {
|
||||
return lastSnapshot
|
||||
}
|
||||
|
||||
projectionsSnapshot
|
||||
.keys
|
||||
.sortedBy { it.version }
|
||||
.take(numberToRemove)
|
||||
.forEach { event ->
|
||||
projectionsSnapshot.remove(event)
|
||||
}
|
||||
val missingEventOfSnapshot =
|
||||
eventStream.readGreaterOfVersion(
|
||||
event.aggregateId,
|
||||
lastSnapshot?.lastEventVersion ?: 0,
|
||||
)
|
||||
|
||||
return if (lastSnapshot?.lastEventVersion == event.version) {
|
||||
lastSnapshot
|
||||
} else {
|
||||
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last version of the [Projection] from the cache.
|
||||
* Remove the oldest snapshot.
|
||||
*
|
||||
* The rules are pass in the controller.
|
||||
*/
|
||||
fun getLast(aggregateId: ID): P? =
|
||||
projectionsSnapshot
|
||||
.filter { it.key.aggregateId == aggregateId }
|
||||
.maxByOrNull { (event, _) -> event.version }
|
||||
?.value
|
||||
private fun removeOldSnapshot(aggregateId: ID) {
|
||||
projectionsSnapshot[aggregateId]?.let { queue ->
|
||||
// never remove the last one
|
||||
val theLastOne = getLastSnapshot(aggregateId)
|
||||
|
||||
// remove the oldest by time
|
||||
val now = Clock.System.now()
|
||||
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
|
||||
val toRemove = queue.filter { deadLine > it.second }
|
||||
(toRemove - theLastOne).forEach { queue.remove(it) }
|
||||
|
||||
// Remove if size exceeds the limit
|
||||
if (queue.size > snapshotCacheConfig.maxSnapshotCacheSize) {
|
||||
val numberToRemove = projectionsSnapshot.size - snapshotCacheConfig.maxSnapshotCacheSize
|
||||
if (numberToRemove > 0) {
|
||||
queue
|
||||
.sortedByDescending { it.first.lastEventVersion }
|
||||
.take(numberToRemove)
|
||||
.let { it - theLastOne }
|
||||
.forEach { queue.remove(it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the [Projection] to the specific [event][Event].
|
||||
* It does not contain the [events][Event] it after this one.
|
||||
* Save the snapshot.
|
||||
*/
|
||||
fun getUntil(event: E): P? = projectionsSnapshot.get(event)
|
||||
private fun save(projection: P) {
|
||||
projectionsSnapshot
|
||||
.computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() }
|
||||
.add(Pair(projection, Clock.System.now()))
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last snapshot when the version is lower of then event version
|
||||
*/
|
||||
private fun getLastSnapshotBeforeOrEqualEvent(event: E) =
|
||||
projectionsSnapshot[event.aggregateId]
|
||||
?.sortedByDescending { it.first.lastEventVersion }
|
||||
?.find { it.first.lastEventVersion <= event.version }
|
||||
|
||||
/**
|
||||
* Get the last snapshot (with the higher version).
|
||||
*/
|
||||
private fun getLastSnapshot(aggregateId: ID) =
|
||||
projectionsSnapshot[aggregateId]
|
||||
?.maxByOrNull { it.first.lastEventVersion }
|
||||
|
||||
/**
|
||||
* Get the events from the [event stream][EventStream] when the version is higher of the snapshot.
|
||||
*
|
||||
* If the snapshot is null, it takes all events from the event [event stream][EventStream]
|
||||
*/
|
||||
private fun getEventAfterTheSnapshot(
|
||||
aggregateId: ID,
|
||||
snapshot: P?,
|
||||
) = eventStream
|
||||
.readGreaterOfVersion(aggregateId, snapshot?.lastEventVersion ?: 0)
|
||||
|
||||
/**
|
||||
* Apply events to the projection.
|
||||
*/
|
||||
private fun P?.applyEvents(
|
||||
aggregateId: ID,
|
||||
eventsToApply: Set<E>,
|
||||
): P =
|
||||
eventsToApply
|
||||
.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
|
||||
|
||||
/**
|
||||
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
|
||||
*/
|
||||
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
|
||||
if (event.version == lastEventVersion + 1) {
|
||||
applyToProjection(event)
|
||||
} else if (event.version <= lastEventVersion) {
|
||||
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
|
||||
this
|
||||
} else {
|
||||
error("The version of the event must follow directly after the version of the projection.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user