remove priority of subscribe

This commit is contained in:
2025-04-05 05:13:32 +02:00
parent 0bfeef8a91
commit 080ea1245d
5 changed files with 11 additions and 32 deletions

View File

@@ -31,16 +31,9 @@ class GameCommandHandler(
private val logger = KotlinLogging.logger { } private val logger = KotlinLogging.logger { }
private val eventCommandMap = EventCommandMap() private val eventCommandMap = EventCommandMap()
companion object Config {
const val DEFAULT_PRIORITY = 1000
}
// subscribe to the event bus to send success notification after save the event. // subscribe to the event bus to send success notification after save the event.
fun subscribeToBus( fun subscribeToBus(eventBus: GameEventBus) {
eventBus: GameEventBus, eventBus.subscribe { event: GameEvent ->
listenerPriority: Int = DEFAULT_PRIORITY,
) {
eventBus.subscribe(listenerPriority) { event: GameEvent ->
eventCommandMap[event.eventId]?.apply { eventCommandMap[event.eventId]?.apply {
channel.sendSuccess(commandId)() channel.sendSuccess(commandId)()
} ?: logger.warn { "No Notification for event: $event" } } ?: logger.warn { "No Notification for event: $event" }

View File

@@ -15,18 +15,14 @@ class ReactionListener(
private val eventHandler: GameEventHandler, private val eventHandler: GameEventHandler,
) { ) {
companion object Config { companion object Config {
const val DEFAULT_PRIORITY = -1000
val registeredListeners = ConcurrentSkipListSet<GameProjectionBus>() val registeredListeners = ConcurrentSkipListSet<GameProjectionBus>()
} }
private val logger = KotlinLogging.logger { } private val logger = KotlinLogging.logger { }
fun subscribeToBus( fun subscribeToBus(projectionBus: GameProjectionBus) {
projectionBus: GameProjectionBus,
priority: Int = DEFAULT_PRIORITY,
) {
if (registeredListeners.add(projectionBus)) { if (registeredListeners.add(projectionBus)) {
projectionBus.subscribe(priority) { projection: Projection<GameId> -> projectionBus.subscribe { projection: Projection<GameId> ->
if (projection !is GameState) return@subscribe if (projection !is GameState) return@subscribe
withLoggingContext("projection" to projection.toString()) { withLoggingContext("projection" to projection.toString()) {
sendStartGameEvent(projection) sendStartGameEvent(projection)

View File

@@ -6,8 +6,5 @@ interface Bus<T> {
/** /**
* @param priority The higher the priority, the more it will be called first * @param priority The higher the priority, the more it will be called first
*/ */
fun subscribe( fun subscribe(block: suspend (T) -> Unit)
priority: Int = 0,
block: suspend (T) -> Unit,
)
} }

View File

@@ -9,25 +9,21 @@ class BusInMemory<E>(
val name: KClass<*> = BusInMemory::class, val name: KClass<*> = BusInMemory::class,
) : Bus<E> { ) : Bus<E> {
private val logger = KotlinLogging.logger(name.qualifiedName.toString()) private val logger = KotlinLogging.logger(name.qualifiedName.toString())
private val subscribers: MutableList<Pair<Int, suspend (E) -> Unit>> = mutableListOf() private val subscribers: MutableList<suspend (E) -> Unit> = mutableListOf()
override suspend fun publish(item: E) { override suspend fun publish(item: E) {
withLoggingContext("busItem" to item.toString()) { withLoggingContext("busItem" to item.toString()) {
logger.info { "Item sent to the bus: $item" } logger.info { "Item sent to the bus: $item" }
subscribers subscribers
.sortedByDescending { (priority, _) -> priority } .forEach {
.forEach { (_, block) ->
coroutineScope { coroutineScope {
block(item) it(item)
} }
} }
} }
} }
override fun subscribe( override fun subscribe(block: suspend (E) -> Unit) {
priority: Int, subscribers.add(block)
block: suspend (E) -> Unit,
) {
subscribers.add(priority to block)
} }
} }

View File

@@ -42,10 +42,7 @@ class BusInRabbitMQ<E>(
} }
} }
override fun subscribe( override fun subscribe(block: suspend (E) -> Unit) {
priority: Int,
block: suspend (E) -> Unit,
) {
connectionFactory connectionFactory
.newConnection() .newConnection()
.createChannel() .createChannel()