diff --git a/src/main/kotlin/application/Application.kt b/src/main/kotlin/application/Application.kt index 4dc2a9f..f479d39 100644 --- a/src/main/kotlin/application/Application.kt +++ b/src/main/kotlin/application/Application.kt @@ -3,7 +3,7 @@ package fr.dcproject.application import com.fasterxml.jackson.core.util.DefaultIndenter import com.fasterxml.jackson.core.util.DefaultPrettyPrinter import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.PropertyNamingStrategy +import com.fasterxml.jackson.databind.PropertyNamingStrategies import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.joda.JodaModule import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException @@ -33,7 +33,7 @@ import fr.dcproject.component.vote.routes.installVoteRoutes import fr.dcproject.component.vote.voteKoinModule import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes import fr.dcproject.component.workgroup.workgroupKoinModule -import fr.dcproject.event.EventNotification +import fr.dcproject.notification.EventNotification import fr.dcproject.routes.definition import fr.dcproject.routes.notificationArticle import fr.dcproject.security.AccessDeniedException @@ -123,7 +123,7 @@ fun Application.module(env: Env = PROD) { masking = false } - EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName, get()).config() + EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config() install(Authentication, jwtInstallation(get())) @@ -131,7 +131,7 @@ fun Application.module(env: Env = PROD) { install(ContentNegotiation) { jackson { - propertyNamingStrategy = PropertyNamingStrategy.SNAKE_CASE + propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE registerModule(JodaModule()) disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) diff --git a/src/main/kotlin/application/KoinModule.kt b/src/main/kotlin/application/KoinModule.kt index 1fd748e..4869330 100644 --- a/src/main/kotlin/application/KoinModule.kt +++ b/src/main/kotlin/application/KoinModule.kt @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.datatype.joda.JodaModule import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.rabbitmq.client.ConnectionFactory -import fr.dcproject.event.publisher.Publisher +import fr.dcproject.notification.publisher.Publisher import fr.dcproject.messages.Mailer import fr.dcproject.messages.NotificationEmailSender import fr.postgresjson.connexion.Connection @@ -78,7 +78,7 @@ val KoinModule = module { // Mailer single { Mailer(Configuration.sendGridKey) } - single { Publisher(get(), get(), exchangeName = Configuration.exchangeNotificationName) } + single { Publisher(factory = get(), exchangeName = Configuration.exchangeNotificationName) } single { NotificationEmailSender(get(), Configuration.domain, get(), get()) } } diff --git a/src/main/kotlin/component/article/routes/UpsertArticle.kt b/src/main/kotlin/component/article/routes/UpsertArticle.kt index 6b12127..77a14e0 100644 --- a/src/main/kotlin/component/article/routes/UpsertArticle.kt +++ b/src/main/kotlin/component/article/routes/UpsertArticle.kt @@ -8,8 +8,8 @@ import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest. import fr.dcproject.component.auth.citizen import fr.dcproject.component.auth.citizenOrNull import fr.dcproject.component.workgroup.WorkgroupRef -import fr.dcproject.event.ArticleUpdate -import fr.dcproject.event.publisher.Publisher +import fr.dcproject.notification.ArticleUpdateNotification +import fr.dcproject.notification.publisher.Publisher import fr.dcproject.security.assert import io.ktor.application.ApplicationCall import io.ktor.application.call @@ -59,7 +59,7 @@ object UpsertArticle { ac.assert { canUpsert(article, citizenOrNull) } val newArticle: ArticleForView = repo.upsert(article) ?: error("Article not updated") call.respond(newArticle) - publisher.publish(ArticleUpdate(newArticle)) + publisher.publish(ArticleUpdateNotification(newArticle)) } } } diff --git a/src/main/kotlin/event/EventSubscriber.kt b/src/main/kotlin/event/EventSubscriber.kt deleted file mode 100644 index 638eb04..0000000 --- a/src/main/kotlin/event/EventSubscriber.kt +++ /dev/null @@ -1,58 +0,0 @@ -package fr.dcproject.event - -import fr.postgresjson.entity.Serializable -import fr.postgresjson.entity.UuidEntity -import io.ktor.application.Application -import io.ktor.application.ApplicationEvents -import io.ktor.application.ApplicationFeature -import io.ktor.application.EventDefinition -import io.ktor.application.EventHandler -import io.ktor.util.AttributeKey -import io.ktor.util.KtorExperimentalAPI -import kotlinx.coroutines.DisposableHandle -import org.joda.time.DateTime -import kotlin.random.Random.Default.nextInt - -open class Event( - val type: String, - val createdAt: DateTime = DateTime.now() -) : Serializable { - val id: Double = randId(createdAt.millis) - - private fun randId(time: Long): Double { - return (time.toString() + nextInt(1000, 9999).toString()).toDouble() - } -} - -open class EntityEvent( - val target: UuidEntity, - type: String, - val action: String -) : Event(type) - -/** - * Installation Class - */ -class EventSubscriber { - class Configuration(private val monitor: ApplicationEvents) { - private val subscribers = mutableListOf() - fun subscribe(definition: EventDefinition, handler: EventHandler): DisposableHandle { - return monitor.subscribe(definition, handler).also { - subscribers.add(it) - } - } - } - - companion object Feature : ApplicationFeature { - override val key = AttributeKey("EventSubscriber") - - @KtorExperimentalAPI - override fun install( - pipeline: Application, - configure: Configuration.() -> Unit - ): EventSubscriber { - Configuration(pipeline.environment.monitor).apply(configure) - return EventSubscriber() - } - } -} diff --git a/src/main/kotlin/event/EventNotification.kt b/src/main/kotlin/notification/EventNotification.kt similarity index 60% rename from src/main/kotlin/event/EventNotification.kt rename to src/main/kotlin/notification/EventNotification.kt index ad4e193..b8106c6 100644 --- a/src/main/kotlin/event/EventNotification.kt +++ b/src/main/kotlin/notification/EventNotification.kt @@ -1,7 +1,11 @@ -package fr.dcproject.event +package fr.dcproject.notification import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.PropertyNamingStrategies +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.datatype.joda.JodaModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.rabbitmq.client.AMQP import com.rabbitmq.client.BuiltinExchangeType.DIRECT @@ -15,24 +19,56 @@ import fr.dcproject.component.citizen.CitizenRef import fr.dcproject.component.follow.FollowArticleRepository import fr.dcproject.component.follow.FollowConstitutionRepository import fr.dcproject.component.follow.FollowSimple -import fr.dcproject.event.publisher.Publisher +import fr.dcproject.notification.publisher.Publisher import fr.dcproject.messages.NotificationEmailSender -import io.ktor.application.EventDefinition +import fr.postgresjson.entity.UuidEntity import io.ktor.utils.io.errors.IOException import io.lettuce.core.api.async.RedisAsyncCommands import kotlinx.coroutines.flow.collect import kotlinx.coroutines.runBlocking +import org.joda.time.DateTime import org.slf4j.Logger import org.slf4j.LoggerFactory +import kotlin.random.Random + +open class Notification( + val type: String, + val createdAt: DateTime = DateTime.now() +) { + val id: Double = randId(createdAt.millis) + + private fun randId(time: Long): Double { + return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble() + } + + fun serialize(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification") + + fun toByteArray() = serialize().toByteArray() -class ArticleUpdate( - target: ArticleForView -) : EntityEvent(target, "article", "update") { companion object { - val event = EventDefinition() + val mapper = jacksonObjectMapper().apply { + registerModule(SimpleModule()) + propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE + + registerModule(JodaModule()) + disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + } + + inline fun deserialize(raw: String): T = mapper.readValue(raw) } } +open class EntityNotification( + val target: UuidEntity, + type: String, + val action: String +) : Notification(type) + +class ArticleUpdateNotification( + target: ArticleForView +) : EntityNotification(target, "article", "update") + class EventNotification( private val rabbitFactory: ConnectionFactory, private val redis: RedisAsyncCommands, @@ -40,11 +76,7 @@ class EventNotification( private val followArticleRepo: FollowArticleRepository, private val notificationEmailSender: NotificationEmailSender, private val exchangeName: String, - mapper: ObjectMapper, ) { - private val mapper: ObjectMapper = mapper.copy() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName) fun config() { @@ -70,11 +102,11 @@ class EventNotification( properties: AMQP.BasicProperties, body: ByteArray ) = runBlocking { - decodeEvent(body) { + decodeMessage(body) { redis.zadd( "notification:${it.follow.createdBy.id}", it.event.id, - it.rawEvent + it.rawMessage ) } @@ -91,7 +123,7 @@ class EventNotification( body: ByteArray ) { runBlocking { - decodeEvent(body) { + decodeMessage(body) { notificationEmailSender.sendEmail(it.follow) logger.debug("EmailSend to: ${it.follow.createdBy.id}") } @@ -104,21 +136,21 @@ class EventNotification( rabbitChannel.basicConsume("email", false, consumerEmail) } - private suspend fun decodeEvent(body: ByteArray, action: suspend (Msg) -> Unit) { - val rawEvent: String = body.toString(Charsets.UTF_8) - val event: EntityEvent = mapper.readValue(rawEvent) ?: error("Unable to deserialize event message from rabbit") - val targets = when (event.type) { - "article" -> followArticleRepo.findFollowsByTarget(event.target) - "constitution" -> followConstitutionRepo.findFollowsByTarget(event.target) - else -> error("event '${event.type}' not implemented") + private suspend fun decodeMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) { + val rawMessage: String = body.toString(Charsets.UTF_8) + val notification: EntityNotification = Notification.deserialize(rawMessage) ?: error("Unable to deserialize notification message from rabbit") + val follows = when (notification.type) { + "article" -> followArticleRepo.findFollowsByTarget(notification.target) + "constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target) + else -> error("event '${notification.type}' not implemented") } - targets.collect { action(Msg(event, rawEvent, it)) } + follows.collect { action(DecodedMessage(notification, rawMessage, it)) } } - private class Msg( - val event: EntityEvent, - val rawEvent: String, + private class DecodedMessage( + val event: EntityNotification, + val rawMessage: String, val follow: FollowSimple ) } diff --git a/src/main/kotlin/event/publisher/Publisher.kt b/src/main/kotlin/notification/publisher/Publisher.kt similarity index 65% rename from src/main/kotlin/event/publisher/Publisher.kt rename to src/main/kotlin/notification/publisher/Publisher.kt index c74453f..d4df7af 100644 --- a/src/main/kotlin/event/publisher/Publisher.kt +++ b/src/main/kotlin/notification/publisher/Publisher.kt @@ -1,8 +1,7 @@ -package fr.dcproject.event.publisher +package fr.dcproject.notification.publisher -import com.fasterxml.jackson.databind.ObjectMapper import com.rabbitmq.client.ConnectionFactory -import fr.dcproject.event.EntityEvent +import fr.dcproject.notification.EntityNotification import kotlinx.coroutines.Deferred import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope @@ -10,12 +9,11 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory class Publisher( - private val mapper: ObjectMapper, private val factory: ConnectionFactory, private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName), private val exchangeName: String, ) { - suspend fun publish(it: T): Deferred = coroutineScope { + suspend fun publish(it: T): Deferred = coroutineScope { async { factory.newConnection().use { connection -> connection.createChannel().use { channel -> @@ -25,8 +23,4 @@ class Publisher( } } } - - private fun EntityEvent.serialize(): String { - return mapper.writeValueAsString(this) ?: error("Unable tu serialize message") - } } diff --git a/src/main/kotlin/routes/Notification.kt b/src/main/kotlin/routes/Notification.kt index 930b5d9..b968a58 100644 --- a/src/main/kotlin/routes/Notification.kt +++ b/src/main/kotlin/routes/Notification.kt @@ -1,9 +1,7 @@ package fr.dcproject.routes import fr.dcproject.component.auth.citizen -import fr.dcproject.event.Event -import fr.postgresjson.serializer.deserialize -import io.ktor.client.HttpClient +import fr.dcproject.notification.Notification import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.readText import io.ktor.locations.KtorExperimentalLocationsAPI @@ -17,6 +15,7 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.launch +import org.slf4j.LoggerFactory @ExperimentalCoroutinesApi @KtorExperimentalLocationsAPI @@ -26,15 +25,19 @@ fun Route.notificationArticle(redis: RedisAsyncCommands, client: launch { incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { - val notificationMessage = it.readText().deserialize() ?: error("unable to deserialize message") - - redis.zremrangebyscore( - "notification:$citizenId", - Range.from( - Range.Boundary.including(notificationMessage.id), - Range.Boundary.including(notificationMessage.id) + try { + val notificationMessage: Notification = Notification.deserialize(it.readText()) + redis.zremrangebyscore( + "notification:$citizenId", + Range.from( + Range.Boundary.including(notificationMessage.id), + Range.Boundary.including(notificationMessage.id) + ) ) - ) + } catch (e: Throwable) { + LoggerFactory.getLogger(Route::class.qualifiedName) + .error("Unable to deserialize notification") + } } } diff --git a/src/test/kotlin/functional/EventNotificationTest.kt b/src/test/kotlin/functional/EventNotificationTest.kt index 87fba6e..23a4683 100644 --- a/src/test/kotlin/functional/EventNotificationTest.kt +++ b/src/test/kotlin/functional/EventNotificationTest.kt @@ -1,11 +1,5 @@ package functional -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.PropertyNamingStrategies -import com.fasterxml.jackson.databind.SerializationFeature -import com.fasterxml.jackson.databind.module.SimpleModule -import com.fasterxml.jackson.datatype.joda.JodaModule -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.rabbitmq.client.ConnectionFactory import fr.dcproject.application.Configuration import fr.dcproject.component.article.ArticleForView @@ -13,9 +7,9 @@ import fr.dcproject.component.article.ArticleRef import fr.dcproject.component.citizen.CitizenRef import fr.dcproject.component.follow.FollowArticleRepository import fr.dcproject.component.follow.FollowSimple -import fr.dcproject.event.ArticleUpdate -import fr.dcproject.event.EventNotification -import fr.dcproject.event.publisher.Publisher +import fr.dcproject.notification.ArticleUpdateNotification +import fr.dcproject.notification.EventNotification +import fr.dcproject.notification.publisher.Publisher import fr.dcproject.messages.NotificationEmailSender import io.ktor.locations.KtorExperimentalLocationsAPI import io.ktor.util.KtorExperimentalAPI @@ -63,14 +57,7 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { ).let { emit(it) } } } - val mapper = jacksonObjectMapper().apply { - registerModule(SimpleModule()) - propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE - registerModule(JodaModule()) - disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) - configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true) - } /* Purge rabbit notification queues */ rabbitFactory.newConnection().createChannel().apply { queuePurge("push") @@ -85,17 +72,15 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { followConstitutionRepo = mockk(), notificationEmailSender = emailSender, exchangeName = "notification_test", - mapper = mapper, ).config() verify { rabbitFactory.newConnection() } /* Push message */ Publisher( - mapper = mapper, factory = rabbitFactory, exchangeName = "notification_test", ).publish( - ArticleUpdate( + ArticleUpdateNotification( ArticleForView( title = "MyTitle", content = "myContent", @@ -106,7 +91,7 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { ).await() /* Wait to receive message */ - delay(300) + delay(1000) /* Check if notifications sent */ verify { followArticleRepo.findFollowsByTarget(any()) }