refactor: move EventHandler to libs
This commit is contained in:
@@ -2,11 +2,9 @@ package eventDemo.business.event
|
|||||||
|
|
||||||
import eventDemo.business.entity.GameId
|
import eventDemo.business.entity.GameId
|
||||||
import eventDemo.business.event.event.GameEvent
|
import eventDemo.business.event.event.GameEvent
|
||||||
|
import eventDemo.libs.event.EventHandler
|
||||||
|
import eventDemo.libs.event.EventHandlerImpl
|
||||||
import eventDemo.libs.event.VersionBuilder
|
import eventDemo.libs.event.VersionBuilder
|
||||||
import io.github.oshai.kotlinlogging.withLoggingContext
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle the event to dispatch it to store, bus and projections builders
|
* Handle the event to dispatch it to store, bus and projections builders
|
||||||
@@ -15,34 +13,4 @@ class GameEventHandler(
|
|||||||
private val eventBus: GameEventBus,
|
private val eventBus: GameEventBus,
|
||||||
private val eventStore: GameEventStore,
|
private val eventStore: GameEventStore,
|
||||||
private val versionBuilder: VersionBuilder,
|
private val versionBuilder: VersionBuilder,
|
||||||
) : EventHandler<GameEvent, GameId> {
|
) : EventHandler<GameEvent, GameId> by EventHandlerImpl(eventBus, eventStore, versionBuilder)
|
||||||
private val locks: ConcurrentHashMap<GameId, ReentrantLock> = ConcurrentHashMap()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Build Event then send it to the event store and bus.
|
|
||||||
*/
|
|
||||||
override suspend fun handle(
|
|
||||||
aggregateId: GameId,
|
|
||||||
buildEvent: (version: Int) -> GameEvent,
|
|
||||||
): GameEvent =
|
|
||||||
withLoggingContext("aggregateId" to aggregateId.toString()) {
|
|
||||||
locks
|
|
||||||
// Get lock for the aggregate
|
|
||||||
.computeIfAbsent(aggregateId) { ReentrantLock() }
|
|
||||||
.withLock {
|
|
||||||
// Build event with the version
|
|
||||||
buildEvent(versionBuilder.buildNextVersion(aggregateId))
|
|
||||||
// then publish it to the event store
|
|
||||||
.also {
|
|
||||||
withLoggingContext("event" to it.toString()) {
|
|
||||||
eventStore.publish(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.also { event ->
|
|
||||||
withLoggingContext("event" to event.toString()) {
|
|
||||||
// Publish to the bus
|
|
||||||
eventBus.publish(event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,7 +1,4 @@
|
|||||||
package eventDemo.business.event
|
package eventDemo.libs.event
|
||||||
|
|
||||||
import eventDemo.libs.event.AggregateId
|
|
||||||
import eventDemo.libs.event.Event
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A stream to publish and read the played card event.
|
* A stream to publish and read the played card event.
|
||||||
46
src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt
Normal file
46
src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
package eventDemo.libs.event
|
||||||
|
|
||||||
|
import eventDemo.libs.bus.Bus
|
||||||
|
import io.github.oshai.kotlinlogging.withLoggingContext
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
|
import kotlin.concurrent.withLock
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle the event to dispatch it to store, bus and projections builders
|
||||||
|
*/
|
||||||
|
class EventHandlerImpl<E : Event<ID>, ID : AggregateId>(
|
||||||
|
private val eventBus: Bus<E>,
|
||||||
|
private val eventStore: EventStore<E, ID>,
|
||||||
|
private val versionBuilder: VersionBuilder,
|
||||||
|
) : EventHandler<E, ID> {
|
||||||
|
private val locks: ConcurrentHashMap<ID, ReentrantLock> = ConcurrentHashMap()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build Event then send it to the event store and bus.
|
||||||
|
*/
|
||||||
|
override suspend fun handle(
|
||||||
|
aggregateId: ID,
|
||||||
|
buildEvent: (version: Int) -> E,
|
||||||
|
): E =
|
||||||
|
withLoggingContext("aggregateId" to aggregateId.toString()) {
|
||||||
|
locks
|
||||||
|
// Get lock for the aggregate
|
||||||
|
.computeIfAbsent(aggregateId) { ReentrantLock() }
|
||||||
|
.withLock {
|
||||||
|
// Build event with the version
|
||||||
|
buildEvent(versionBuilder.buildNextVersion(aggregateId))
|
||||||
|
// then publish it to the event store
|
||||||
|
.also {
|
||||||
|
withLoggingContext("event" to it.toString()) {
|
||||||
|
eventStore.publish(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.also { event ->
|
||||||
|
withLoggingContext("event" to event.toString()) {
|
||||||
|
// Publish to the bus
|
||||||
|
eventBus.publish(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user