diff --git a/build.gradle.kts b/build.gradle.kts index 5ca1696..4d0c8b3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -151,6 +151,7 @@ dependencies { testImplementation("io.ktor:ktor-client-mock:$ktor_version") testImplementation("io.ktor:ktor-client-mock-jvm:$ktor_version") testImplementation("org.koin:koin-test:$koinVersion") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion") testImplementation("io.mockk:mockk:1.10.5") testImplementation("org.junit.jupiter:junit-jupiter:5.7.0") testImplementation("org.amshove.kluent:kluent:1.61") diff --git a/src/main/kotlin/application/Application.kt b/src/main/kotlin/application/Application.kt index f479d39..2f34a84 100644 --- a/src/main/kotlin/application/Application.kt +++ b/src/main/kotlin/application/Application.kt @@ -68,7 +68,6 @@ import io.ktor.util.KtorExperimentalAPI import io.ktor.websocket.WebSockets import kotlinx.coroutines.ExperimentalCoroutinesApi import org.eclipse.jetty.util.log.Slf4jLog -import org.koin.core.qualifier.named import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get import org.slf4j.event.Level @@ -162,12 +161,11 @@ fun Application.module(env: Env = PROD) { installCommentConstitutionRoutes() authenticate(optional = true) { - /* TODO */ definition() } authenticate("url") { - notificationArticle(get(), get(named("ws"))) + notificationArticle(get()) } } diff --git a/src/main/kotlin/application/KoinModule.kt b/src/main/kotlin/application/KoinModule.kt index 4869330..32d8a3b 100644 --- a/src/main/kotlin/application/KoinModule.kt +++ b/src/main/kotlin/application/KoinModule.kt @@ -39,8 +39,10 @@ val KoinModule = module { single { Migrations(get(), Configuration.Sql.migrationFiles, Configuration.Sql.functionFiles) } // Redis client - single> { - RedisClient.create(Configuration.redis).connect()?.async() ?: error("Unable to connect to redis") + single { + RedisClient.create(Configuration.redis).apply { + connect().sync().configSet("notify-keyspace-events", "KEA") + } } // RabbitMQ diff --git a/src/main/kotlin/notification/EventNotification.kt b/src/main/kotlin/notification/EventNotification.kt index b8106c6..b226e81 100644 --- a/src/main/kotlin/notification/EventNotification.kt +++ b/src/main/kotlin/notification/EventNotification.kt @@ -23,6 +23,7 @@ import fr.dcproject.notification.publisher.Publisher import fr.dcproject.messages.NotificationEmailSender import fr.postgresjson.entity.UuidEntity import io.ktor.utils.io.errors.IOException +import io.lettuce.core.RedisClient import io.lettuce.core.api.async.RedisAsyncCommands import kotlinx.coroutines.flow.collect import kotlinx.coroutines.runBlocking @@ -41,9 +42,9 @@ open class Notification( return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble() } - fun serialize(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification") + override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification") - fun toByteArray() = serialize().toByteArray() + fun toByteArray() = toString().toByteArray() companion object { val mapper = jacksonObjectMapper().apply { @@ -55,7 +56,7 @@ open class Notification( configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) } - inline fun deserialize(raw: String): T = mapper.readValue(raw) + inline fun fromString(raw: String): T = mapper.readValue(raw) } } @@ -71,12 +72,13 @@ class ArticleUpdateNotification( class EventNotification( private val rabbitFactory: ConnectionFactory, - private val redis: RedisAsyncCommands, + private val redisClient: RedisClient, private val followConstitutionRepo: FollowConstitutionRepository, private val followArticleRepo: FollowArticleRepository, private val notificationEmailSender: NotificationEmailSender, private val exchangeName: String, ) { + val redis: RedisAsyncCommands = redisClient.connect()?.async() ?: error("Unable to connect to redis") private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName) fun config() { @@ -102,7 +104,7 @@ class EventNotification( properties: AMQP.BasicProperties, body: ByteArray ) = runBlocking { - decodeMessage(body) { + followersFromMessage(body) { redis.zadd( "notification:${it.follow.createdBy.id}", it.event.id, @@ -123,7 +125,7 @@ class EventNotification( body: ByteArray ) { runBlocking { - decodeMessage(body) { + followersFromMessage(body) { notificationEmailSender.sendEmail(it.follow) logger.debug("EmailSend to: ${it.follow.createdBy.id}") } @@ -136,9 +138,9 @@ class EventNotification( rabbitChannel.basicConsume("email", false, consumerEmail) } - private suspend fun decodeMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) { + private suspend fun followersFromMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) { val rawMessage: String = body.toString(Charsets.UTF_8) - val notification: EntityNotification = Notification.deserialize(rawMessage) ?: error("Unable to deserialize notification message from rabbit") + val notification: EntityNotification = Notification.fromString(rawMessage) ?: error("Unable to deserialize notification message from rabbit") val follows = when (notification.type) { "article" -> followArticleRepo.findFollowsByTarget(notification.target) "constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target) diff --git a/src/main/kotlin/notification/NotificationsPush.kt b/src/main/kotlin/notification/NotificationsPush.kt new file mode 100644 index 0000000..be0a270 --- /dev/null +++ b/src/main/kotlin/notification/NotificationsPush.kt @@ -0,0 +1,95 @@ +package notification + +import com.fasterxml.jackson.core.JsonProcessingException +import fr.dcproject.component.citizen.CitizenI +import fr.dcproject.notification.Notification +import io.ktor.routing.Route +import io.lettuce.core.Limit +import io.lettuce.core.Range +import io.lettuce.core.Range.Boundary +import io.lettuce.core.RedisClient +import io.lettuce.core.api.async.RedisAsyncCommands +import io.lettuce.core.pubsub.RedisPubSubAdapter +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory + +class NotificationsPush ( + val redisClient: RedisClient, + citizen: CitizenI, + incoming: Flow, + onRecieve: suspend (Notification) -> Unit, +) +{ + val redis: RedisAsyncCommands = redisClient.connect()?.async() ?: error("Unable to connect to redis") + val key = "notification:${citizen.id}" + private var score: Double = 0.0 + + init { + /* Mark as read all incoming notifications */ + GlobalScope.launch { + incoming.collect { + markAsRead(it) + } + } + + /* Get old notification and sent it to websocket */ + runBlocking { + getNotifications().collect { onRecieve(it) } + } + + /* Lisen redis event, and sent the new notification into websocket */ + redisClient.connectPubSub()?.run { + addListener(object : RedisPubSubAdapter() { + /* On new key publish */ + override fun message(pattern: String?, channel: String?, message: String?) { + runBlocking { + getNotifications().collect { + onRecieve(it) + } + } + } + }) + + /* Register to the events */ + async()?.psubscribe("__key*__:$key") ?: error("Unable to connect to redis") + } ?: error("PubSub Fail") + } + + /* Return flow with all new notifications */ + private fun getNotifications() = flow { + redis + .zrangebyscoreWithScores( + key, + Range.from( + Boundary.excluding(score), + Boundary.including(Double.POSITIVE_INFINITY) + ), + Limit.from(100) + ) + .get().forEach { + emit(Notification.fromString(it.value)) + if (it.score > score) score = it.score + } + } + + private suspend fun markAsRead(notificationMessage: Notification) = coroutineScope { + try { + redis.zremrangebyscore( + key, + Range.from( + Boundary.including(notificationMessage.id), + Boundary.including(notificationMessage.id) + ) + ) + } catch (e: JsonProcessingException) { + LoggerFactory.getLogger(Route::class.qualifiedName) + .error("Unable to deserialize notification") + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/notification/publisher/Publisher.kt b/src/main/kotlin/notification/publisher/Publisher.kt index d4df7af..ec528b6 100644 --- a/src/main/kotlin/notification/publisher/Publisher.kt +++ b/src/main/kotlin/notification/publisher/Publisher.kt @@ -17,7 +17,7 @@ class Publisher( async { factory.newConnection().use { connection -> connection.createChannel().use { channel -> - channel.basicPublish(exchangeName, "", null, it.serialize().toByteArray()) + channel.basicPublish(exchangeName, "", null, it.toString().toByteArray()) logger.debug("Publish message ${it.target.id}") } } diff --git a/src/main/kotlin/routes/Notification.kt b/src/main/kotlin/routes/Notification.kt index b968a58..4df3ba7 100644 --- a/src/main/kotlin/routes/Notification.kt +++ b/src/main/kotlin/routes/Notification.kt @@ -3,59 +3,38 @@ package fr.dcproject.routes import fr.dcproject.component.auth.citizen import fr.dcproject.notification.Notification import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.Frame.Text import io.ktor.http.cio.websocket.readText import io.ktor.locations.KtorExperimentalLocationsAPI import io.ktor.routing.Route import io.ktor.websocket.webSocket -import io.lettuce.core.Range -import io.lettuce.core.api.async.RedisAsyncCommands +import io.lettuce.core.RedisClient import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.launch -import org.slf4j.LoggerFactory +import notification.NotificationsPush +/** + * Consume Websocket, then remove notification in redis. + * + * Sent all notification to websocket. + */ @ExperimentalCoroutinesApi @KtorExperimentalLocationsAPI -fun Route.notificationArticle(redis: RedisAsyncCommands, client: HttpClient) { +fun Route.notificationArticle(redisClient: RedisClient) { webSocket("/notifications") { - val citizenId = call.citizen.id + /* Convert channel of string from websocket, to a flow of Notification object */ + val incomingFlow: Flow = incoming.consumeAsFlow() + .mapNotNull { it as? Frame.Text } + .map { it.readText() } + .map { Notification.fromString(it) } - launch { - incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { - try { - val notificationMessage: Notification = Notification.deserialize(it.readText()) - redis.zremrangebyscore( - "notification:$citizenId", - Range.from( - Range.Boundary.including(notificationMessage.id), - Range.Boundary.including(notificationMessage.id) - ) - ) - } catch (e: Throwable) { - LoggerFactory.getLogger(Route::class.qualifiedName) - .error("Unable to deserialize notification") - } - } - } - - var score = 0.0 - while (!outgoing.isClosedForSend) { - val result = redis.zrangebyscoreWithScores( - "notification:$citizenId", - Range.from( - Range.Boundary.excluding(score), - Range.Boundary.including(Double.POSITIVE_INFINITY) - ) - ) - - result.get().forEach { - outgoing.send(Frame.Text(it.value)) - if (it.score > score) score = it.score - } - delay(1000) + /* Read user notifications in redis then sent it to the websocket */ + NotificationsPush(redisClient, call.citizen, incomingFlow) { + outgoing.send(Text(it.toString())) } } } + diff --git a/src/test/kotlin/functional/EventNotificationTest.kt b/src/test/kotlin/functional/EventNotificationTest.kt index 23a4683..60177a3 100644 --- a/src/test/kotlin/functional/EventNotificationTest.kt +++ b/src/test/kotlin/functional/EventNotificationTest.kt @@ -31,7 +31,7 @@ import org.koin.test.AutoCloseKoinTest import org.koin.test.KoinTest @TestInstance(TestInstance.Lifecycle.PER_CLASS) -class EventNotificationTest : KoinTest, AutoCloseKoinTest() { +class EventNotificationTest { @InternalCoroutinesApi @KtorExperimentalLocationsAPI @KtorExperimentalAPI @@ -43,9 +43,12 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { val emailSender = mockk() { every { sendEmail(any()) } returns Unit } - val redisClient = spyk> { - RedisClient.create(Configuration.redis).connect().async() ?: error("Unable to connect to redis") - } + + /* Init Spy on redis client */ + val redisClient = spyk(RedisClient.create(Configuration.redis)) + val asyncCommand = spyk(redisClient.connect().async()) + every { redisClient.connect().async() } returns asyncCommand + val rabbitFactory: ConnectionFactory = spyk { ConnectionFactory().apply { setUri(Configuration.rabbitmq) } } @@ -67,7 +70,7 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { /* Config consumer */ EventNotification( rabbitFactory = rabbitFactory, - redis = redisClient, + redisClient = redisClient, followArticleRepo = followArticleRepo, followConstitutionRepo = mockk(), notificationEmailSender = emailSender, @@ -90,12 +93,9 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() { ) ).await() - /* Wait to receive message */ - delay(1000) - /* Check if notifications sent */ - verify { followArticleRepo.findFollowsByTarget(any()) } - verify { emailSender.sendEmail(any()) } - verify { redisClient.zadd(any(), any(), any()) } + verify(timeout = 1000) { followArticleRepo.findFollowsByTarget(any()) } + verify(timeout = 1000) { emailSender.sendEmail(any()) } + verify(timeout = 1000) { asyncCommand.zadd(any(), any(), any()) } } } diff --git a/src/test/kotlin/functional/NotificationsPushTest.kt b/src/test/kotlin/functional/NotificationsPushTest.kt new file mode 100644 index 0000000..2b7ee6a --- /dev/null +++ b/src/test/kotlin/functional/NotificationsPushTest.kt @@ -0,0 +1,89 @@ +package functional + +import fr.dcproject.application.Configuration +import fr.dcproject.component.article.ArticleForView +import fr.dcproject.component.citizen.CitizenRef +import fr.dcproject.notification.ArticleUpdateNotification +import fr.dcproject.notification.Notification +import io.lettuce.core.Limit +import io.lettuce.core.RedisClient +import io.mockk.every +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.runBlocking +import notification.NotificationsPush +import org.amshove.kluent.`should be equal to` +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.Test +import org.koin.test.AutoCloseKoinTest +import org.koin.test.KoinTest + +internal class NotificationsPushTest { + @Test + @Tag("functional") + fun `Notification from redis is well catch and return`() = runBlocking { + /* Redis client for test */ + val redisClientTest = RedisClient.create(Configuration.redis) + + /* Init Spy on redis client */ + val redisClient = spyk(RedisClient.create(Configuration.redis)) + val asyncCommand = spyk(redisClient.connect().async()) + every { redisClient.connect().async() } returns asyncCommand + + /* Citizen of notification */ + val citizen = CitizenRef() + /* Article is the target of the notification */ + val article = ArticleForView( + content = "content..", + createdBy = citizen, + description = "desc", + title = "Super Title", + ) + /* Init two notification, one called before subscription, and the other after */ + val notifBeforeSubscribe = ArticleUpdateNotification(article) + val notifAfterSubscribe = ArticleUpdateNotification(article) + + /* init event for emulate incomint message from websocket */ + val event = MutableSharedFlow() + val incomingFlow = event.asSharedFlow() + + spyk(object { var counter = 0}).run { /* Counter for count the callback of notification */ + /* Sent notification */ + redisClientTest.connect().sync().run { + zadd( + "notification:${citizen.id}", + notifBeforeSubscribe.id, + notifBeforeSubscribe.toString() + ) + } + + /* Init NotificationPush system, and set assertion in callback */ + NotificationsPush(redisClient, citizen, incomingFlow) { + counter++ + if (counter == 1) it.id `should be equal to` notifBeforeSubscribe.id + else it.id `should be equal to` notifAfterSubscribe.id + } + + /* Sent the notification */ + redisClientTest.connect().sync().run { + zadd( + "notification:${citizen.id}", + notifAfterSubscribe.id, + notifAfterSubscribe.toString() + ) + } + + /* Verify if the callback is called 2 times */ + verify(exactly = 4, timeout = 200) { counter } + assertEquals(2, counter, "The notification must be call 2 times") + + /* Emit an event to delete notification */ + event.emit(notifAfterSubscribe) + /* Verify the "mark as read" is called */ + verify(timeout = 300) { asyncCommand.zremrangebyscore(any(), any()) } + } + } +} \ No newline at end of file