diff --git a/src/main/kotlin/application/Application.kt b/src/main/kotlin/application/Application.kt index f7e3eef..4dc2a9f 100644 --- a/src/main/kotlin/application/Application.kt +++ b/src/main/kotlin/application/Application.kt @@ -34,7 +34,6 @@ import fr.dcproject.component.vote.voteKoinModule import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes import fr.dcproject.component.workgroup.workgroupKoinModule import fr.dcproject.event.EventNotification -import fr.dcproject.event.EventSubscriber import fr.dcproject.routes.definition import fr.dcproject.routes.notificationArticle import fr.dcproject.security.AccessDeniedException @@ -124,9 +123,7 @@ fun Application.module(env: Env = PROD) { masking = false } - install(EventSubscriber) { - EventNotification(this, get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config() - } + EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName, get()).config() install(Authentication, jwtInstallation(get())) diff --git a/src/main/kotlin/component/article/routes/UpsertArticle.kt b/src/main/kotlin/component/article/routes/UpsertArticle.kt index f4479b0..6b12127 100644 --- a/src/main/kotlin/component/article/routes/UpsertArticle.kt +++ b/src/main/kotlin/component/article/routes/UpsertArticle.kt @@ -8,9 +8,8 @@ import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest. import fr.dcproject.component.auth.citizen import fr.dcproject.component.auth.citizenOrNull import fr.dcproject.component.workgroup.WorkgroupRef -import fr.dcproject.component.workgroup.WorkgroupRepository import fr.dcproject.event.ArticleUpdate -import fr.dcproject.event.raiseEvent +import fr.dcproject.event.publisher.Publisher import fr.dcproject.security.assert import io.ktor.application.ApplicationCall import io.ktor.application.call @@ -35,11 +34,11 @@ object UpsertArticle { val tags: List = emptyList(), val draft: Boolean = false, val versionId: UUID, - val workgroup: WorkgroupRef? = null + val workgroup: WorkgroupRef? = null, ) } - fun Route.upsertArticle(repo: ArticleRepository, workgroupRepository: WorkgroupRepository, ac: ArticleAccessControl) { + fun Route.upsertArticle(repo: ArticleRepository, publisher: Publisher, ac: ArticleAccessControl) { suspend fun ApplicationCall.convertRequestToEntity(): ArticleForUpdate = receive().run { ArticleForUpdate( id = id ?: UUID.randomUUID(), @@ -60,7 +59,7 @@ object UpsertArticle { ac.assert { canUpsert(article, citizenOrNull) } val newArticle: ArticleForView = repo.upsert(article) ?: error("Article not updated") call.respond(newArticle) - raiseEvent(ArticleUpdate.event, ArticleUpdate(newArticle)) + publisher.publish(ArticleUpdate(newArticle)) } } } diff --git a/src/main/kotlin/event/EventNotification.kt b/src/main/kotlin/event/EventNotification.kt index 9a6ec74..ad4e193 100644 --- a/src/main/kotlin/event/EventNotification.kt +++ b/src/main/kotlin/event/EventNotification.kt @@ -1,5 +1,8 @@ package fr.dcproject.event +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue import com.rabbitmq.client.AMQP import com.rabbitmq.client.BuiltinExchangeType.DIRECT import com.rabbitmq.client.ConnectionFactory @@ -9,24 +12,18 @@ import com.rabbitmq.client.Envelope import fr.dcproject.common.entity.TargetRef import fr.dcproject.component.article.ArticleForView import fr.dcproject.component.citizen.CitizenRef -import fr.dcproject.component.follow.FollowRepository +import fr.dcproject.component.follow.FollowArticleRepository +import fr.dcproject.component.follow.FollowConstitutionRepository import fr.dcproject.component.follow.FollowSimple import fr.dcproject.event.publisher.Publisher import fr.dcproject.messages.NotificationEmailSender -import fr.postgresjson.serializer.deserialize -import io.ktor.application.ApplicationCall import io.ktor.application.EventDefinition -import io.ktor.application.application -import io.ktor.util.pipeline.PipelineContext import io.ktor.utils.io.errors.IOException import io.lettuce.core.api.async.RedisAsyncCommands -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.slf4j.Logger import org.slf4j.LoggerFactory -import fr.dcproject.component.follow.FollowArticleRepository as FollowArticleRepository class ArticleUpdate( target: ArticleForView @@ -36,18 +33,18 @@ class ArticleUpdate( } } -fun PipelineContext.raiseEvent(definition: EventDefinition, value: T) = - application.environment.monitor.raise(definition, value) - class EventNotification( - private val config: EventSubscriber.Configuration, private val rabbitFactory: ConnectionFactory, private val redis: RedisAsyncCommands, - private val followRepo: FollowArticleRepository, - private val publisher: Publisher, + private val followConstitutionRepo: FollowConstitutionRepository, + private val followArticleRepo: FollowArticleRepository, private val notificationEmailSender: NotificationEmailSender, private val exchangeName: String, + mapper: ObjectMapper, ) { + private val mapper: ObjectMapper = mapper.copy() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName) fun config() { @@ -62,70 +59,61 @@ class EventNotification( } } - /* Declare publisher on event */ - config.subscribe(ArticleUpdate.event) { - publisher.publish(it) + /* Define Consumer */ + val rabbitChannel = rabbitFactory.newConnection().createChannel() + + val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) = runBlocking { + decodeEvent(body) { + redis.zadd( + "notification:${it.follow.createdBy.id}", + it.event.id, + it.rawEvent + ) + } + + rabbitChannel.basicAck(envelope.deliveryTag, false) + } } - /* Launch Consumer */ - GlobalScope.launch { - val rabbitChannel = rabbitFactory.newConnection().createChannel() - - val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) { - @Throws(IOException::class) - override fun handleDelivery( - consumerTag: String, - envelope: Envelope, - properties: AMQP.BasicProperties, - body: ByteArray - ) = runBlocking { + val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) { + runBlocking { decodeEvent(body) { - redis.zadd( - "notification:${follow.createdBy.id}", - event.id, - rawEvent - ) + notificationEmailSender.sendEmail(it.follow) + logger.debug("EmailSend to: ${it.follow.createdBy.id}") } - - rabbitChannel.basicAck(envelope.deliveryTag, false) } + rabbitChannel.basicAck(envelope.deliveryTag, false) } - - val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) { - @Throws(IOException::class) - override fun handleDelivery( - consumerTag: String, - envelope: Envelope, - properties: AMQP.BasicProperties, - body: ByteArray - ) { - runBlocking { - decodeEvent(body) { - logger.debug("EmailSend to: ${follow.createdBy.id}") - notificationEmailSender.sendEmail(follow) - } - } - rabbitChannel.basicAck(envelope.deliveryTag, false) - } - } - rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket - rabbitChannel.basicConsume("email", false, consumerEmail) } + /* Launch Consumer */ + rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket + rabbitChannel.basicConsume("email", false, consumerEmail) } - private suspend fun decodeEvent(body: ByteArray, action: suspend Msg.() -> Unit) { - val rawEvent = body.toString(Charsets.UTF_8) - val event = rawEvent.deserialize() ?: error("Unable to unserialise event message from rabbit") - val repo = when (event.type) { - "article" -> followRepo + private suspend fun decodeEvent(body: ByteArray, action: suspend (Msg) -> Unit) { + val rawEvent: String = body.toString(Charsets.UTF_8) + val event: EntityEvent = mapper.readValue(rawEvent) ?: error("Unable to deserialize event message from rabbit") + val targets = when (event.type) { + "article" -> followArticleRepo.findFollowsByTarget(event.target) + "constitution" -> followConstitutionRepo.findFollowsByTarget(event.target) else -> error("event '${event.type}' not implemented") - } as FollowRepository<*, *> + } - repo - .findFollowsByTarget(event.target) - .collect { - Msg(event, rawEvent, it).action() - } + targets.collect { action(Msg(event, rawEvent, it)) } } private class Msg( diff --git a/src/main/kotlin/event/publisher/Publisher.kt b/src/main/kotlin/event/publisher/Publisher.kt index a008467..c74453f 100644 --- a/src/main/kotlin/event/publisher/Publisher.kt +++ b/src/main/kotlin/event/publisher/Publisher.kt @@ -3,9 +3,9 @@ package fr.dcproject.event.publisher import com.fasterxml.jackson.databind.ObjectMapper import com.rabbitmq.client.ConnectionFactory import fr.dcproject.event.EntityEvent -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.launch +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -15,8 +15,8 @@ class Publisher( private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName), private val exchangeName: String, ) { - fun publish(it: T): Job { - return GlobalScope.launch { + suspend fun publish(it: T): Deferred = coroutineScope { + async { factory.newConnection().use { connection -> connection.createChannel().use { channel -> channel.basicPublish(exchangeName, "", null, it.serialize().toByteArray()) diff --git a/src/test/kotlin/functional/EventNotificationTest.kt b/src/test/kotlin/functional/EventNotificationTest.kt new file mode 100644 index 0000000..87fba6e --- /dev/null +++ b/src/test/kotlin/functional/EventNotificationTest.kt @@ -0,0 +1,116 @@ +package functional + +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.PropertyNamingStrategies +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.datatype.joda.JodaModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.rabbitmq.client.ConnectionFactory +import fr.dcproject.application.Configuration +import fr.dcproject.component.article.ArticleForView +import fr.dcproject.component.article.ArticleRef +import fr.dcproject.component.citizen.CitizenRef +import fr.dcproject.component.follow.FollowArticleRepository +import fr.dcproject.component.follow.FollowSimple +import fr.dcproject.event.ArticleUpdate +import fr.dcproject.event.EventNotification +import fr.dcproject.event.publisher.Publisher +import fr.dcproject.messages.NotificationEmailSender +import io.ktor.locations.KtorExperimentalLocationsAPI +import io.ktor.util.KtorExperimentalAPI +import io.lettuce.core.RedisClient +import io.lettuce.core.api.async.RedisAsyncCommands +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.koin.test.AutoCloseKoinTest +import org.koin.test.KoinTest + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class EventNotificationTest : KoinTest, AutoCloseKoinTest() { + @InternalCoroutinesApi + @KtorExperimentalLocationsAPI + @KtorExperimentalAPI + @ExperimentalCoroutinesApi + @Test + @Tag("functional") + fun `can be send notification`() = runBlocking { + /* Create mocks and spy's */ + val emailSender = mockk() { + every { sendEmail(any()) } returns Unit + } + val redisClient = spyk> { + RedisClient.create(Configuration.redis).connect().async() ?: error("Unable to connect to redis") + } + val rabbitFactory: ConnectionFactory = spyk { + ConnectionFactory().apply { setUri(Configuration.rabbitmq) } + } + val followArticleRepo = mockk { + every { findFollowsByTarget(any()) } returns flow { + FollowSimple( + createdBy = CitizenRef(), + target = ArticleRef(), + ).let { emit(it) } + } + } + val mapper = jacksonObjectMapper().apply { + registerModule(SimpleModule()) + propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE + + registerModule(JodaModule()) + disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true) + } + /* Purge rabbit notification queues */ + rabbitFactory.newConnection().createChannel().apply { + queuePurge("push") + queuePurge("email") + } + + /* Config consumer */ + EventNotification( + rabbitFactory = rabbitFactory, + redis = redisClient, + followArticleRepo = followArticleRepo, + followConstitutionRepo = mockk(), + notificationEmailSender = emailSender, + exchangeName = "notification_test", + mapper = mapper, + ).config() + verify { rabbitFactory.newConnection() } + + /* Push message */ + Publisher( + mapper = mapper, + factory = rabbitFactory, + exchangeName = "notification_test", + ).publish( + ArticleUpdate( + ArticleForView( + title = "MyTitle", + content = "myContent", + description = "myDescription", + createdBy = CitizenRef() + ) + ) + ).await() + + /* Wait to receive message */ + delay(300) + + /* Check if notifications sent */ + verify { followArticleRepo.findFollowsByTarget(any()) } + verify { emailSender.sendEmail(any()) } + verify { redisClient.zadd(any(), any(), any()) } + } +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..8961976 --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,44 @@ +ktor { + deployment { + port = 8080 + port = ${?PORT} + } + application { + modules = [ fr.dcproject.ApplicationKt.module ] + } +} + +app { + envName = prod + domain = dc-project.fr +} + +db { + host = localhost + host = ${?DB_HOST} + database = test + username = test + password = test + port = 5432 +} + +redis { + connection = "redis://localhost:6379" + connection = ${?REDIS_CONNECTION} +} + +rabbitmq { + connection = "amqp://localhost:5672" + connection = ${?RABBITMQ_CONNECTION} +} + +elasticsearch { + connection = "http://localhost:9200" + connection = ${?ELASTICSEARCH_CONNECTION} +} + +mail { + sendGrid { + key = "abcd" + } +}