Clean and fix
This commit is contained in:
@@ -10,10 +10,15 @@ import eventDemo.app.entity.Player
|
|||||||
import eventDemo.app.event.GameEventStream
|
import eventDemo.app.event.GameEventStream
|
||||||
import eventDemo.app.event.buildStateFromEventStream
|
import eventDemo.app.event.buildStateFromEventStream
|
||||||
import eventDemo.app.event.event.GameEvent
|
import eventDemo.app.event.event.GameEvent
|
||||||
|
import eventDemo.libs.command.CommandBlock
|
||||||
import io.ktor.websocket.Frame
|
import io.ktor.websocket.Frame
|
||||||
|
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||||
|
import kotlinx.coroutines.GlobalScope
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.channels.ReceiveChannel
|
import kotlinx.coroutines.channels.ReceiveChannel
|
||||||
import kotlinx.coroutines.channels.SendChannel
|
import kotlinx.coroutines.channels.SendChannel
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.channels.trySendBlocking
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Listen [GameCommand] on [GameCommandStream], check the validity and execute an action.
|
* Listen [GameCommand] on [GameCommandStream], check the validity and execute an action.
|
||||||
@@ -22,21 +27,31 @@ import kotlinx.coroutines.runBlocking
|
|||||||
*/
|
*/
|
||||||
class GameCommandHandler(
|
class GameCommandHandler(
|
||||||
private val eventStream: GameEventStream,
|
private val eventStream: GameEventStream,
|
||||||
incoming: ReceiveChannel<Frame>,
|
|
||||||
outgoing: SendChannel<Frame>,
|
|
||||||
) {
|
) {
|
||||||
private val commandStream = GameCommandStream(incoming, outgoing)
|
|
||||||
private val playerNotifier: (String) -> Unit = { runBlocking { outgoing.send(Frame.Text(it)) } }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Init the handler
|
* Init the handler
|
||||||
*/
|
*/
|
||||||
suspend fun init(player: Player) {
|
@OptIn(DelicateCoroutinesApi::class)
|
||||||
|
fun handle(
|
||||||
|
player: Player,
|
||||||
|
incoming: ReceiveChannel<Frame>,
|
||||||
|
outgoing: SendChannel<Frame>,
|
||||||
|
): Job {
|
||||||
|
val commandStream = GameCommandStream(incoming, outgoing)
|
||||||
|
val playerNotifier: (String) -> Unit = { outgoing.trySendBlocking(Frame.Text(it)) }
|
||||||
|
return GlobalScope.launch {
|
||||||
|
init(player, commandStream, playerNotifier)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun init(
|
||||||
|
player: Player,
|
||||||
|
commandStream: GameCommandStream,
|
||||||
|
playerNotifier: (String) -> Unit,
|
||||||
|
) {
|
||||||
commandStream.process { command ->
|
commandStream.process { command ->
|
||||||
if (command.payload.player.id != player.id) {
|
if (command.payload.player.id != player.id) {
|
||||||
runBlocking {
|
nack()
|
||||||
nack()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val gameState = command.buildGameState()
|
val gameState = command.buildGameState()
|
||||||
@@ -47,7 +62,7 @@ class GameCommandHandler(
|
|||||||
is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream)
|
is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream)
|
||||||
is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream)
|
is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream)
|
||||||
}
|
}
|
||||||
}
|
} as CommandBlock<GameCommand>
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun GameCommand.buildGameState(): GameState = payload.gameId.buildStateFromEventStream(eventStream)
|
private fun GameCommand.buildGameState(): GameState = payload.gameId.buildStateFromEventStream(eventStream)
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
package eventDemo.app.command
|
package eventDemo.app.command
|
||||||
|
|
||||||
import eventDemo.app.entity.Player
|
import eventDemo.app.entity.Player
|
||||||
import eventDemo.app.event.GameEventBus
|
|
||||||
import eventDemo.app.event.GameEventStream
|
|
||||||
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
||||||
import io.ktor.server.application.ApplicationCall
|
import io.ktor.server.application.ApplicationCall
|
||||||
import io.ktor.server.auth.authenticate
|
import io.ktor.server.auth.authenticate
|
||||||
@@ -10,18 +8,15 @@ import io.ktor.server.auth.jwt.JWTPrincipal
|
|||||||
import io.ktor.server.auth.principal
|
import io.ktor.server.auth.principal
|
||||||
import io.ktor.server.routing.Route
|
import io.ktor.server.routing.Route
|
||||||
import io.ktor.server.websocket.webSocket
|
import io.ktor.server.websocket.webSocket
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
|
|
||||||
fun Route.gameSocket(
|
fun Route.gameSocket(
|
||||||
eventStream: GameEventStream,
|
playerNotificationListener: GameEventPlayerNotificationListener,
|
||||||
eventBus: GameEventBus,
|
commandHandler: GameCommandHandler,
|
||||||
) {
|
) {
|
||||||
authenticate {
|
authenticate {
|
||||||
webSocket("/game") {
|
webSocket("/game") {
|
||||||
launch {
|
commandHandler.handle(call.getPlayer(), incoming, outgoing)
|
||||||
GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer())
|
playerNotificationListener.startListening(outgoing)
|
||||||
}
|
|
||||||
GameEventPlayerNotificationListener(eventBus, outgoing).init()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,18 +2,17 @@ package eventDemo.app.event
|
|||||||
|
|
||||||
import eventDemo.app.entity.GameId
|
import eventDemo.app.entity.GameId
|
||||||
import eventDemo.app.event.event.GameEvent
|
import eventDemo.app.event.event.GameEvent
|
||||||
import eventDemo.libs.event.EventBus
|
|
||||||
import eventDemo.libs.event.EventStream
|
import eventDemo.libs.event.EventStream
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A stream to publish and read the played card event.
|
* A stream to publish and read the played card event.
|
||||||
*/
|
*/
|
||||||
class GameEventStream(
|
class GameEventStream(
|
||||||
private val eventBus: EventBus<GameEvent, GameId>,
|
private val eventBus: GameEventBus,
|
||||||
private val m: EventStream<GameEvent, GameId>,
|
private val eventStream: EventStream<GameEvent, GameId>,
|
||||||
) : EventStream<GameEvent, GameId> by m {
|
) : EventStream<GameEvent, GameId> by eventStream {
|
||||||
override fun publish(event: GameEvent) {
|
override fun publish(event: GameEvent) {
|
||||||
m.publish(event)
|
eventStream.publish(event)
|
||||||
eventBus.publish(event)
|
eventBus.publish(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,22 +1,18 @@
|
|||||||
package eventDemo.app.eventListener
|
package eventDemo.app.eventListener
|
||||||
|
|
||||||
import eventDemo.app.entity.GameId
|
import eventDemo.app.event.GameEventBus
|
||||||
import eventDemo.app.event.event.GameEvent
|
import eventDemo.app.event.event.GameEvent
|
||||||
import eventDemo.libs.event.EventBus
|
|
||||||
import eventDemo.shared.toFrame
|
import eventDemo.shared.toFrame
|
||||||
import io.ktor.websocket.Frame
|
import io.ktor.websocket.Frame
|
||||||
import kotlinx.coroutines.channels.SendChannel
|
import kotlinx.coroutines.channels.SendChannel
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.channels.trySendBlocking
|
||||||
|
|
||||||
class GameEventPlayerNotificationListener(
|
class GameEventPlayerNotificationListener(
|
||||||
private val eventBus: EventBus<GameEvent, GameId>,
|
private val eventBus: GameEventBus,
|
||||||
private val outgoing: SendChannel<Frame>,
|
|
||||||
) {
|
) {
|
||||||
fun init() {
|
fun startListening(outgoing: SendChannel<Frame>) {
|
||||||
eventBus.subscribe { event: GameEvent ->
|
eventBus.subscribe { event: GameEvent ->
|
||||||
runBlocking {
|
outgoing.trySendBlocking(event.toFrame())
|
||||||
outgoing.send(event.toFrame())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,14 @@
|
|||||||
package eventDemo.app.eventListener
|
package eventDemo.app.eventListener
|
||||||
|
|
||||||
import eventDemo.app.entity.GameId
|
import eventDemo.app.event.GameEventBus
|
||||||
|
import eventDemo.app.event.GameEventStream
|
||||||
import eventDemo.app.event.buildStateFromEventStream
|
import eventDemo.app.event.buildStateFromEventStream
|
||||||
import eventDemo.app.event.event.GameEvent
|
import eventDemo.app.event.event.GameEvent
|
||||||
import eventDemo.app.event.event.GameStartedEvent
|
import eventDemo.app.event.event.GameStartedEvent
|
||||||
import eventDemo.libs.event.EventBus
|
|
||||||
import eventDemo.libs.event.EventStream
|
|
||||||
|
|
||||||
class GameEventReactionListener(
|
class GameEventReactionListener(
|
||||||
private val eventBus: EventBus<GameEvent, GameId>,
|
private val eventBus: GameEventBus,
|
||||||
private val eventStream: EventStream<GameEvent, GameId>,
|
private val eventStream: GameEventStream,
|
||||||
) {
|
) {
|
||||||
fun init() {
|
fun init() {
|
||||||
eventBus.subscribe { event: GameEvent ->
|
eventBus.subscribe { event: GameEvent ->
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package eventDemo.configuration
|
package eventDemo.configuration
|
||||||
|
|
||||||
|
import eventDemo.app.command.GameCommandHandler
|
||||||
import eventDemo.app.event.GameEventBus
|
import eventDemo.app.event.GameEventBus
|
||||||
import eventDemo.app.event.GameEventStream
|
import eventDemo.app.event.GameEventStream
|
||||||
|
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
||||||
import eventDemo.libs.event.EventBusInMemory
|
import eventDemo.libs.event.EventBusInMemory
|
||||||
import eventDemo.libs.event.EventStreamInMemory
|
import eventDemo.libs.event.EventStreamInMemory
|
||||||
import io.ktor.server.application.Application
|
import io.ktor.server.application.Application
|
||||||
import io.ktor.server.application.install
|
import io.ktor.server.application.install
|
||||||
|
import org.koin.core.module.dsl.singleOf
|
||||||
import org.koin.dsl.module
|
import org.koin.dsl.module
|
||||||
import org.koin.ktor.plugin.Koin
|
import org.koin.ktor.plugin.Koin
|
||||||
import org.koin.logger.slf4jLogger
|
import org.koin.logger.slf4jLogger
|
||||||
@@ -19,10 +22,15 @@ fun Application.configureKoin() {
|
|||||||
|
|
||||||
val appKoinModule =
|
val appKoinModule =
|
||||||
module {
|
module {
|
||||||
|
single {
|
||||||
|
GameEventBus(EventBusInMemory())
|
||||||
|
}
|
||||||
single {
|
single {
|
||||||
GameEventStream(get(), EventStreamInMemory())
|
GameEventStream(get(), EventStreamInMemory())
|
||||||
}
|
}
|
||||||
single {
|
single {
|
||||||
GameEventBus(EventBusInMemory())
|
GameCommandHandler(get())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
singleOf(::GameEventPlayerNotificationListener)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,16 @@
|
|||||||
package eventDemo.configuration
|
package eventDemo.configuration
|
||||||
|
|
||||||
|
import eventDemo.app.command.GameCommandHandler
|
||||||
import eventDemo.app.command.gameSocket
|
import eventDemo.app.command.gameSocket
|
||||||
import eventDemo.app.event.GameEventBus
|
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
||||||
import eventDemo.app.event.GameEventStream
|
|
||||||
import io.ktor.server.application.Application
|
import io.ktor.server.application.Application
|
||||||
import io.ktor.server.routing.routing
|
import io.ktor.server.routing.routing
|
||||||
|
|
||||||
fun Application.declareWebSocketsGameRoute(
|
fun Application.declareWebSocketsGameRoute(
|
||||||
eventStream: GameEventStream,
|
playerNotificationListener: GameEventPlayerNotificationListener,
|
||||||
eventBus: GameEventBus,
|
commandHandler: GameCommandHandler,
|
||||||
) {
|
) {
|
||||||
routing {
|
routing {
|
||||||
gameSocket(eventStream, eventBus)
|
gameSocket(playerNotificationListener, commandHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
package eventDemo.libs.command
|
package eventDemo.libs.command
|
||||||
|
|
||||||
|
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||||
|
import kotlinx.coroutines.GlobalScope
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -11,7 +14,7 @@ interface CommandStream<C : Command> {
|
|||||||
/**
|
/**
|
||||||
* Send a new [Command] to the queue.
|
* Send a new [Command] to the queue.
|
||||||
*/
|
*/
|
||||||
suspend fun send(
|
fun send(
|
||||||
type: KClass<C>,
|
type: KClass<C>,
|
||||||
command: C,
|
command: C,
|
||||||
)
|
)
|
||||||
@@ -19,7 +22,7 @@ interface CommandStream<C : Command> {
|
|||||||
/**
|
/**
|
||||||
* Send multiple [Command] to the queue.
|
* Send multiple [Command] to the queue.
|
||||||
*/
|
*/
|
||||||
suspend fun send(
|
fun send(
|
||||||
type: KClass<C>,
|
type: KClass<C>,
|
||||||
vararg commands: C,
|
vararg commands: C,
|
||||||
) {
|
) {
|
||||||
@@ -39,6 +42,13 @@ interface CommandStream<C : Command> {
|
|||||||
* Apply an action to all command income in the stream.
|
* Apply an action to all command income in the stream.
|
||||||
*/
|
*/
|
||||||
suspend fun process(action: CommandBlock<C>)
|
suspend fun process(action: CommandBlock<C>)
|
||||||
|
|
||||||
|
@OptIn(DelicateCoroutinesApi::class)
|
||||||
|
fun blockAndProcess(action: CommandBlock<C>) {
|
||||||
|
GlobalScope.launch {
|
||||||
|
process(action)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend inline fun <reified C : Command> CommandStream<C>.send(vararg command: C) = send(C::class, *command)
|
suspend inline fun <reified C : Command> CommandStream<C>.send(vararg command: C) = send(C::class, *command)
|
||||||
|
|||||||
@@ -3,10 +3,11 @@ package eventDemo.libs.command
|
|||||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||||
import io.ktor.websocket.Frame
|
import io.ktor.websocket.Frame
|
||||||
import io.ktor.websocket.readText
|
import io.ktor.websocket.readText
|
||||||
import kotlinx.coroutines.GlobalScope
|
|
||||||
import kotlinx.coroutines.channels.ReceiveChannel
|
import kotlinx.coroutines.channels.ReceiveChannel
|
||||||
import kotlinx.coroutines.channels.SendChannel
|
import kotlinx.coroutines.channels.SendChannel
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.channels.onFailure
|
||||||
|
import kotlinx.coroutines.channels.onSuccess
|
||||||
|
import kotlinx.coroutines.channels.trySendBlocking
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -19,23 +20,30 @@ class CommandStreamChannel<C : Command>(
|
|||||||
private val deserializer: (String) -> C,
|
private val deserializer: (String) -> C,
|
||||||
) : CommandStream<C> {
|
) : CommandStream<C> {
|
||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
private val failedCommand = mutableListOf<C>()
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a new [Command] to the queue.
|
* Send a new [Command] to the queue.
|
||||||
*/
|
*/
|
||||||
override suspend fun send(
|
override fun send(
|
||||||
type: KClass<C>,
|
type: KClass<C>,
|
||||||
command: C,
|
command: C,
|
||||||
) {
|
) {
|
||||||
outgoing.send(Frame.Text(serializer(command)))
|
outgoing
|
||||||
logger.atInfo {
|
.trySendBlocking(Frame.Text(serializer(command)))
|
||||||
message = "Command published: $command"
|
.onSuccess {
|
||||||
payload = mapOf("command" to command)
|
logger.atInfo {
|
||||||
}
|
message = "Command published: $command"
|
||||||
|
payload = mapOf("command" to command)
|
||||||
|
}
|
||||||
|
}.onFailure {
|
||||||
|
logger.atError {
|
||||||
|
message = "Command FAILED: $command"
|
||||||
|
payload = mapOf("command" to command)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) {
|
override suspend fun process(action: CommandBlock<C>) {
|
||||||
// incoming.consumeEach { commandAsFrame ->
|
// incoming.consumeEach { commandAsFrame ->
|
||||||
// if (commandAsFrame is Frame.Text) {
|
// if (commandAsFrame is Frame.Text) {
|
||||||
// compute(deserializer(commandAsFrame.readText()), action)
|
// compute(deserializer(commandAsFrame.readText()), action)
|
||||||
@@ -50,7 +58,7 @@ class CommandStreamChannel<C : Command>(
|
|||||||
|
|
||||||
private suspend fun compute(
|
private suspend fun compute(
|
||||||
command: C,
|
command: C,
|
||||||
action: CommandStream.ComputeStatus.(C) -> Unit,
|
action: CommandBlock<C>,
|
||||||
) {
|
) {
|
||||||
val status =
|
val status =
|
||||||
object : CommandStream.ComputeStatus {
|
object : CommandStream.ComputeStatus {
|
||||||
@@ -67,12 +75,12 @@ class CommandStreamChannel<C : Command>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val action = runCatching { status.action(command) }
|
val actionResult = runCatching { status.action(command) }
|
||||||
if (action.isFailure) {
|
if (actionResult.isFailure) {
|
||||||
logger.atInfo {
|
logger.atInfo {
|
||||||
message = "Error"
|
message = "Error on compute the Command"
|
||||||
payload = mapOf("command" to command)
|
payload = mapOf("command" to command)
|
||||||
cause = action.exceptionOrNull()
|
cause = actionResult.exceptionOrNull()
|
||||||
}
|
}
|
||||||
markAsFailed(command)
|
markAsFailed(command)
|
||||||
} else if (!status.isSet) {
|
} else if (!status.isSet) {
|
||||||
@@ -82,20 +90,17 @@ class CommandStreamChannel<C : Command>(
|
|||||||
|
|
||||||
private suspend fun markAsSuccess(command: C) {
|
private suspend fun markAsSuccess(command: C) {
|
||||||
logger.atInfo {
|
logger.atInfo {
|
||||||
message = "Compute command SUCCESS and it removed of the stack : $command"
|
message = "Compute command SUCCESS and it removed of the stack"
|
||||||
payload = mapOf("command" to command)
|
payload = mapOf("command" to command)
|
||||||
}
|
}
|
||||||
GlobalScope.launch {
|
// outgoing.trySendBlocking(Frame.Text("Command executed successfully"))
|
||||||
// outgoing.send(Frame.Text("Command executed successfully"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun markAsFailed(command: C) {
|
private suspend fun markAsFailed(command: C) {
|
||||||
failedCommand.add(command)
|
|
||||||
logger.atWarn {
|
logger.atWarn {
|
||||||
message = "Compute command FAILED and it put it ot the top of the stack : $command"
|
message = "Compute command FAILED"
|
||||||
payload = mapOf("command" to command)
|
payload = mapOf("command" to command)
|
||||||
}
|
}
|
||||||
outgoing.send(Frame.Text("Command execution failed"))
|
// outgoing.trySendBlocking(Frame.Text("Command execution failed"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,9 +3,10 @@ package eventDemo.libs.command
|
|||||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.channels.consumeEach
|
import kotlinx.coroutines.channels.consumeEach
|
||||||
|
import kotlinx.coroutines.channels.trySendBlocking
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
typealias CommandBlock<C> = CommandStream.ComputeStatus.(C) -> Unit
|
typealias CommandBlock<C> = suspend CommandStream.ComputeStatus.(C) -> Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manage [Command]'s
|
* Manage [Command]'s
|
||||||
@@ -14,7 +15,6 @@ typealias CommandBlock<C> = CommandStream.ComputeStatus.(C) -> Unit
|
|||||||
*/
|
*/
|
||||||
abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
private val failedCommand = mutableListOf<Command>()
|
|
||||||
private val queue: Channel<C> =
|
private val queue: Channel<C> =
|
||||||
Channel(onUndeliveredElement = {
|
Channel(onUndeliveredElement = {
|
||||||
logger.atWarn { "${it::class.simpleName} command not send" }
|
logger.atWarn { "${it::class.simpleName} command not send" }
|
||||||
@@ -23,7 +23,7 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
|||||||
/**
|
/**
|
||||||
* Send a new [Command] to the queue.
|
* Send a new [Command] to the queue.
|
||||||
*/
|
*/
|
||||||
override suspend fun send(
|
override fun send(
|
||||||
type: KClass<C>,
|
type: KClass<C>,
|
||||||
command: C,
|
command: C,
|
||||||
) {
|
) {
|
||||||
@@ -31,7 +31,7 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
|||||||
message = "Command published: $command"
|
message = "Command published: $command"
|
||||||
payload = mapOf("command" to command)
|
payload = mapOf("command" to command)
|
||||||
}
|
}
|
||||||
queue.send(command)
|
queue.trySendBlocking(command)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun process(action: CommandBlock<C>) {
|
override suspend fun process(action: CommandBlock<C>) {
|
||||||
@@ -43,7 +43,7 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun compute(
|
private suspend fun compute(
|
||||||
command: C,
|
command: C,
|
||||||
action: CommandBlock<C>,
|
action: CommandBlock<C>,
|
||||||
) {
|
) {
|
||||||
@@ -51,12 +51,12 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
|||||||
object : CommandStream.ComputeStatus {
|
object : CommandStream.ComputeStatus {
|
||||||
var isSet: Boolean = false
|
var isSet: Boolean = false
|
||||||
|
|
||||||
override fun ack() {
|
override suspend fun ack() {
|
||||||
if (!isSet) markAsSuccess(command) else error("Already NACK")
|
if (!isSet) markAsSuccess(command) else error("Already NACK")
|
||||||
isSet = true
|
isSet = true
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun nack() {
|
override suspend fun nack() {
|
||||||
if (!isSet) markAsFailed(command) else error("Already ACK")
|
if (!isSet) markAsFailed(command) else error("Already ACK")
|
||||||
isSet = true
|
isSet = true
|
||||||
}
|
}
|
||||||
@@ -77,7 +77,6 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun <C : Command> markAsFailed(command: C) {
|
private fun <C : Command> markAsFailed(command: C) {
|
||||||
failedCommand.add(command)
|
|
||||||
logger.atWarn {
|
logger.atWarn {
|
||||||
message = "Compute command FAILED and it put it ot the top of the stack : $command"
|
message = "Compute command FAILED and it put it ot the top of the stack : $command"
|
||||||
payload = mapOf("command" to command)
|
payload = mapOf("command" to command)
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ import io.ktor.websocket.Frame
|
|||||||
import io.mockk.mockk
|
import io.mockk.mockk
|
||||||
import io.mockk.verify
|
import io.mockk.verify
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
import kotlinx.coroutines.runBlocking
|
|
||||||
|
|
||||||
class CommandTest(
|
class CommandTest(
|
||||||
override val id: CommandId,
|
override val id: CommandId,
|
||||||
@@ -30,19 +28,13 @@ class CommandStreamChannelTest :
|
|||||||
)
|
)
|
||||||
|
|
||||||
val spyCall: () -> Unit = mockk(relaxed = true)
|
val spyCall: () -> Unit = mockk(relaxed = true)
|
||||||
runBlocking {
|
|
||||||
launch {
|
stream.blockAndProcess {
|
||||||
stream.process {
|
println("In action ${it.id}")
|
||||||
println("In action ${it.id}")
|
spyCall()
|
||||||
spyCall()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
launch {
|
|
||||||
stream.send(command, command2)
|
|
||||||
stream.send(command3)
|
|
||||||
channel.close()
|
|
||||||
}.join()
|
|
||||||
verify(exactly = 3) { spyCall() }
|
|
||||||
}
|
}
|
||||||
|
stream.send(command, command2)
|
||||||
|
stream.send(command3)
|
||||||
|
verify(exactly = 3) { spyCall() }
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user