remove raiseEvent for notifications
Add Test for EventNotification Add application.conf for test
This commit is contained in:
@@ -34,7 +34,6 @@ import fr.dcproject.component.vote.voteKoinModule
|
||||
import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes
|
||||
import fr.dcproject.component.workgroup.workgroupKoinModule
|
||||
import fr.dcproject.event.EventNotification
|
||||
import fr.dcproject.event.EventSubscriber
|
||||
import fr.dcproject.routes.definition
|
||||
import fr.dcproject.routes.notificationArticle
|
||||
import fr.dcproject.security.AccessDeniedException
|
||||
@@ -124,9 +123,7 @@ fun Application.module(env: Env = PROD) {
|
||||
masking = false
|
||||
}
|
||||
|
||||
install(EventSubscriber) {
|
||||
EventNotification(this, get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config()
|
||||
}
|
||||
EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName, get()).config()
|
||||
|
||||
install(Authentication, jwtInstallation(get()))
|
||||
|
||||
|
||||
@@ -8,9 +8,8 @@ import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest.
|
||||
import fr.dcproject.component.auth.citizen
|
||||
import fr.dcproject.component.auth.citizenOrNull
|
||||
import fr.dcproject.component.workgroup.WorkgroupRef
|
||||
import fr.dcproject.component.workgroup.WorkgroupRepository
|
||||
import fr.dcproject.event.ArticleUpdate
|
||||
import fr.dcproject.event.raiseEvent
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.security.assert
|
||||
import io.ktor.application.ApplicationCall
|
||||
import io.ktor.application.call
|
||||
@@ -35,11 +34,11 @@ object UpsertArticle {
|
||||
val tags: List<String> = emptyList(),
|
||||
val draft: Boolean = false,
|
||||
val versionId: UUID,
|
||||
val workgroup: WorkgroupRef? = null
|
||||
val workgroup: WorkgroupRef? = null,
|
||||
)
|
||||
}
|
||||
|
||||
fun Route.upsertArticle(repo: ArticleRepository, workgroupRepository: WorkgroupRepository, ac: ArticleAccessControl) {
|
||||
fun Route.upsertArticle(repo: ArticleRepository, publisher: Publisher, ac: ArticleAccessControl) {
|
||||
suspend fun ApplicationCall.convertRequestToEntity(): ArticleForUpdate = receive<Input>().run {
|
||||
ArticleForUpdate(
|
||||
id = id ?: UUID.randomUUID(),
|
||||
@@ -60,7 +59,7 @@ object UpsertArticle {
|
||||
ac.assert { canUpsert(article, citizenOrNull) }
|
||||
val newArticle: ArticleForView = repo.upsert(article) ?: error("Article not updated")
|
||||
call.respond(newArticle)
|
||||
raiseEvent(ArticleUpdate.event, ArticleUpdate(newArticle))
|
||||
publisher.publish(ArticleUpdate(newArticle))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package fr.dcproject.event
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
@@ -9,24 +12,18 @@ import com.rabbitmq.client.Envelope
|
||||
import fr.dcproject.common.entity.TargetRef
|
||||
import fr.dcproject.component.article.ArticleForView
|
||||
import fr.dcproject.component.citizen.CitizenRef
|
||||
import fr.dcproject.component.follow.FollowRepository
|
||||
import fr.dcproject.component.follow.FollowArticleRepository
|
||||
import fr.dcproject.component.follow.FollowConstitutionRepository
|
||||
import fr.dcproject.component.follow.FollowSimple
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.messages.NotificationEmailSender
|
||||
import fr.postgresjson.serializer.deserialize
|
||||
import io.ktor.application.ApplicationCall
|
||||
import io.ktor.application.EventDefinition
|
||||
import io.ktor.application.application
|
||||
import io.ktor.util.pipeline.PipelineContext
|
||||
import io.ktor.utils.io.errors.IOException
|
||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import fr.dcproject.component.follow.FollowArticleRepository as FollowArticleRepository
|
||||
|
||||
class ArticleUpdate(
|
||||
target: ArticleForView
|
||||
@@ -36,18 +33,18 @@ class ArticleUpdate(
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Event> PipelineContext<Unit, ApplicationCall>.raiseEvent(definition: EventDefinition<T>, value: T) =
|
||||
application.environment.monitor.raise(definition, value)
|
||||
|
||||
class EventNotification(
|
||||
private val config: EventSubscriber.Configuration,
|
||||
private val rabbitFactory: ConnectionFactory,
|
||||
private val redis: RedisAsyncCommands<String, String>,
|
||||
private val followRepo: FollowArticleRepository,
|
||||
private val publisher: Publisher,
|
||||
private val followConstitutionRepo: FollowConstitutionRepository,
|
||||
private val followArticleRepo: FollowArticleRepository,
|
||||
private val notificationEmailSender: NotificationEmailSender,
|
||||
private val exchangeName: String,
|
||||
mapper: ObjectMapper,
|
||||
) {
|
||||
private val mapper: ObjectMapper = mapper.copy()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
|
||||
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
|
||||
|
||||
fun config() {
|
||||
@@ -62,70 +59,61 @@ class EventNotification(
|
||||
}
|
||||
}
|
||||
|
||||
/* Declare publisher on event */
|
||||
config.subscribe(ArticleUpdate.event) {
|
||||
publisher.publish(it)
|
||||
/* Define Consumer */
|
||||
val rabbitChannel = rabbitFactory.newConnection().createChannel()
|
||||
|
||||
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) = runBlocking {
|
||||
decodeEvent(body) {
|
||||
redis.zadd(
|
||||
"notification:${it.follow.createdBy.id}",
|
||||
it.event.id,
|
||||
it.rawEvent
|
||||
)
|
||||
}
|
||||
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
}
|
||||
|
||||
/* Launch Consumer */
|
||||
GlobalScope.launch {
|
||||
val rabbitChannel = rabbitFactory.newConnection().createChannel()
|
||||
|
||||
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) = runBlocking {
|
||||
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) {
|
||||
runBlocking {
|
||||
decodeEvent(body) {
|
||||
redis.zadd(
|
||||
"notification:${follow.createdBy.id}",
|
||||
event.id,
|
||||
rawEvent
|
||||
)
|
||||
notificationEmailSender.sendEmail(it.follow)
|
||||
logger.debug("EmailSend to: ${it.follow.createdBy.id}")
|
||||
}
|
||||
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
|
||||
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) {
|
||||
runBlocking {
|
||||
decodeEvent(body) {
|
||||
logger.debug("EmailSend to: ${follow.createdBy.id}")
|
||||
notificationEmailSender.sendEmail(follow)
|
||||
}
|
||||
}
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
}
|
||||
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
|
||||
rabbitChannel.basicConsume("email", false, consumerEmail)
|
||||
}
|
||||
/* Launch Consumer */
|
||||
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
|
||||
rabbitChannel.basicConsume("email", false, consumerEmail)
|
||||
}
|
||||
|
||||
private suspend fun decodeEvent(body: ByteArray, action: suspend Msg.() -> Unit) {
|
||||
val rawEvent = body.toString(Charsets.UTF_8)
|
||||
val event = rawEvent.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
|
||||
val repo = when (event.type) {
|
||||
"article" -> followRepo
|
||||
private suspend fun decodeEvent(body: ByteArray, action: suspend (Msg) -> Unit) {
|
||||
val rawEvent: String = body.toString(Charsets.UTF_8)
|
||||
val event: EntityEvent = mapper.readValue(rawEvent) ?: error("Unable to deserialize event message from rabbit")
|
||||
val targets = when (event.type) {
|
||||
"article" -> followArticleRepo.findFollowsByTarget(event.target)
|
||||
"constitution" -> followConstitutionRepo.findFollowsByTarget(event.target)
|
||||
else -> error("event '${event.type}' not implemented")
|
||||
} as FollowRepository<*, *>
|
||||
}
|
||||
|
||||
repo
|
||||
.findFollowsByTarget(event.target)
|
||||
.collect {
|
||||
Msg(event, rawEvent, it).action()
|
||||
}
|
||||
targets.collect { action(Msg(event, rawEvent, it)) }
|
||||
}
|
||||
|
||||
private class Msg(
|
||||
|
||||
@@ -3,9 +3,9 @@ package fr.dcproject.event.publisher
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
import fr.dcproject.event.EntityEvent
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@@ -15,8 +15,8 @@ class Publisher(
|
||||
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName),
|
||||
private val exchangeName: String,
|
||||
) {
|
||||
fun <T : EntityEvent> publish(it: T): Job {
|
||||
return GlobalScope.launch {
|
||||
suspend fun <T : EntityEvent> publish(it: T): Deferred<Unit> = coroutineScope {
|
||||
async {
|
||||
factory.newConnection().use { connection ->
|
||||
connection.createChannel().use { channel ->
|
||||
channel.basicPublish(exchangeName, "", null, it.serialize().toByteArray())
|
||||
|
||||
Reference in New Issue
Block a user