Add Comments
This commit is contained in:
@@ -31,18 +31,18 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
||||
queue.send(command)
|
||||
}
|
||||
|
||||
override suspend fun process(block: CommandBlock<C>) {
|
||||
override suspend fun process(action: CommandBlock<C>) {
|
||||
queue.consumeEach { command ->
|
||||
compute(command, block)
|
||||
compute(command, action)
|
||||
}
|
||||
for (command in queue) {
|
||||
compute(command, block)
|
||||
compute(command, action)
|
||||
}
|
||||
}
|
||||
|
||||
private fun compute(
|
||||
command: C,
|
||||
block: CommandBlock<C>,
|
||||
action: CommandBlock<C>,
|
||||
) {
|
||||
val status = object : CommandStream.ComputeStatus {
|
||||
var isSet: Boolean = false
|
||||
@@ -58,7 +58,7 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
||||
}
|
||||
}
|
||||
|
||||
if (runCatching { status.block(command) }.isFailure) {
|
||||
if (runCatching { status.action(command) }.isFailure) {
|
||||
markAsFailed(command)
|
||||
} else if (!status.isSet) {
|
||||
status.ack()
|
||||
@@ -75,7 +75,7 @@ abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
|
||||
private fun <C : Command> markAsFailed(command: C) {
|
||||
failedCommand.add(command)
|
||||
logger.atWarn {
|
||||
message = "Compute command FAILD and it put on the top of the stack : $command"
|
||||
message = "Compute command FAILED and it put it ot the top of the stack : $command"
|
||||
payload = mapOf("command" to command)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user