diff --git a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt index 2a7cb73..d7c06ac 100644 --- a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt @@ -31,16 +31,9 @@ class GameCommandHandler( private val logger = KotlinLogging.logger { } private val eventCommandMap = EventCommandMap() - companion object Config { - const val DEFAULT_PRIORITY = 1000 - } - // subscribe to the event bus to send success notification after save the event. - fun subscribeToBus( - eventBus: GameEventBus, - listenerPriority: Int = DEFAULT_PRIORITY, - ) { - eventBus.subscribe(listenerPriority) { event: GameEvent -> + fun subscribeToBus(eventBus: GameEventBus) { + eventBus.subscribe { event: GameEvent -> eventCommandMap[event.eventId]?.apply { channel.sendSuccess(commandId)() } ?: logger.warn { "No Notification for event: $event" } diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt index eccdfcb..27fa66d 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt @@ -15,18 +15,14 @@ class ReactionListener( private val eventHandler: GameEventHandler, ) { companion object Config { - const val DEFAULT_PRIORITY = -1000 val registeredListeners = ConcurrentSkipListSet() } private val logger = KotlinLogging.logger { } - fun subscribeToBus( - projectionBus: GameProjectionBus, - priority: Int = DEFAULT_PRIORITY, - ) { + fun subscribeToBus(projectionBus: GameProjectionBus) { if (registeredListeners.add(projectionBus)) { - projectionBus.subscribe(priority) { projection: Projection -> + projectionBus.subscribe { projection: Projection -> if (projection !is GameState) return@subscribe withLoggingContext("projection" to projection.toString()) { sendStartGameEvent(projection) diff --git a/src/main/kotlin/eventDemo/libs/bus/Bus.kt b/src/main/kotlin/eventDemo/libs/bus/Bus.kt index 4c4f030..831e82a 100644 --- a/src/main/kotlin/eventDemo/libs/bus/Bus.kt +++ b/src/main/kotlin/eventDemo/libs/bus/Bus.kt @@ -6,8 +6,5 @@ interface Bus { /** * @param priority The higher the priority, the more it will be called first */ - fun subscribe( - priority: Int = 0, - block: suspend (T) -> Unit, - ) + fun subscribe(block: suspend (T) -> Unit) } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt index a0f13a4..7dd1620 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt @@ -9,25 +9,21 @@ class BusInMemory( val name: KClass<*> = BusInMemory::class, ) : Bus { private val logger = KotlinLogging.logger(name.qualifiedName.toString()) - private val subscribers: MutableList Unit>> = mutableListOf() + private val subscribers: MutableList Unit> = mutableListOf() override suspend fun publish(item: E) { withLoggingContext("busItem" to item.toString()) { logger.info { "Item sent to the bus: $item" } subscribers - .sortedByDescending { (priority, _) -> priority } - .forEach { (_, block) -> + .forEach { coroutineScope { - block(item) + it(item) } } } } - override fun subscribe( - priority: Int, - block: suspend (E) -> Unit, - ) { - subscribers.add(priority to block) + override fun subscribe(block: suspend (E) -> Unit) { + subscribers.add(block) } } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt index 6486468..f9ffb2a 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt @@ -42,10 +42,7 @@ class BusInRabbitMQ( } } - override fun subscribe( - priority: Int, - block: suspend (E) -> Unit, - ) { + override fun subscribe(block: suspend (E) -> Unit) { connectionFactory .newConnection() .createChannel()