Fix: close all connections after each tests
This commit is contained in:
@@ -48,6 +48,12 @@ private fun DefaultWebSocketServerSession.runWebSocket(
|
||||
val currentPlayer = call.getPlayer()
|
||||
val outgoingFrameChannel: SendChannel<Notification> = fromFrameChannel(outgoing)
|
||||
withLoggingContext("currentPlayer" to currentPlayer.toString()) {
|
||||
val notificationListener =
|
||||
playerNotificationListener.startListening(
|
||||
currentPlayer,
|
||||
gameId,
|
||||
) { outgoingFrameChannel.trySendBlocking(it) }
|
||||
|
||||
// TODO change GlobalScope
|
||||
GlobalScope.launch {
|
||||
commandHandler.handle(
|
||||
@@ -56,12 +62,8 @@ private fun DefaultWebSocketServerSession.runWebSocket(
|
||||
toObjectChannel(incoming),
|
||||
outgoingFrameChannel,
|
||||
)
|
||||
notificationListener.close()
|
||||
}
|
||||
|
||||
playerNotificationListener.startListening(
|
||||
currentPlayer,
|
||||
gameId,
|
||||
) { outgoingFrameChannel.trySendBlocking(it) }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,8 +35,8 @@ class PlayerNotificationListener(
|
||||
currentPlayer: Player,
|
||||
gameId: GameId,
|
||||
outgoingNotification: (Notification) -> Unit,
|
||||
) {
|
||||
projectionBus.subscribe { currentState ->
|
||||
): AutoCloseable {
|
||||
return projectionBus.subscribe { currentState ->
|
||||
if (currentState !is GameState) return@subscribe
|
||||
if (currentState.aggregateId != gameId) return@subscribe
|
||||
withLoggingContext("projection" to currentState.toString()) {
|
||||
|
||||
@@ -16,10 +16,13 @@ import eventDemo.business.event.projection.gameState.GameStateRepository
|
||||
import eventDemo.libs.event.projection.SnapshotConfig
|
||||
import org.koin.core.module.Module
|
||||
import org.koin.core.module.dsl.singleOf
|
||||
import org.koin.core.scope.Scope
|
||||
import org.koin.core.scope.ScopeCallback
|
||||
import org.koin.dsl.bind
|
||||
import javax.sql.DataSource
|
||||
|
||||
fun Module.configureDIInfrastructure(config: Configuration) {
|
||||
// Postgresql config
|
||||
single {
|
||||
HikariConfig()
|
||||
.apply {
|
||||
@@ -30,18 +33,27 @@ fun Module.configureDIInfrastructure(config: Configuration) {
|
||||
minimumIdle = 10
|
||||
}.let {
|
||||
HikariDataSource(it)
|
||||
}.also { datasource ->
|
||||
registerCallback(
|
||||
object : ScopeCallback {
|
||||
override fun onScopeClose(scope: Scope) {
|
||||
datasource.close()
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
} bind DataSource::class
|
||||
|
||||
single {
|
||||
// RabbitMQ config
|
||||
factory {
|
||||
ConnectionFactory().apply {
|
||||
host = config.rabbitmq.url
|
||||
port = config.rabbitmq.port
|
||||
virtualHost = virtualHost
|
||||
username = config.rabbitmq.username
|
||||
password = config.rabbitmq.password
|
||||
}
|
||||
}
|
||||
|
||||
singleOf(::GameEventBusInRabbinMQ) bind GameEventBus::class
|
||||
singleOf(::GameEventStoreInPostgresql) bind GameEventStore::class
|
||||
singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class
|
||||
|
||||
@@ -6,5 +6,9 @@ interface Bus<T> {
|
||||
/**
|
||||
* @param priority The higher the priority, the more it will be called first
|
||||
*/
|
||||
fun subscribe(block: suspend (T) -> Unit)
|
||||
fun subscribe(block: suspend (T) -> Unit): Subscription
|
||||
|
||||
interface Subscription : AutoCloseable {
|
||||
override fun close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,12 @@ class BusInMemory<E>(
|
||||
}
|
||||
}
|
||||
|
||||
override fun subscribe(block: suspend (E) -> Unit) {
|
||||
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription {
|
||||
subscribers.add(block)
|
||||
return object : Bus.Subscription {
|
||||
override fun close() {
|
||||
subscribers.remove(block)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,60 +1,90 @@
|
||||
package eventDemo.libs.bus
|
||||
|
||||
import com.rabbitmq.client.CancelCallback
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.BuiltinExchangeType
|
||||
import com.rabbitmq.client.Connection
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
import com.rabbitmq.client.DeliverCallback
|
||||
import com.rabbitmq.client.Delivery
|
||||
import com.rabbitmq.client.DefaultConsumer
|
||||
import com.rabbitmq.client.Envelope
|
||||
import io.ktor.utils.io.core.toByteArray
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
class BusInRabbitMQ<E>(
|
||||
private val connectionFactory: ConnectionFactory,
|
||||
private val queueName: String,
|
||||
private val exchangeName: String,
|
||||
private val objectToString: (E) -> String,
|
||||
private val stringToObject: (String) -> E,
|
||||
) : Bus<E> {
|
||||
private val connection: Connection = connectionFactory.newConnection()
|
||||
get() {
|
||||
return if (field.isOpen) {
|
||||
field
|
||||
} else {
|
||||
connectionFactory.newConnection()
|
||||
}
|
||||
}
|
||||
private val routingKey = ""
|
||||
|
||||
init {
|
||||
connectionFactory
|
||||
.newConnection()
|
||||
connection
|
||||
.createChannel()
|
||||
.use {
|
||||
it.queueDeclare(
|
||||
queueName,
|
||||
it.exchangeDeclare(
|
||||
exchangeName,
|
||||
BuiltinExchangeType.FANOUT,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
emptyMap(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun publish(item: E) {
|
||||
connectionFactory
|
||||
.newConnection()
|
||||
connection
|
||||
.createChannel()
|
||||
.use {
|
||||
it.basicPublish(
|
||||
"",
|
||||
queueName,
|
||||
null,
|
||||
exchangeName,
|
||||
routingKey,
|
||||
AMQP.BasicProperties(),
|
||||
objectToString(item).toByteArray(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun subscribe(block: suspend (E) -> Unit) {
|
||||
connectionFactory
|
||||
.newConnection()
|
||||
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription {
|
||||
connection
|
||||
.createChannel()
|
||||
.basicConsume(
|
||||
queueName,
|
||||
true,
|
||||
DeliverCallback { _: String, message: Delivery ->
|
||||
runBlocking {
|
||||
block(stringToObject(message.body.toString(Charsets.UTF_8)))
|
||||
.also { channel ->
|
||||
val queue =
|
||||
channel
|
||||
.queueDeclare()
|
||||
.queue
|
||||
.also { channel.queueBind(it, exchangeName, routingKey) }
|
||||
|
||||
channel
|
||||
.basicConsume(
|
||||
queue,
|
||||
object : DefaultConsumer(channel) {
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray,
|
||||
) {
|
||||
runBlocking {
|
||||
block(stringToObject(body.toString(Charsets.UTF_8)))
|
||||
}
|
||||
channel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
},
|
||||
)
|
||||
}.let {
|
||||
return object : Bus.Subscription {
|
||||
override fun close() {
|
||||
it.close()
|
||||
}
|
||||
},
|
||||
CancelCallback {},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user