diff --git a/src/main/kotlin/fr/dcproject/Application.kt b/src/main/kotlin/fr/dcproject/Application.kt index cd00a73..10653ca 100644 --- a/src/main/kotlin/fr/dcproject/Application.kt +++ b/src/main/kotlin/fr/dcproject/Application.kt @@ -13,7 +13,9 @@ import fr.dcproject.Env.PROD import fr.dcproject.entity.* import fr.dcproject.event.EntityEvent import fr.dcproject.event.EventNotification +import fr.dcproject.event.Notification import fr.dcproject.event.publisher.Publisher +import fr.dcproject.repository.Follow import fr.dcproject.repository.FollowArticle import fr.dcproject.routes.* import fr.dcproject.security.voter.* @@ -41,12 +43,10 @@ import io.ktor.routing.Routing import io.ktor.util.KtorExperimentalAPI import io.ktor.websocket.WebSockets import io.lettuce.core.api.async.RedisAsyncCommands -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.eclipse.jetty.util.log.Slf4jLog -import org.joda.time.DateTime import org.koin.core.qualifier.named import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get @@ -193,10 +193,10 @@ fun Application.module(env: Env = PROD) { val exchangeName = config.exchangeNotificationName get().newConnection().use { connection -> connection.createChannel().use { channel -> - channel.queueDeclare("sse", true, false, false, null) + channel.queueDeclare("push", true, false, false, null) channel.queueDeclare("email", true, false, false, null) channel.exchangeDeclare(exchangeName, DIRECT, true) - channel.queueBind("sse", exchangeName, "") + channel.queueBind("push", exchangeName, "") channel.queueBind("email", exchangeName, "") } } @@ -208,43 +208,40 @@ fun Application.module(env: Env = PROD) { } /* Launch Consumer */ - GlobalScope.launch { - val connection = get().newConnection() - val channel = connection.createChannel() + launch { + val rabbitChannel = get().newConnection().createChannel() val redis = get>() - val consumerSSE: Consumer = object : DefaultConsumer(channel) { + + val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) { @Throws(IOException::class) override fun handleDelivery( consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray - ) { + ) = runBlocking { val message = body.toString(Charsets.UTF_8) - val event = - message.deserialize() ?: error("Unable to unserialise event message from rabbit") + val msg = message.deserialize() ?: error("Unable to unserialise event message from rabbit") - val followRepo = when (event.type) { - "article" -> get() - else -> error("type of event not supported") + let { + when (msg.type) { + Notification.Type.ARTICLE -> get() + } as Follow<*,*> + } + .findFollowsByTarget(msg.target) + .collect { follow -> + redis.zadd( + "notification:${follow.createdBy.id}", + msg.id, + message + ) } - runBlocking { - followRepo - .findFollowsByTarget(event.target) - .collect { follow -> - redis.zadd( - "notification:${follow.createdBy.id}", - DateTime.now().millis.toDouble(), - message - ) - } - } - channel.basicAck(envelope.deliveryTag, false) + rabbitChannel.basicAck(envelope.deliveryTag, false) } } - val consumerEmail: Consumer = object : DefaultConsumer(channel) { + val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) { @Throws(IOException::class) override fun handleDelivery( consumerTag: String, @@ -255,11 +252,11 @@ fun Application.module(env: Env = PROD) { val message = body.toString(Charsets.UTF_8) println("The message is receive for send email: $message") // TODO implement email sender - channel.basicAck(envelope.deliveryTag, false) + rabbitChannel.basicAck(envelope.deliveryTag, false) } } - channel.basicConsume("sse", false, consumerSSE) - channel.basicConsume("email", false, consumerEmail) + rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket + rabbitChannel.basicConsume("email", false, consumerEmail) } } diff --git a/src/main/kotlin/fr/dcproject/event/EventNotification.kt b/src/main/kotlin/fr/dcproject/event/EventNotification.kt index e4aea7c..ddac324 100644 --- a/src/main/kotlin/fr/dcproject/event/EventNotification.kt +++ b/src/main/kotlin/fr/dcproject/event/EventNotification.kt @@ -1,5 +1,6 @@ package fr.dcproject.event +import com.fasterxml.jackson.annotation.JsonValue import fr.dcproject.entity.Article import fr.postgresjson.entity.Serializable import fr.postgresjson.entity.immutable.UuidEntity @@ -8,15 +9,27 @@ import io.ktor.util.AttributeKey import io.ktor.util.KtorExperimentalAPI import kotlinx.coroutines.DisposableHandle import org.joda.time.DateTime +import kotlin.random.Random.Default.nextInt + +sealed class NotificationS open class Notification( - val type: String, + val type: Type, val createdAt: DateTime = DateTime.now() -) : Serializable +) : NotificationS(), Serializable { + val id: Double = randId(createdAt.millis) + enum class Type(@JsonValue val type: String) { + ARTICLE("article"); + } + + private fun randId(time: Long): Double { + return (time.toString() + nextInt(1000, 9999).toString()).toDouble() + } +} open class EntityEvent( val target: UuidEntity, - type: String, + type: Notification.Type, val action: String ) : Notification(type) { enum class Type(val event: EventDefinition) { @@ -26,7 +39,7 @@ open class EntityEvent( class ArticleUpdate( target: Article -) : EntityEvent(target, "article", "update") +) : EntityEvent(target, Notification.Type.ARTICLE, "update") /** * Installation Class diff --git a/src/main/kotlin/fr/dcproject/repository/Follow.kt b/src/main/kotlin/fr/dcproject/repository/Follow.kt index 0c2e01d..1476211 100644 --- a/src/main/kotlin/fr/dcproject/repository/Follow.kt +++ b/src/main/kotlin/fr/dcproject/repository/Follow.kt @@ -12,7 +12,7 @@ import fr.dcproject.entity.Article as ArticleEntity import fr.dcproject.entity.Constitution as ConstitutionEntity import fr.dcproject.entity.Follow as FollowEntity -open class Follow(override var requester: Requester) : RepositoryI { +sealed class Follow(override var requester: Requester) : RepositoryI { open fun findByCitizen( citizen: CitizenI, page: Int = 1, @@ -63,6 +63,26 @@ open class Follow(override var requester: Reque "citizen_id" to citizen.id, "target_id" to target.id ) + + fun findFollowsByTarget( + target: UuidEntity, + bulkSize: Int = 300 + ): Flow> = flow { + var nextPage = 1 + do { + val paginate = findFollowsByTarget(target, nextPage, bulkSize) + paginate.result.forEach { + emit(it) + } + nextPage = paginate.currentPage+1 + } while (!paginate.isLastPage()) + } + + abstract fun findFollowsByTarget( + target: UuidEntity, + page: Int = 1, + limit: Int = 300 + ): Paginated> } class FollowArticle(requester: Requester) : Follow(requester) { @@ -80,10 +100,10 @@ class FollowArticle(requester: Requester) : Follow(re } } - fun findFollowsByTarget( + override fun findFollowsByTarget( target: UuidEntity, - page: Int = 1, - limit: Int = 300 + page: Int, + limit: Int ): Paginated> { return requester .getFunction("find_follows_article_by_target") @@ -91,20 +111,6 @@ class FollowArticle(requester: Requester) : Follow(re "target_id" to target.id ) } - - fun findFollowsByTarget( - target: UuidEntity, - limit: Int = 300 - ): Flow> = flow { - var nextPage = 1 - do { - val paginate = findFollowsByTarget(target, nextPage, limit) - paginate.result.forEach { - emit(it) - } - nextPage = paginate.currentPage+1 - } while (!paginate.isLastPage()) - } } class FollowConstitution(requester: Requester) : Follow(requester) { @@ -121,4 +127,12 @@ class FollowConstitution(requester: Requester) : Follow> { + TODO("Not yet implemented") + } } diff --git a/src/main/kotlin/fr/dcproject/routes/Notification.kt b/src/main/kotlin/fr/dcproject/routes/Notification.kt index 8b9c710..88e4eff 100644 --- a/src/main/kotlin/fr/dcproject/routes/Notification.kt +++ b/src/main/kotlin/fr/dcproject/routes/Notification.kt @@ -1,6 +1,8 @@ package fr.dcproject.routes import fr.dcproject.citizen +import fr.dcproject.event.Notification +import fr.postgresjson.serializer.deserialize import io.ktor.client.HttpClient import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.readText @@ -21,32 +23,36 @@ import kotlinx.coroutines.launch fun Route.notificationArticle(redis: RedisAsyncCommands, client: HttpClient) { webSocket("/notifications") { val citizenId = call.citizen.id - val job = launch { - var score = 0.0 - while (!outgoing.isClosedForSend) { - val result = redis.zrangebyscoreWithScores( + + launch { + incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { + val notificationMessage = it.readText().deserialize() ?: error("unable to deserialize message") + + redis.zremrangebyscore( "notification:$citizenId", Range.from( - Range.Boundary.excluding(score), - Range.Boundary.including(Double.POSITIVE_INFINITY) + Range.Boundary.including(notificationMessage.id), + Range.Boundary.including(notificationMessage.id) ) ) - - result.get().forEach { - outgoing.send(Frame.Text(it.value)) - if (it.score > score) score = it.score - } - delay(1000) - // TODO terminate coroutine after connection close ! } } - job.join() - // TODO mark notification as read - incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { - val text = it.readText() - outgoing.send(Frame.Text(text)) - delay(100) + var score = 0.0 + while (!outgoing.isClosedForSend) { + val result = redis.zrangebyscoreWithScores( + "notification:$citizenId", + Range.from( + Range.Boundary.excluding(score), + Range.Boundary.including(Double.POSITIVE_INFINITY) + ) + ) + + result.get().forEach { + outgoing.send(Frame.Text(it.value)) + if (it.score > score) score = it.score + } + delay(1000) } } }