diff --git a/.idea/runConfigurations/Test_Follow.xml b/.idea/runConfigurations/Test_Follow.xml index 68c50d3..3c538f3 100644 --- a/.idea/runConfigurations/Test_Follow.xml +++ b/.idea/runConfigurations/Test_Follow.xml @@ -21,7 +21,7 @@ - \ No newline at end of file diff --git a/.idea/runConfigurations/Test_Opinions.xml b/.idea/runConfigurations/Test_Opinions.xml index a6bb955..1322ba8 100644 --- a/.idea/runConfigurations/Test_Opinions.xml +++ b/.idea/runConfigurations/Test_Opinions.xml @@ -23,7 +23,7 @@ - \ No newline at end of file diff --git a/src/main/kotlin/fr/dcproject/Application.kt b/src/main/kotlin/fr/dcproject/Application.kt index 10653ca..c13f748 100644 --- a/src/main/kotlin/fr/dcproject/Application.kt +++ b/src/main/kotlin/fr/dcproject/Application.kt @@ -7,20 +7,13 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.joda.JodaModule import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException -import com.rabbitmq.client.* -import com.rabbitmq.client.BuiltinExchangeType.DIRECT import fr.dcproject.Env.PROD import fr.dcproject.entity.* -import fr.dcproject.event.EntityEvent -import fr.dcproject.event.EventNotification -import fr.dcproject.event.Notification -import fr.dcproject.event.publisher.Publisher -import fr.dcproject.repository.Follow -import fr.dcproject.repository.FollowArticle +import fr.dcproject.event.EventSubscriber +import fr.dcproject.event.configEvent import fr.dcproject.routes.* import fr.dcproject.security.voter.* import fr.postgresjson.migration.Migrations -import fr.postgresjson.serializer.deserialize import io.ktor.application.Application import io.ktor.application.ApplicationCall import io.ktor.application.call @@ -42,16 +35,11 @@ import io.ktor.response.respond import io.ktor.routing.Routing import io.ktor.util.KtorExperimentalAPI import io.ktor.websocket.WebSockets -import io.lettuce.core.api.async.RedisAsyncCommands -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import org.eclipse.jetty.util.log.Slf4jLog import org.koin.core.qualifier.named import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get import org.slf4j.event.Level -import java.io.IOException import java.time.Duration import java.util.* import java.util.concurrent.CompletionException @@ -188,76 +176,8 @@ fun Application.module(env: Env = PROD) { masking = false } - install(EventNotification) { - /* Config Rabbit */ - val exchangeName = config.exchangeNotificationName - get().newConnection().use { connection -> - connection.createChannel().use { channel -> - channel.queueDeclare("push", true, false, false, null) - channel.queueDeclare("email", true, false, false, null) - channel.exchangeDeclare(exchangeName, DIRECT, true) - channel.queueBind("push", exchangeName, "") - channel.queueBind("email", exchangeName, "") - } - } - - /* Declare publisher on event */ - val publisher = Publisher(get(), get()) - subscribe(EntityEvent.Type.UPDATE_ARTICLE.event) { - publisher.publish(it) - } - - /* Launch Consumer */ - launch { - val rabbitChannel = get().newConnection().createChannel() - val redis = get>() - - val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) { - @Throws(IOException::class) - override fun handleDelivery( - consumerTag: String, - envelope: Envelope, - properties: AMQP.BasicProperties, - body: ByteArray - ) = runBlocking { - val message = body.toString(Charsets.UTF_8) - val msg = message.deserialize() ?: error("Unable to unserialise event message from rabbit") - - let { - when (msg.type) { - Notification.Type.ARTICLE -> get() - } as Follow<*,*> - } - .findFollowsByTarget(msg.target) - .collect { follow -> - redis.zadd( - "notification:${follow.createdBy.id}", - msg.id, - message - ) - } - - rabbitChannel.basicAck(envelope.deliveryTag, false) - } - } - - val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) { - @Throws(IOException::class) - override fun handleDelivery( - consumerTag: String, - envelope: Envelope, - properties: AMQP.BasicProperties, - body: ByteArray - ) { - val message = body.toString(Charsets.UTF_8) - println("The message is receive for send email: $message") - // TODO implement email sender - rabbitChannel.basicAck(envelope.deliveryTag, false) - } - } - rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket - rabbitChannel.basicConsume("email", false, consumerEmail) - } + install(EventSubscriber) { + configEvent(get(), get(), get(), get()) } install(Authentication) { diff --git a/src/main/kotlin/fr/dcproject/event/ConfigNotification.kt b/src/main/kotlin/fr/dcproject/event/ConfigNotification.kt new file mode 100644 index 0000000..e18da5c --- /dev/null +++ b/src/main/kotlin/fr/dcproject/event/ConfigNotification.kt @@ -0,0 +1,105 @@ +package fr.dcproject.event + +import com.fasterxml.jackson.databind.ObjectMapper +import com.rabbitmq.client.* +import com.rabbitmq.client.BuiltinExchangeType.DIRECT +import fr.dcproject.config +import fr.dcproject.entity.Article +import fr.dcproject.event.publisher.Publisher +import fr.dcproject.repository.Follow +import fr.postgresjson.serializer.deserialize +import io.ktor.application.EventDefinition +import io.lettuce.core.api.async.RedisAsyncCommands +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.io.errors.IOException +import fr.dcproject.repository.FollowArticle as FollowArticleRepository + + +class ArticleUpdate( + target: Article +) : EntityEvent(target, "article", "update") { + companion object { + val event = EventDefinition() + } +} + +fun EventSubscriber.Configuration.configEvent( + rabbitFactory: ConnectionFactory, + redis: RedisAsyncCommands, + followRepo: FollowArticleRepository, + serialiser: ObjectMapper +) { + /* Config Rabbit */ + val exchangeName = config.exchangeNotificationName + rabbitFactory.newConnection().use { connection -> + connection.createChannel().use { channel -> + channel.queueDeclare("push", true, false, false, null) + channel.queueDeclare("email", true, false, false, null) + channel.exchangeDeclare(exchangeName, DIRECT, true) + channel.queueBind("push", exchangeName, "") + channel.queueBind("email", exchangeName, "") + } + } + + /* Declare publisher on event */ + val publisher = Publisher(serialiser, rabbitFactory) + subscribe(ArticleUpdate.event) { + publisher.publish(it) + } + + /* Launch Consumer */ + GlobalScope.launch { + val rabbitChannel = rabbitFactory.newConnection().createChannel() + + val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) = runBlocking { + val message = body.toString(Charsets.UTF_8) + val msg = + message.deserialize() ?: error("Unable to unserialise event message from rabbit") + + let { + when (msg.type) { + "article" -> followRepo + else -> error("event '${msg.type}' not implemented") + } as Follow<*, *> + } + .findFollowsByTarget(msg.target) + .collect { follow -> + redis.zadd( + "notification:${follow.createdBy.id}", + msg.id, + message + ) + } + + rabbitChannel.basicAck(envelope.deliveryTag, false) + } + } + + val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) { + val message = body.toString(Charsets.UTF_8) + println("The message is receive for send email: $message") + // TODO implement email sender + rabbitChannel.basicAck(envelope.deliveryTag, false) + } + } + rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket + rabbitChannel.basicConsume("email", false, consumerEmail) + } +} \ No newline at end of file diff --git a/src/main/kotlin/fr/dcproject/event/EventNotification.kt b/src/main/kotlin/fr/dcproject/event/EventSubscriber.kt similarity index 56% rename from src/main/kotlin/fr/dcproject/event/EventNotification.kt rename to src/main/kotlin/fr/dcproject/event/EventSubscriber.kt index ddac324..f26a8ba 100644 --- a/src/main/kotlin/fr/dcproject/event/EventNotification.kt +++ b/src/main/kotlin/fr/dcproject/event/EventSubscriber.kt @@ -1,7 +1,5 @@ package fr.dcproject.event -import com.fasterxml.jackson.annotation.JsonValue -import fr.dcproject.entity.Article import fr.postgresjson.entity.Serializable import fr.postgresjson.entity.immutable.UuidEntity import io.ktor.application.* @@ -11,59 +9,46 @@ import kotlinx.coroutines.DisposableHandle import org.joda.time.DateTime import kotlin.random.Random.Default.nextInt -sealed class NotificationS - -open class Notification( - val type: Type, +abstract class Event( + val type: String, val createdAt: DateTime = DateTime.now() -) : NotificationS(), Serializable { +) : Serializable { val id: Double = randId(createdAt.millis) - enum class Type(@JsonValue val type: String) { - ARTICLE("article"); - } private fun randId(time: Long): Double { return (time.toString() + nextInt(1000, 9999).toString()).toDouble() } } -open class EntityEvent( +abstract class EntityEvent( val target: UuidEntity, - type: Notification.Type, + type: String, val action: String -) : Notification(type) { - enum class Type(val event: EventDefinition) { - UPDATE_ARTICLE(EventDefinition()) - } -} - -class ArticleUpdate( - target: Article -) : EntityEvent(target, Notification.Type.ARTICLE, "update") +) : Event(type) /** * Installation Class */ -class EventNotification { +class EventSubscriber { class Configuration(private val monitor: ApplicationEvents) { private val subscribers = mutableListOf() - fun subscribe(definition: EventDefinition, handler: EventHandler): DisposableHandle { + fun subscribe(definition: EventDefinition, handler: EventHandler): DisposableHandle { return monitor.subscribe(definition, handler).also { subscribers.add(it) } } } - companion object Feature : ApplicationFeature { - override val key = AttributeKey("EventNotification") + companion object Feature : ApplicationFeature { + override val key = AttributeKey("EventSubscriber") @KtorExperimentalAPI override fun install( pipeline: Application, configure: Configuration.() -> Unit - ): EventNotification { + ): EventSubscriber { Configuration(pipeline.environment.monitor).apply(configure) - return EventNotification() + return EventSubscriber() } } } diff --git a/src/main/kotlin/fr/dcproject/routes/Article.kt b/src/main/kotlin/fr/dcproject/routes/Article.kt index b041e56..b76ffa9 100644 --- a/src/main/kotlin/fr/dcproject/routes/Article.kt +++ b/src/main/kotlin/fr/dcproject/routes/Article.kt @@ -2,7 +2,6 @@ package fr.dcproject.routes import fr.dcproject.citizen import fr.dcproject.event.ArticleUpdate -import fr.dcproject.event.EntityEvent import fr.dcproject.repository.Article.Filter import fr.dcproject.security.voter.ArticleVoter.Action.CREATE import fr.dcproject.security.voter.ArticleVoter.Action.VIEW @@ -86,7 +85,7 @@ fun Route.article(repo: ArticleRepository) { assertCan(CREATE, article) repo.upsert(article) - application.environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(article)) + application.environment.monitor.raise(ArticleUpdate.event, ArticleUpdate(article)) call.respond(article) } diff --git a/src/main/kotlin/fr/dcproject/routes/Notification.kt b/src/main/kotlin/fr/dcproject/routes/Notification.kt index 88e4eff..c79bedf 100644 --- a/src/main/kotlin/fr/dcproject/routes/Notification.kt +++ b/src/main/kotlin/fr/dcproject/routes/Notification.kt @@ -1,7 +1,7 @@ package fr.dcproject.routes import fr.dcproject.citizen -import fr.dcproject.event.Notification +import fr.dcproject.event.Event import fr.postgresjson.serializer.deserialize import io.ktor.client.HttpClient import io.ktor.http.cio.websocket.Frame @@ -26,7 +26,7 @@ fun Route.notificationArticle(redis: RedisAsyncCommands, client: launch { incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { - val notificationMessage = it.readText().deserialize() ?: error("unable to deserialize message") + val notificationMessage = it.readText().deserialize() ?: error("unable to deserialize message") redis.zremrangebyscore( "notification:$citizenId",