Add MDC to log4j

This commit is contained in:
2025-03-16 01:42:57 +01:00
parent ca95344ca9
commit 804ccd785e
14 changed files with 196 additions and 244 deletions

View File

@@ -11,6 +11,7 @@ import eventDemo.app.notification.Notification
import eventDemo.libs.command.CommandId
import eventDemo.libs.command.CommandStreamChannel
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import java.util.UUID
@@ -56,35 +57,29 @@ class GameCommandHandler(
player: Player,
incomingCommandChannel: ReceiveChannel<GameCommand>,
channelNotification: SendChannel<Notification>,
) =
) {
commandStreamChannel.process(incomingCommandChannel) { command ->
if (command.payload.player.id != player.id) {
logger.atWarn {
message = "Handle command Refuse, the player of the command is not the same: $command"
payload = mapOf("command" to command)
}
channelNotification.sendError(command)("You are not the author of this command\n")
} else {
logger.atInfo {
message = "Handle command: $command"
payload = mapOf("command" to command)
}
try {
val eventBuilder = runner.run(command)
withLoggingContext("command" to command.toString()) {
if (command.payload.player.id != player.id) {
logger.warn { "Handle command Refuse, the player of the command is not the same" }
channelNotification.sendError(command)("You are not the author of this command\n")
} else {
logger.info { "Handle command" }
try {
val eventBuilder = runner.run(command)
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
}
} catch (e: CommandException) {
logger.warn(e) { e.message }
channelNotification.sendError(command)(e.message)
}
} catch (e: CommandException) {
logger.atWarn {
message = e.message
payload = mapOf("command" to command)
}
channelNotification.sendError(command)(e.message)
}
}
}
}
}
private fun SendChannel<Notification>.sendSuccess(commandId: CommandId): suspend () -> Unit =
@@ -92,15 +87,10 @@ private fun SendChannel<Notification>.sendSuccess(commandId: CommandId): suspend
val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = commandId)
.also { notification ->
logger.atDebug {
message = "Notification SUCCESS sent"
payload =
mapOf(
"notification" to notification,
"commandId" to commandId,
)
withLoggingContext("notification" to notification.toString(), "commandId" to commandId.toString()) {
logger.debug { "Notification SUCCESS sent" }
send(notification)
}
send(notification)
}
}
@@ -109,15 +99,10 @@ private fun SendChannel<Notification>.sendError(command: GameCommand): suspend (
val logger = KotlinLogging.logger { }
CommandErrorNotification(message = it, command = command)
.also { notification ->
logger.atWarn {
message = "Notification ERROR sent: ${notification.message}"
payload =
mapOf(
"notification" to notification,
"command" to command,
)
withLoggingContext("notification" to notification.toString(), "command" to command.toString()) {
logger.warn { "Notification ERROR sent: ${notification.message}" }
send(notification)
}
send(notification)
}
}

View File

@@ -5,6 +5,7 @@ import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.app.notification.Notification
import eventDemo.libs.fromFrameChannel
import eventDemo.libs.toObjectChannel
import io.github.oshai.kotlinlogging.withLoggingContext
import io.ktor.server.application.ApplicationCall
import io.ktor.server.auth.authenticate
import io.ktor.server.auth.jwt.JWTPrincipal
@@ -25,14 +26,16 @@ fun Route.gameSocket(
webSocket("/game") {
val currentPlayer = call.getPlayer()
val outgoingFrameChannel: SendChannel<Notification> = fromFrameChannel(outgoing)
GlobalScope.launch {
commandHandler.handle(
currentPlayer,
toObjectChannel(incoming),
outgoingFrameChannel,
)
withLoggingContext("currentPlayer" to currentPlayer.toString()) {
GlobalScope.launch {
commandHandler.handle(
currentPlayer,
toObjectChannel(incoming),
outgoingFrameChannel,
)
}
playerNotificationListener.startListening(outgoingFrameChannel, currentPlayer)
}
playerNotificationListener.startListening(outgoingFrameChannel, currentPlayer)
}
}
}

View File

@@ -3,6 +3,7 @@ package eventDemo.app.event
import eventDemo.app.entity.GameId
import eventDemo.app.event.event.GameEvent
import eventDemo.libs.event.VersionBuilder
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock
@@ -31,20 +32,28 @@ class GameEventHandler(
aggregateId: GameId,
buildEvent: (version: Int) -> GameEvent,
): GameEvent =
locks
// Get lock for the aggregate
.computeIfAbsent(aggregateId) { ReentrantLock() }
.withLock {
// Build event with the version
buildEvent(versionBuilder.buildNextVersion(aggregateId))
// then publish it to the event store
.also { eventStore.publish(it) }
}.also { event ->
// Build the projections
projectionsBuilders.forEach { it(event) }
// Publish to the bus
eventBus.publish(event)
}
withLoggingContext("aggregateId" to aggregateId.toString()) {
locks
// Get lock for the aggregate
.computeIfAbsent(aggregateId) { ReentrantLock() }
.withLock {
// Build event with the version
buildEvent(versionBuilder.buildNextVersion(aggregateId))
// then publish it to the event store
.also {
withLoggingContext("event" to it.toString()) {
eventStore.publish(it)
}
}
}.also { event ->
withLoggingContext("event" to event.toString()) {
// Build the projections
projectionsBuilders.forEach { it(event) }
// Publish to the bus
eventBus.publish(event)
}
}
}
}
typealias GameProjectionBuilder = (GameEvent) -> Unit

View File

@@ -1,15 +0,0 @@
package eventDemo.app.event
import eventDemo.app.event.event.GameEvent
import eventDemo.libs.event.EventStream
/**
* A stream to publish and read the played card event.
*/
class GameEventStream(
private val eventStream: EventStream<GameEvent>,
) : EventStream<GameEvent> by eventStream {
override fun publish(event: GameEvent) {
eventStream.publish(event)
}
}

View File

@@ -18,7 +18,7 @@ fun GameState.apply(event: GameEvent): GameState =
if (event is PlayerActionEvent) {
if (state.currentPlayerTurn != event.player) {
logger.atError {
message = "Inconsistent player turn. CurrentPlayerTurn: $state.currentPlayerTurn | Player: ${event.player}"
message = "Inconsistent player turn"
payload =
mapOf(
"CurrentPlayerTurn" to (state.currentPlayerTurn ?: "No currentPlayerTurn"),

View File

@@ -5,6 +5,7 @@ import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStream
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.concurrent.ConcurrentHashMap
@@ -176,20 +177,21 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply
.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
}

View File

@@ -24,6 +24,7 @@ import eventDemo.app.notification.TheGameWasStartedNotification
import eventDemo.app.notification.WelcomeToTheGameNotification
import eventDemo.app.notification.YourNewCardNotification
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.trySendBlocking
@@ -38,106 +39,104 @@ class PlayerNotificationEventListener(
currentPlayer: Player,
) {
eventBus.subscribe { event: GameEvent ->
val currentState = gameStateRepository.getUntil(event)
withLoggingContext("event" to event.toString()) {
val currentState = gameStateRepository.getUntil(event)
fun Notification.send() {
if (currentState.players.contains(currentPlayer)) {
// Only notify players who have already joined the game.
outgoingNotificationChannel.trySendBlocking(this)
logger.atInfo {
message = "Notification for player ${currentPlayer.name} was SEND: ${this@send}"
payload = mapOf("notification" to this@send, "event" to event)
}
} else {
// Rare use case, when a connexion is created with the channel,
// but the player was not already join in the game
logger.atWarn {
message = "Notification for player ${currentPlayer.name} was SKIP, No player on the game: ${this@send}"
payload = mapOf("notification" to this@send, "event" to event)
}
}
}
fun sendNextTurnNotif() =
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
when (event) {
is NewPlayerEvent -> {
if (currentPlayer != event.player) {
PlayerAsJoinTheGameNotification(
player = event.player,
).send()
} else {
WelcomeToTheGameNotification(
players = currentState.players,
).send()
fun Notification.send() {
withLoggingContext("notification" to this.toString()) {
if (currentState.players.contains(currentPlayer)) {
// Only notify players who have already joined the game.
outgoingNotificationChannel.trySendBlocking(this)
logger.info { "Notification was SEND" }
} else {
// Rare use case, when a connexion is created with the channel,
// but the player was not already join in the game
logger.warn { "Notification was SKIP, no player on the game" }
}
}
}
is CardIsPlayedEvent -> {
if (currentPlayer != event.player) {
PlayerAsPlayACardNotification(
player = event.player,
card = event.card,
).send()
}
if (event.card !is Card.AllColorCard) {
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
}
}
is GameStartedEvent -> {
TheGameWasStartedNotification(
hand =
event.deck.playersHands.getHand(currentPlayer)
?: error("You are not in the game"),
fun sendNextTurnNotif() =
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
sendNextTurnNotif()
}
is PlayerChoseColorEvent -> {
if (currentPlayer != event.player) {
PlayerWasChoseTheCardColorNotification(
player = event.player,
color = event.color,
).send()
when (event) {
is NewPlayerEvent -> {
if (currentPlayer != event.player) {
PlayerAsJoinTheGameNotification(
player = event.player,
).send()
} else {
WelcomeToTheGameNotification(
players = currentState.players,
).send()
}
}
sendNextTurnNotif()
}
is CardIsPlayedEvent -> {
if (currentPlayer != event.player) {
PlayerAsPlayACardNotification(
player = event.player,
card = event.card,
).send()
}
is PlayerHavePassEvent -> {
if (currentPlayer == event.player) {
YourNewCardNotification(
card = event.takenCard,
if (event.card !is Card.AllColorCard) {
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
}
}
is GameStartedEvent -> {
TheGameWasStartedNotification(
hand =
event.deck.playersHands.getHand(currentPlayer)
?: error("You are not in the game"),
).send()
} else {
PlayerHavePassNotification(
sendNextTurnNotif()
}
is PlayerChoseColorEvent -> {
if (currentPlayer != event.player) {
PlayerWasChoseTheCardColorNotification(
player = event.player,
color = event.color,
).send()
}
sendNextTurnNotif()
}
is PlayerHavePassEvent -> {
if (currentPlayer == event.player) {
YourNewCardNotification(
card = event.takenCard,
).send()
} else {
PlayerHavePassNotification(
player = event.player,
).send()
}
sendNextTurnNotif()
}
is PlayerReadyEvent -> {
if (currentPlayer != event.player) {
PlayerWasReadyNotification(
player = event.player,
).send()
}
}
is PlayerWinEvent -> {
PlayerWinNotification(
player = event.player,
).send()
}
sendNextTurnNotif()
}
is PlayerReadyEvent -> {
if (currentPlayer != event.player) {
PlayerWasReadyNotification(
player = event.player,
).send()
}
}
is PlayerWinEvent -> {
PlayerWinNotification(
player = event.player,
).send()
}
}
}

View File

@@ -9,6 +9,7 @@ import eventDemo.app.event.event.PlayerWinEvent
import eventDemo.app.event.projection.GameState
import eventDemo.app.event.projection.GameStateRepository
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
class ReactionEventListener(
private val eventBus: GameEventBus,
@@ -24,13 +25,15 @@ class ReactionEventListener(
fun init() {
eventBus.subscribe(priority) { event: GameEvent ->
val state = gameStateRepository.getUntil(event)
sendStartGameEvent(state, event)
sendWinnerEvent(state, event)
withLoggingContext("event" to event.toString()) {
val state = gameStateRepository.getUntil(event)
sendStartGameEvent(state, event)
sendWinnerEvent(state)
}
}
}
private suspend fun sendStartGameEvent(
private fun sendStartGameEvent(
state: GameState,
event: GameEvent,
) {
@@ -44,12 +47,8 @@ class ReactionEventListener(
)
}
logger.atInfo {
message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload =
mapOf(
"event" to event,
"reactionEvent" to reactionEvent,
)
message = "Reaction event was Send"
payload = mapOf("reactionEvent" to reactionEvent)
}
} else {
if (event is PlayerReadyEvent) {
@@ -58,10 +57,7 @@ class ReactionEventListener(
}
}
private fun sendWinnerEvent(
state: GameState,
event: GameEvent,
) {
private fun sendWinnerEvent(state: GameState) {
val winner = state.playerHasNoCardLeft().firstOrNull()
if (winner != null) {
val reactionEvent =
@@ -74,12 +70,8 @@ class ReactionEventListener(
}
logger.atInfo {
message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload =
mapOf(
"event" to event,
"reactionEvent" to reactionEvent,
)
message = "Reaction event was Send"
payload = mapOf("reactionEvent" to reactionEvent)
}
}
}

View File

@@ -1,24 +0,0 @@
package eventDemo.app.eventListener
import eventDemo.app.notification.CommandSuccessNotification
import eventDemo.app.notification.Notification
import eventDemo.libs.command.CommandId
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.SendChannel
private fun SendChannel<Notification>.successNotifier(commandId: CommandId): suspend () -> Unit =
{
val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = commandId)
.let { notification ->
logger.atDebug {
message = "Notification SUCCESS sent"
payload =
mapOf(
"notification" to notification,
"commandId" to commandId,
)
}
send(notification)
}
}

View File

@@ -1,6 +1,7 @@
package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.channels.ReceiveChannel
/**
@@ -20,15 +21,14 @@ class CommandStreamChannel<C : Command>(
action: CommandBlock<C>,
) {
for (command in incoming) {
try {
controller.runOnlyOnce(command) {
// Wrap action to add logs
runAndLogStatus(command, action)
}
} catch (e: CommandRunnerController.Exception) {
logger.atWarn {
message = e.message
payload = mapOf("command" to command)
withLoggingContext("command" to command.toString()) {
try {
controller.runOnlyOnce(command) {
// Wrap action to add logs
runAndLogStatus(command, action)
}
} catch (e: CommandRunnerController.Exception) {
logger.warn { e.message }
}
}
}
@@ -40,16 +40,9 @@ class CommandStreamChannel<C : Command>(
) {
val actionResult = runCatching { action(command) }
if (actionResult.isFailure) {
logger.atWarn {
message = "Compute command FAILED: $command"
payload = mapOf("command" to command)
cause = actionResult.exceptionOrNull()
}
logger.warn(actionResult.exceptionOrNull()) { "Compute command FAILED" }
} else if (actionResult.isSuccess) {
logger.atInfo {
message = "Compute command SUCCESS: $command"
payload = mapOf("command" to command)
}
logger.info { "Compute command SUCCESS" }
}
}
}

View File

@@ -1,5 +1,6 @@
package eventDemo.libs.event
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.runBlocking
class EventBusInMemory<E : Event<ID>, ID : AggregateId> : EventBus<E, ID> {
@@ -10,7 +11,9 @@ class EventBusInMemory<E : Event<ID>, ID : AggregateId> : EventBus<E, ID> {
.sortedByDescending { (priority, _) -> priority }
.forEach { (_, block) ->
runBlocking {
block(event)
withLoggingContext("event" to event.toString()) {
block(event)
}
}
}
}

View File

@@ -1,6 +1,7 @@
package eventDemo.libs.event
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
@@ -16,15 +17,16 @@ class EventStreamInMemory<E : Event<*>> : EventStream<E> {
override fun publish(event: E) {
if (events.none { it.eventId == event.eventId }) {
events.add(event)
logger.atInfo {
message = "Event published: $event"
payload = mapOf("event" to event)
}
logger.info { "Event published" }
}
}
override fun publish(vararg events: E) {
events.forEach { publish(it) }
events.forEach {
withLoggingContext("event" to it.toString()) {
publish(it)
}
}
}
override fun readAll(): Set<E> =

View File

@@ -1,6 +1,7 @@
package eventDemo.libs.event
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
@@ -9,9 +10,11 @@ class VersionBuilderLocal : VersionBuilder {
private val versions: ConcurrentHashMap<AggregateId, AtomicInteger> = ConcurrentHashMap()
override fun buildNextVersion(aggregateId: AggregateId): Int =
versionOfAggregate(aggregateId)
.addAndGet(1)
.also { logger.debug { "New version $it" } }
withLoggingContext("aggregateId" to aggregateId.toString()) {
versionOfAggregate(aggregateId)
.addAndGet(1)
.also { logger.debug { "New event version $it" } }
}
override fun getLastVersion(aggregateId: AggregateId): Int =
versionOfAggregate(aggregateId).toInt()

View File

@@ -1,7 +1,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n > MDC=%mdc%n</pattern>
</encoder>
</appender>
<root level="trace">