Refactor Notification System

Add Tests for notification system
This commit is contained in:
2021-02-03 01:21:13 +01:00
parent b54a40cef4
commit d479cf6bca
9 changed files with 232 additions and 66 deletions

View File

@@ -68,7 +68,6 @@ import io.ktor.util.KtorExperimentalAPI
import io.ktor.websocket.WebSockets
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.eclipse.jetty.util.log.Slf4jLog
import org.koin.core.qualifier.named
import org.koin.ktor.ext.Koin
import org.koin.ktor.ext.get
import org.slf4j.event.Level
@@ -162,12 +161,11 @@ fun Application.module(env: Env = PROD) {
installCommentConstitutionRoutes()
authenticate(optional = true) {
/* TODO */
definition()
}
authenticate("url") {
notificationArticle(get(), get(named("ws")))
notificationArticle(get())
}
}

View File

@@ -39,8 +39,10 @@ val KoinModule = module {
single { Migrations(get(), Configuration.Sql.migrationFiles, Configuration.Sql.functionFiles) }
// Redis client
single<RedisAsyncCommands<String, String>> {
RedisClient.create(Configuration.redis).connect()?.async() ?: error("Unable to connect to redis")
single<RedisClient> {
RedisClient.create(Configuration.redis).apply {
connect().sync().configSet("notify-keyspace-events", "KEA")
}
}
// RabbitMQ

View File

@@ -23,6 +23,7 @@ import fr.dcproject.notification.publisher.Publisher
import fr.dcproject.messages.NotificationEmailSender
import fr.postgresjson.entity.UuidEntity
import io.ktor.utils.io.errors.IOException
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
@@ -41,9 +42,9 @@ open class Notification(
return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble()
}
fun serialize(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
fun toByteArray() = serialize().toByteArray()
fun toByteArray() = toString().toByteArray()
companion object {
val mapper = jacksonObjectMapper().apply {
@@ -55,7 +56,7 @@ open class Notification(
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}
inline fun <reified T : Notification> deserialize(raw: String): T = mapper.readValue(raw)
inline fun <reified T : Notification> fromString(raw: String): T = mapper.readValue(raw)
}
}
@@ -71,12 +72,13 @@ class ArticleUpdateNotification(
class EventNotification(
private val rabbitFactory: ConnectionFactory,
private val redis: RedisAsyncCommands<String, String>,
private val redisClient: RedisClient,
private val followConstitutionRepo: FollowConstitutionRepository,
private val followArticleRepo: FollowArticleRepository,
private val notificationEmailSender: NotificationEmailSender,
private val exchangeName: String,
) {
val redis: RedisAsyncCommands<String, String> = redisClient.connect()?.async() ?: error("Unable to connect to redis")
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
fun config() {
@@ -102,7 +104,7 @@ class EventNotification(
properties: AMQP.BasicProperties,
body: ByteArray
) = runBlocking {
decodeMessage(body) {
followersFromMessage(body) {
redis.zadd(
"notification:${it.follow.createdBy.id}",
it.event.id,
@@ -123,7 +125,7 @@ class EventNotification(
body: ByteArray
) {
runBlocking {
decodeMessage(body) {
followersFromMessage(body) {
notificationEmailSender.sendEmail(it.follow)
logger.debug("EmailSend to: ${it.follow.createdBy.id}")
}
@@ -136,9 +138,9 @@ class EventNotification(
rabbitChannel.basicConsume("email", false, consumerEmail)
}
private suspend fun decodeMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) {
private suspend fun followersFromMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) {
val rawMessage: String = body.toString(Charsets.UTF_8)
val notification: EntityNotification = Notification.deserialize<EntityNotification>(rawMessage) ?: error("Unable to deserialize notification message from rabbit")
val notification: EntityNotification = Notification.fromString<EntityNotification>(rawMessage) ?: error("Unable to deserialize notification message from rabbit")
val follows = when (notification.type) {
"article" -> followArticleRepo.findFollowsByTarget(notification.target)
"constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target)

View File

@@ -0,0 +1,95 @@
package notification
import com.fasterxml.jackson.core.JsonProcessingException
import fr.dcproject.component.citizen.CitizenI
import fr.dcproject.notification.Notification
import io.ktor.routing.Route
import io.lettuce.core.Limit
import io.lettuce.core.Range
import io.lettuce.core.Range.Boundary
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.pubsub.RedisPubSubAdapter
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
class NotificationsPush (
val redisClient: RedisClient,
citizen: CitizenI,
incoming: Flow<Notification>,
onRecieve: suspend (Notification) -> Unit,
)
{
val redis: RedisAsyncCommands<String, String> = redisClient.connect()?.async() ?: error("Unable to connect to redis")
val key = "notification:${citizen.id}"
private var score: Double = 0.0
init {
/* Mark as read all incoming notifications */
GlobalScope.launch {
incoming.collect {
markAsRead(it)
}
}
/* Get old notification and sent it to websocket */
runBlocking {
getNotifications().collect { onRecieve(it) }
}
/* Lisen redis event, and sent the new notification into websocket */
redisClient.connectPubSub()?.run {
addListener(object : RedisPubSubAdapter<String, String>() {
/* On new key publish */
override fun message(pattern: String?, channel: String?, message: String?) {
runBlocking {
getNotifications().collect {
onRecieve(it)
}
}
}
})
/* Register to the events */
async()?.psubscribe("__key*__:$key") ?: error("Unable to connect to redis")
} ?: error("PubSub Fail")
}
/* Return flow with all new notifications */
private fun getNotifications() = flow<Notification> {
redis
.zrangebyscoreWithScores(
key,
Range.from(
Boundary.excluding(score),
Boundary.including(Double.POSITIVE_INFINITY)
),
Limit.from(100)
)
.get().forEach {
emit(Notification.fromString(it.value))
if (it.score > score) score = it.score
}
}
private suspend fun markAsRead(notificationMessage: Notification) = coroutineScope {
try {
redis.zremrangebyscore(
key,
Range.from(
Boundary.including(notificationMessage.id),
Boundary.including(notificationMessage.id)
)
)
} catch (e: JsonProcessingException) {
LoggerFactory.getLogger(Route::class.qualifiedName)
.error("Unable to deserialize notification")
}
}
}

View File

@@ -17,7 +17,7 @@ class Publisher(
async {
factory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.basicPublish(exchangeName, "", null, it.serialize().toByteArray())
channel.basicPublish(exchangeName, "", null, it.toString().toByteArray())
logger.debug("Publish message ${it.target.id}")
}
}

View File

@@ -3,59 +3,38 @@ package fr.dcproject.routes
import fr.dcproject.component.auth.citizen
import fr.dcproject.notification.Notification
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.Frame.Text
import io.ktor.http.cio.websocket.readText
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Route
import io.ktor.websocket.webSocket
import io.lettuce.core.Range
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.RedisClient
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import notification.NotificationsPush
/**
* Consume Websocket, then remove notification in redis.
*
* Sent all notification to websocket.
*/
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Route.notificationArticle(redis: RedisAsyncCommands<String, String>, client: HttpClient) {
fun Route.notificationArticle(redisClient: RedisClient) {
webSocket("/notifications") {
val citizenId = call.citizen.id
/* Convert channel of string from websocket, to a flow of Notification object */
val incomingFlow: Flow<Notification> = incoming.consumeAsFlow()
.mapNotNull<Frame, Text> { it as? Frame.Text }
.map { it.readText() }
.map { Notification.fromString(it) }
launch {
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
try {
val notificationMessage: Notification = Notification.deserialize(it.readText())
redis.zremrangebyscore(
"notification:$citizenId",
Range.from(
Range.Boundary.including(notificationMessage.id),
Range.Boundary.including(notificationMessage.id)
)
)
} catch (e: Throwable) {
LoggerFactory.getLogger(Route::class.qualifiedName)
.error("Unable to deserialize notification")
}
}
}
var score = 0.0
while (!outgoing.isClosedForSend) {
val result = redis.zrangebyscoreWithScores(
"notification:$citizenId",
Range.from(
Range.Boundary.excluding(score),
Range.Boundary.including(Double.POSITIVE_INFINITY)
)
)
result.get().forEach {
outgoing.send(Frame.Text(it.value))
if (it.score > score) score = it.score
}
delay(1000)
/* Read user notifications in redis then sent it to the websocket */
NotificationsPush(redisClient, call.citizen, incomingFlow) {
outgoing.send(Text(it.toString()))
}
}
}

View File

@@ -31,7 +31,7 @@ import org.koin.test.AutoCloseKoinTest
import org.koin.test.KoinTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EventNotificationTest : KoinTest, AutoCloseKoinTest() {
class EventNotificationTest {
@InternalCoroutinesApi
@KtorExperimentalLocationsAPI
@KtorExperimentalAPI
@@ -43,9 +43,12 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() {
val emailSender = mockk<NotificationEmailSender>() {
every { sendEmail(any()) } returns Unit
}
val redisClient = spyk<RedisAsyncCommands<String, String>> {
RedisClient.create(Configuration.redis).connect().async() ?: error("Unable to connect to redis")
}
/* Init Spy on redis client */
val redisClient = spyk<RedisClient>(RedisClient.create(Configuration.redis))
val asyncCommand = spyk(redisClient.connect().async())
every { redisClient.connect().async() } returns asyncCommand
val rabbitFactory: ConnectionFactory = spyk {
ConnectionFactory().apply { setUri(Configuration.rabbitmq) }
}
@@ -67,7 +70,7 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() {
/* Config consumer */
EventNotification(
rabbitFactory = rabbitFactory,
redis = redisClient,
redisClient = redisClient,
followArticleRepo = followArticleRepo,
followConstitutionRepo = mockk(),
notificationEmailSender = emailSender,
@@ -90,12 +93,9 @@ class EventNotificationTest : KoinTest, AutoCloseKoinTest() {
)
).await()
/* Wait to receive message */
delay(1000)
/* Check if notifications sent */
verify { followArticleRepo.findFollowsByTarget(any()) }
verify { emailSender.sendEmail(any()) }
verify { redisClient.zadd(any<String>(), any<Double>(), any<String>()) }
verify(timeout = 1000) { followArticleRepo.findFollowsByTarget(any()) }
verify(timeout = 1000) { emailSender.sendEmail(any()) }
verify(timeout = 1000) { asyncCommand.zadd(any<String>(), any<Double>(), any<String>()) }
}
}

View File

@@ -0,0 +1,89 @@
package functional
import fr.dcproject.application.Configuration
import fr.dcproject.component.article.ArticleForView
import fr.dcproject.component.citizen.CitizenRef
import fr.dcproject.notification.ArticleUpdateNotification
import fr.dcproject.notification.Notification
import io.lettuce.core.Limit
import io.lettuce.core.RedisClient
import io.mockk.every
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.runBlocking
import notification.NotificationsPush
import org.amshove.kluent.`should be equal to`
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.koin.test.AutoCloseKoinTest
import org.koin.test.KoinTest
internal class NotificationsPushTest {
@Test
@Tag("functional")
fun `Notification from redis is well catch and return`() = runBlocking {
/* Redis client for test */
val redisClientTest = RedisClient.create(Configuration.redis)
/* Init Spy on redis client */
val redisClient = spyk<RedisClient>(RedisClient.create(Configuration.redis))
val asyncCommand = spyk(redisClient.connect().async())
every { redisClient.connect().async() } returns asyncCommand
/* Citizen of notification */
val citizen = CitizenRef()
/* Article is the target of the notification */
val article = ArticleForView(
content = "content..",
createdBy = citizen,
description = "desc",
title = "Super Title",
)
/* Init two notification, one called before subscription, and the other after */
val notifBeforeSubscribe = ArticleUpdateNotification(article)
val notifAfterSubscribe = ArticleUpdateNotification(article)
/* init event for emulate incomint message from websocket */
val event = MutableSharedFlow<Notification>()
val incomingFlow = event.asSharedFlow()
spyk(object { var counter = 0}).run { /* Counter for count the callback of notification */
/* Sent notification */
redisClientTest.connect().sync().run {
zadd(
"notification:${citizen.id}",
notifBeforeSubscribe.id,
notifBeforeSubscribe.toString()
)
}
/* Init NotificationPush system, and set assertion in callback */
NotificationsPush(redisClient, citizen, incomingFlow) {
counter++
if (counter == 1) it.id `should be equal to` notifBeforeSubscribe.id
else it.id `should be equal to` notifAfterSubscribe.id
}
/* Sent the notification */
redisClientTest.connect().sync().run {
zadd(
"notification:${citizen.id}",
notifAfterSubscribe.id,
notifAfterSubscribe.toString()
)
}
/* Verify if the callback is called 2 times */
verify(exactly = 4, timeout = 200) { counter }
assertEquals(2, counter, "The notification must be call 2 times")
/* Emit an event to delete notification */
event.emit(notifAfterSubscribe)
/* Verify the "mark as read" is called */
verify(timeout = 300) { asyncCommand.zremrangebyscore(any(), any()) }
}
}
}