This commit is contained in:
2024-05-30 21:41:02 +02:00
parent 03ba14d918
commit ae5c229e4b
32 changed files with 537 additions and 344 deletions

View File

@@ -0,0 +1,20 @@
package eventDemo.libs.command
import eventDemo.plugins.CommandIdSerializer
import kotlinx.serialization.Serializable
import java.util.UUID
@JvmInline
@Serializable(with = CommandIdSerializer::class)
value class CommandId(
private val id: UUID = UUID.randomUUID(),
) {
constructor(id: String) : this(UUID.fromString(id))
override fun toString(): String = id.toString()
}
interface Command {
val id: CommandId
val name: String
}

View File

@@ -0,0 +1,36 @@
package eventDemo.libs.command
import kotlin.reflect.KClass
interface CommandStream<C : Command> {
/**
* Send a new [Command] to the queue.
*/
suspend fun send(
type: KClass<C>,
command: C,
)
/**
* Send multiple [Command] to the queue.
*/
suspend fun send(
type: KClass<C>,
vararg commands: C,
) {
commands.forEach { send(type, it) }
}
/**
* A class to implement succes/faild action.
*/
interface ComputeStatus {
fun ack()
fun nack()
}
suspend fun process(block: CommandBlock<C>)
}
suspend inline fun <reified C : Command> CommandStream<C>.send(vararg command: C) = send(C::class, *command)

View File

@@ -0,0 +1,82 @@
package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlin.reflect.KClass
typealias CommandBlock<C> = CommandStream.ComputeStatus.(C) -> Unit
/**
* Manage [Command]'s
*
* It stores the new [Command] in memory.
*/
abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
private val logger = KotlinLogging.logger {}
private val failedCommand = mutableListOf<Command>()
private val queue: Channel<C> = Channel(onUndeliveredElement = { logger.atWarn { "${it.name} elem not send" } })
/**
* Send a new [Command] to the queue.
*/
override suspend fun send(
type: KClass<C>,
command: C,
) {
logger.atInfo {
message = "Command published: $command"
payload = mapOf("command" to command)
}
queue.send(command)
}
override suspend fun process(block: CommandBlock<C>) {
queue.consumeEach { command ->
compute(command, block)
}
for (command in queue) {
compute(command, block)
}
}
private fun compute(
command: C,
block: CommandBlock<C>,
) {
val status = object : CommandStream.ComputeStatus {
var isSet: Boolean = false
override fun ack() {
if (!isSet) markAsSuccess(command) else error("Already NACK")
isSet = true
}
override fun nack() {
if (!isSet) markAsFailed(command) else error("Already ACK")
isSet = true
}
}
if (runCatching { status.block(command) }.isFailure) {
markAsFailed(command)
} else if (!status.isSet) {
status.ack()
}
}
private fun <C : Command> markAsSuccess(command: C) {
logger.atInfo {
message = "Compute command SUCCESS and it removed of the stack : $command"
payload = mapOf("command" to command)
}
}
private fun <C : Command> markAsFailed(command: C) {
failedCommand.add(command)
logger.atWarn {
message = "Compute command FAILD and it put on the top of the stack : $command"
payload = mapOf("command" to command)
}
}
}