diff --git a/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt index 37d161f..ee2f1d1 100644 --- a/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt @@ -2,11 +2,9 @@ package eventDemo.business.event import eventDemo.business.entity.GameId import eventDemo.business.event.event.GameEvent +import eventDemo.libs.event.EventHandler +import eventDemo.libs.event.EventHandlerImpl import eventDemo.libs.event.VersionBuilder -import io.github.oshai.kotlinlogging.withLoggingContext -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock /** * Handle the event to dispatch it to store, bus and projections builders @@ -15,34 +13,4 @@ class GameEventHandler( private val eventBus: GameEventBus, private val eventStore: GameEventStore, private val versionBuilder: VersionBuilder, -) : EventHandler { - private val locks: ConcurrentHashMap = ConcurrentHashMap() - - /** - * Build Event then send it to the event store and bus. - */ - override suspend fun handle( - aggregateId: GameId, - buildEvent: (version: Int) -> GameEvent, - ): GameEvent = - withLoggingContext("aggregateId" to aggregateId.toString()) { - locks - // Get lock for the aggregate - .computeIfAbsent(aggregateId) { ReentrantLock() } - .withLock { - // Build event with the version - buildEvent(versionBuilder.buildNextVersion(aggregateId)) - // then publish it to the event store - .also { - withLoggingContext("event" to it.toString()) { - eventStore.publish(it) - } - } - }.also { event -> - withLoggingContext("event" to event.toString()) { - // Publish to the bus - eventBus.publish(event) - } - } - } -} +) : EventHandler by EventHandlerImpl(eventBus, eventStore, versionBuilder) diff --git a/src/main/kotlin/eventDemo/business/event/EventHandler.kt b/src/main/kotlin/eventDemo/libs/event/EventHandler.kt similarity index 66% rename from src/main/kotlin/eventDemo/business/event/EventHandler.kt rename to src/main/kotlin/eventDemo/libs/event/EventHandler.kt index 4a4340f..0991099 100644 --- a/src/main/kotlin/eventDemo/business/event/EventHandler.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventHandler.kt @@ -1,7 +1,4 @@ -package eventDemo.business.event - -import eventDemo.libs.event.AggregateId -import eventDemo.libs.event.Event +package eventDemo.libs.event /** * A stream to publish and read the played card event. diff --git a/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt b/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt new file mode 100644 index 0000000..5bb5826 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt @@ -0,0 +1,46 @@ +package eventDemo.libs.event + +import eventDemo.libs.bus.Bus +import io.github.oshai.kotlinlogging.withLoggingContext +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * Handle the event to dispatch it to store, bus and projections builders + */ +class EventHandlerImpl, ID : AggregateId>( + private val eventBus: Bus, + private val eventStore: EventStore, + private val versionBuilder: VersionBuilder, +) : EventHandler { + private val locks: ConcurrentHashMap = ConcurrentHashMap() + + /** + * Build Event then send it to the event store and bus. + */ + override suspend fun handle( + aggregateId: ID, + buildEvent: (version: Int) -> E, + ): E = + withLoggingContext("aggregateId" to aggregateId.toString()) { + locks + // Get lock for the aggregate + .computeIfAbsent(aggregateId) { ReentrantLock() } + .withLock { + // Build event with the version + buildEvent(versionBuilder.buildNextVersion(aggregateId)) + // then publish it to the event store + .also { + withLoggingContext("event" to it.toString()) { + eventStore.publish(it) + } + } + }.also { event -> + withLoggingContext("event" to event.toString()) { + // Publish to the bus + eventBus.publish(event) + } + } + } +}