Refactoring of Notification system

This commit is contained in:
2021-04-18 01:57:04 +02:00
parent 1c33c026f0
commit fee5e5784b
23 changed files with 497 additions and 286 deletions

View File

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.datatype.joda.JodaModule
import fr.dcproject.application.Env.PROD
import fr.dcproject.application.Env.TEST
import fr.dcproject.application.http.statusPagesInstallation
import fr.dcproject.common.utils.onApplicationStopped
import fr.dcproject.component.article.articleKoinModule
import fr.dcproject.component.article.routes.installArticleRoutes
import fr.dcproject.component.auth.authKoinModule
@@ -27,7 +28,8 @@ import fr.dcproject.component.follow.followKoinModule
import fr.dcproject.component.follow.routes.article.installFollowArticleRoutes
import fr.dcproject.component.follow.routes.citizen.installFollowCitizenRoutes
import fr.dcproject.component.follow.routes.constitution.installFollowConstitutionRoutes
import fr.dcproject.component.notification.NotificationConsumer
import fr.dcproject.component.notification.email.NotificationEmailConsumer
import fr.dcproject.component.notification.push.NotificationPushConsumer
import fr.dcproject.component.notification.routes.installNotificationsRoutes
import fr.dcproject.component.opinion.opinionKoinModule
import fr.dcproject.component.opinion.routes.installOpinionRoutes
@@ -38,7 +40,6 @@ import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes
import fr.dcproject.component.workgroup.workgroupKoinModule
import fr.postgresjson.migration.Migrations
import io.ktor.application.Application
import io.ktor.application.ApplicationStopped
import io.ktor.application.install
import io.ktor.auth.Authentication
import io.ktor.client.HttpClient
@@ -118,11 +119,14 @@ fun Application.module(env: Env = PROD) {
masking = false
}
get<NotificationConsumer>().run {
get<NotificationEmailConsumer>().run {
start()
environment.monitor.subscribe(ApplicationStopped) {
close()
}
onApplicationStopped { close() }
}
get<NotificationPushConsumer>().run {
start()
onApplicationStopped { close() }
}
install(Authentication, jwtInstallation(get(), get()))

View File

@@ -10,10 +10,11 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.common.email.Mailer
import fr.dcproject.component.auth.jwt.JwtConfig
import fr.dcproject.component.notification.NotificationConsumer
import fr.dcproject.component.notification.NotificationEmailSender
import fr.dcproject.component.notification.NotificationsPush
import fr.dcproject.component.notification.Publisher
import fr.dcproject.component.notification.NotificationPublisherAsync
import fr.dcproject.component.notification.email.NotificationEmailConsumer
import fr.dcproject.component.notification.email.NotificationEmailSender
import fr.dcproject.component.notification.push.NotificationPushConsumer
import fr.dcproject.component.notification.push.NotificationPushListener
import fr.postgresjson.connexion.Connection
import fr.postgresjson.connexion.Requester
import fr.postgresjson.migration.Migrations
@@ -65,11 +66,15 @@ val KoinModule = module {
}
}
single { NotificationsPush.Builder(get()) }
single { NotificationPushListener.Builder(get()) }
single {
val config: Configuration = get()
NotificationConsumer(get(), get(), get(), get(), get(), get(), config.exchangeNotificationName)
NotificationEmailConsumer(get(), get(), get(), get(), get(), config.exchangeNotificationName)
}
single {
val config: Configuration = get()
NotificationPushConsumer(get(), get(), get(), get(), get(), config.exchangeNotificationName)
}
// RabbitMQ
@@ -114,7 +119,7 @@ val KoinModule = module {
single {
val config: Configuration = get()
Publisher(factory = get(), exchangeName = config.exchangeNotificationName)
NotificationPublisherAsync(factory = get(), exchangeName = config.exchangeNotificationName)
}
single {

View File

@@ -9,6 +9,9 @@ import java.io.IOException
class Mailer(
private val key: String
) {
/**
* Send email via Sendgrid
*/
fun sendEmail(action: () -> Mail): Boolean {
val mail = action()

View File

@@ -0,0 +1,10 @@
package fr.dcproject.common.utils
import io.ktor.application.Application
import io.ktor.application.ApplicationStopped
fun Application.onApplicationStopped(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStopped) {
callback()
}
}

View File

@@ -0,0 +1,30 @@
package fr.dcproject.common.utils
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import kotlinx.coroutines.runBlocking
import java.io.IOException
fun Channel.consumeQueue(queueName: String, callback: DefaultConsumer.(ByteArray) -> Unit) {
val consumer: Consumer = object : DefaultConsumer(this) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) = runBlocking {
try {
callback(body)
basicAck(envelope.deliveryTag, false)
} catch (e: Throwable) {
basicNack(envelope.deliveryTag, false, true)
}
}
}
/* Launch Consumer */
basicConsume(queueName, false, consumer)
}

View File

@@ -10,8 +10,8 @@ import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest.
import fr.dcproject.component.auth.citizen
import fr.dcproject.component.auth.citizenOrNull
import fr.dcproject.component.auth.mustBeAuth
import fr.dcproject.component.notification.ArticleUpdateNotification
import fr.dcproject.component.notification.Publisher
import fr.dcproject.component.notification.ArticleUpdateNotificationMessage
import fr.dcproject.component.notification.NotificationPublisherAsync
import fr.dcproject.component.workgroup.database.WorkgroupRef
import io.konform.validation.Validation
import io.konform.validation.jsonschema.maxItems
@@ -63,7 +63,7 @@ object UpsertArticle {
}
}
fun Route.upsertArticle(repo: ArticleRepository, publisher: Publisher, ac: ArticleAccessControl) {
fun Route.upsertArticle(repo: ArticleRepository, notificationPublisher: NotificationPublisherAsync, ac: ArticleAccessControl) {
suspend fun ApplicationCall.convertRequestToEntity(): ArticleForUpdate = receiveOrBadRequest<Input>().run {
validate().badRequestIfNotValid()
ArticleForUpdate(
@@ -92,7 +92,7 @@ object UpsertArticle {
val versionNumber = a.versionNumber
}
)
publisher.publish(ArticleUpdateNotification(a))
notificationPublisher.publishAsync(ArticleUpdateNotificationMessage(a))
} ?: error("Article not updated")
}
}

View File

@@ -1,115 +0,0 @@
package fr.dcproject.component.notification
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import fr.dcproject.common.entity.TargetRef
import fr.dcproject.component.follow.database.FollowArticleRepository
import fr.dcproject.component.follow.database.FollowCitizenRepository
import fr.dcproject.component.follow.database.FollowConstitutionRepository
import fr.dcproject.component.follow.database.FollowForView
import io.ktor.utils.io.errors.IOException
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class NotificationConsumer(
private val rabbitFactory: ConnectionFactory,
private val redisClient: RedisClient,
private val followConstitutionRepo: FollowConstitutionRepository,
private val followArticleRepo: FollowArticleRepository,
private val followCitizenRepo: FollowCitizenRepository,
private val notificationEmailSender: NotificationEmailSender,
private val exchangeName: String,
) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
private val rabbitConnection = rabbitFactory.newConnection()
private val rabbitChannel = rabbitConnection.createChannel()
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
fun close() {
rabbitChannel.close()
rabbitConnection.close()
}
fun start() {
/* Config Rabbit */
rabbitFactory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare("push", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind("push", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}
}
/* Define Consumer */
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: ByteArray
) = runBlocking {
followersFromMessage(body) {
redis.zadd(
"notification:${it.follow.createdBy.id}",
it.event.id,
it.rawMessage
)
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: ByteArray
) {
runBlocking {
followersFromMessage(body) {
notificationEmailSender.sendEmail(it.follow)
logger.debug("EmailSend to: ${it.follow.createdBy.id}")
}
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
/* Launch Consumer */
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
rabbitChannel.basicConsume("email", false, consumerEmail)
}
private suspend fun followersFromMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) {
val rawMessage: String = body.toString(Charsets.UTF_8)
val notification: EntityNotification = Notification.fromString(rawMessage)
val follows = when (notification.type) {
"article" -> followArticleRepo.findFollowsByTarget(notification.target)
"constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target)
"citizen" -> followCitizenRepo.findFollowsByTarget(notification.target)
else -> error("event '${notification.type}' not implemented")
}
follows.collect { action(DecodedMessage(notification, rawMessage, it)) }
}
private class DecodedMessage(
val event: EntityNotification,
val rawMessage: String,
val follow: FollowForView<out TargetRef>
)
}

View File

@@ -0,0 +1,67 @@
package fr.dcproject.component.notification
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import fr.dcproject.common.entity.Entity
import fr.dcproject.common.entity.TargetRef
import fr.dcproject.common.utils.consumeQueue
import fr.dcproject.component.follow.database.FollowArticleRepository
import fr.dcproject.component.follow.database.FollowCitizenRepository
import fr.dcproject.component.follow.database.FollowConstitutionRepository
import fr.dcproject.component.follow.database.FollowForView
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
abstract class NotificationConsumerAbstract(
private val rabbitFactory: ConnectionFactory,
private val followConstitutionRepo: FollowConstitutionRepository,
private val followArticleRepo: FollowArticleRepository,
private val followCitizenRepo: FollowCitizenRepository,
) {
private val rabbitConnection = rabbitFactory.newConnection()
private val rabbitChannel = rabbitConnection.createChannel()
fun close() {
rabbitChannel.close()
rabbitConnection.close()
}
fun declareQueue(queueName: String, exchangeName: String) {
rabbitFactory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare(queueName, true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind(queueName, exchangeName, "")
}
}
}
protected fun consumeQueue(queueName: String, callback: DefaultConsumer.(DecodedMessage<*>) -> Unit) =
rabbitChannel.consumeQueue(queueName) { body ->
runBlocking {
followersFromMessage(body) {
callback(it)
}
}
}
protected suspend fun followersFromMessage(body: ByteArray, action: suspend (DecodedMessage<*>) -> Unit) {
val rawMessage: String = body.toString(Charsets.UTF_8)
val notification: EntityNotificationMessage<*> = NotificationMessage.fromString(rawMessage)
val follows = when (notification.type) {
"article" -> followArticleRepo.findFollowsByTarget(notification.target)
"constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target)
"citizen" -> followCitizenRepo.findFollowsByTarget(notification.target)
else -> error("event '${notification.type}' not implemented")
}
follows.collect { action(DecodedMessage(notification, rawMessage, it)) }
}
protected class DecodedMessage <E : Entity> (
val event: EntityNotificationMessage<E>,
val rawMessage: String,
val follow: FollowForView<out TargetRef>
)
}

View File

@@ -1,70 +0,0 @@
package fr.dcproject.component.notification
import com.sendgrid.helpers.mail.Mail
import com.sendgrid.helpers.mail.objects.Content
import com.sendgrid.helpers.mail.objects.Email
import fr.dcproject.common.email.Mailer
import fr.dcproject.common.entity.EntityI
import fr.dcproject.common.entity.TargetRef
import fr.dcproject.component.article.database.ArticleRepository
import fr.dcproject.component.article.database.ArticleWithTitleI
import fr.dcproject.component.citizen.database.CitizenCreatorI
import fr.dcproject.component.citizen.database.CitizenRepository
import fr.dcproject.component.follow.database.FollowForView
import java.util.UUID
class NotificationEmailSender(
private val mailer: Mailer,
private val domain: String,
private val citizenRepo: CitizenRepository,
private val articleRepo: ArticleRepository
) {
fun sendEmail(follow: FollowForView<out TargetRef>) {
val citizen = citizenRepo.findById(follow.createdBy.id) ?: noCitizen(follow.createdBy.id)
val target = when (follow.target.reference) {
"article" ->
articleRepo.findById(follow.target.id) ?: noTarget(follow.target.id)
else -> noTarget(follow.target.id)
}
val subject = when (follow.target.reference) {
"article" -> """New version for article "${target.title}""""
else -> "Notification"
}
mailer.sendEmail {
Mail(
Email("notification@$domain"),
subject,
Email(citizen.email),
Content("text/plain", generateContent(citizen, target))
).apply {
addContent(Content("text/html", generateHtmlContent(citizen, target)))
}
}
}
private fun generateHtmlContent(citizen: CitizenCreatorI, target: EntityI): String? {
return when (target) {
is ArticleWithTitleI -> """
Hello ${citizen.name.getFullName()},<br/>
The article "${target.title}" was updated, check it <a href="http://$domain/articles/${target.id}">here</a>
""".trimIndent()
else -> noTarget(target.id)
}
}
private fun generateContent(citizen: CitizenCreatorI, target: EntityI): String {
return when (target) {
is ArticleWithTitleI -> """
Hello ${citizen.name.getFullName()},
The article "${target.title}" was updated, check it here: http://$domain/articles/${target.id}
""".trimIndent()
else -> noTarget(target.id)
}
}
class NoCitizen(message: String) : Exception(message)
class NoTarget(message: String) : Exception(message)
private fun noCitizen(id: UUID): Nothing = throw NoCitizen("No Citizen with this id : $id")
private fun noTarget(id: UUID): Nothing = throw NoTarget("No Target with this id : $id")
}

View File

@@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicInteger
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type", visible = true)
@JsonSubTypes(
JsonSubTypes.Type(value = ArticleUpdateNotification::class, name = "article")
JsonSubTypes.Type(value = ArticleUpdateNotificationMessage::class, name = "article")
)
open class Notification(
open class NotificationMessage(
val type: String,
val createdAt: DateTime = DateTime.now()
) {
@@ -48,16 +48,16 @@ open class Notification(
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}
inline fun <reified T : Notification> fromString(raw: String): T = mapper.readValue(raw)
inline fun <reified T : NotificationMessage> fromString(raw: String): T = mapper.readValue(raw)
}
}
open class EntityNotification(
val target: Entity,
open class EntityNotificationMessage <E : Entity> (
val target: E,
type: String,
val action: String
) : Notification(type)
) : NotificationMessage(type)
class ArticleUpdateNotification(
class ArticleUpdateNotificationMessage(
target: ArticleForView
) : EntityNotification(target, "article", "update")
) : EntityNotificationMessage<ArticleForView>(target, "article", "update")

View File

@@ -7,12 +7,15 @@ import kotlinx.coroutines.coroutineScope
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class Publisher(
class NotificationPublisherAsync(
private val factory: ConnectionFactory,
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName),
private val logger: Logger = LoggerFactory.getLogger(NotificationPublisherAsync::class.qualifiedName),
private val exchangeName: String,
) {
suspend fun <T : EntityNotification> publish(it: T): Deferred<Unit> = coroutineScope {
/**
* Publish a new notification message to RabbitMQ
*/
suspend fun <T : EntityNotificationMessage<*>> publishAsync(it: T): Deferred<Unit> = coroutineScope {
async {
factory.newConnection().use { connection ->
connection.createChannel().use { channel ->

View File

@@ -0,0 +1,33 @@
package fr.dcproject.component.notification.email
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.component.follow.database.FollowArticleRepository
import fr.dcproject.component.follow.database.FollowCitizenRepository
import fr.dcproject.component.follow.database.FollowConstitutionRepository
import fr.dcproject.component.notification.NotificationConsumerAbstract
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class NotificationEmailConsumer(
rabbitFactory: ConnectionFactory,
followConstitutionRepo: FollowConstitutionRepository,
followArticleRepo: FollowArticleRepository,
followCitizenRepo: FollowCitizenRepository,
private val notificationEmailSender: NotificationEmailSender,
private val exchangeName: String,
) : NotificationConsumerAbstract(rabbitFactory, followConstitutionRepo, followArticleRepo, followCitizenRepo) {
private val logger: Logger = LoggerFactory.getLogger(NotificationEmailConsumer::class.qualifiedName)
fun start() {
/* Config Rabbit */
declareQueue(QUEUE_NAME, exchangeName)
consumeQueue(QUEUE_NAME) { message ->
notificationEmailSender.sendEmail(message.follow)
logger.debug("EmailSend to: ${message.follow.createdBy.id}")
}
}
companion object {
private const val QUEUE_NAME = "email"
}
}

View File

@@ -0,0 +1,70 @@
package fr.dcproject.component.notification.email
import com.sendgrid.helpers.mail.Mail
import com.sendgrid.helpers.mail.objects.Content
import com.sendgrid.helpers.mail.objects.Email
import fr.dcproject.common.email.Mailer
import fr.dcproject.common.entity.TargetRef
import fr.dcproject.component.article.database.ArticleForView
import fr.dcproject.component.article.database.ArticleRepository
import fr.dcproject.component.citizen.database.Citizen
import fr.dcproject.component.citizen.database.CitizenRepository
import fr.dcproject.component.follow.database.FollowForView
import fr.dcproject.component.notification.email.content.ArticleNotificationEmailContent
import fr.dcproject.component.notification.email.content.CitizenNotificationEmailContent
import java.util.UUID
/**
* Send notification email on the follower
*/
class NotificationEmailSender(
private val mailer: Mailer,
private val domain: String,
private val citizenRepo: CitizenRepository,
private val articleRepo: ArticleRepository
) {
/**
* Send the Notification Email to the follower user
*/
fun sendEmail(follow: FollowForView<out TargetRef>) {
val citizen = citizenRepo.findById(follow.createdBy.id) ?: noCitizen(follow.createdBy.id)
/**
* Find the complete target entity by its ID according to its reference
*/
val target = when (follow.target.reference) {
"article" -> articleRepo.findById(follow.target.id) ?: noTarget(follow.target.id)
"citizen" -> citizenRepo.findById(follow.target.id) ?: noTarget(follow.target.id)
else -> noTarget(follow.target.id)
}
/**
* Find content of the email according to the target type
*/
val content = when (target) {
is ArticleForView -> ArticleNotificationEmailContent(citizen, target, domain)
is Citizen -> CitizenNotificationEmailContent(citizen, target, domain)
else -> noTargetTypeImplementation(follow.target.reference)
}
/* Send email */
mailer.sendEmail {
Mail(
Email("notification@$domain"),
content.subject,
Email(citizen.email),
Content("text/plain", content.content)
).apply {
addContent(Content("text/html", content.contentHtml))
}
}
}
class NoCitizen(message: String) : Exception(message)
class NoTarget(message: String) : Exception(message)
class NoTargetTypeImplement(message: String) : Exception(message)
private fun noCitizen(id: UUID): Nothing = throw NoCitizen("No Citizen with this id : $id")
private fun noTarget(id: UUID): Nothing = throw NoTarget("No Target with this id : $id")
private fun noTargetTypeImplementation(type: String): Nothing = throw NoTargetTypeImplement("No Target type implemented: $type")
}

View File

@@ -0,0 +1,29 @@
package fr.dcproject.component.notification.email.content
import fr.dcproject.component.article.database.ArticleWithTitleI
import fr.dcproject.component.citizen.database.Citizen
class ArticleNotificationEmailContent(
private val citizen: Citizen,
private val target: ArticleWithTitleI,
private val domain: String,
) : NotificationEmailContent {
override val subject: String
get() = """New version for article "${target.title}""""
override val contentHtml
get() = run {
"""
Hello ${citizen.name.getFullName()},<br/>
The article "${target.title}" was updated, check it <a href="http://$domain/articles/${target.id}">here</a>
""".trimIndent()
}
override val content
get() = run {
"""
Hello ${citizen.name.getFullName()},
The article "${target.title}" was updated, check it here: http://$domain/articles/${target.id}
""".trimIndent()
}
}

View File

@@ -0,0 +1,28 @@
package fr.dcproject.component.notification.email.content
import fr.dcproject.component.citizen.database.Citizen
class CitizenNotificationEmailContent(
private val citizen: Citizen,
private val target: Citizen,
private val domain: String,
) : NotificationEmailContent {
override val subject: String
get() = """New activity for the citizen "${target.name}""""
override val contentHtml
get() = run {
"""
Hello ${citizen.name.getFullName()},
The citizen "${target.name}" was new activity, check it here: <a href="http://$domain/citizens/${target.id}">here</a>
""".trimIndent()
}
override val content
get() = run {
"""
Hello ${citizen.name.getFullName()},
The citizen "${target.name}" was new activity, check it here: http://$domain/citizens/${target.id}
""".trimIndent()
}
}

View File

@@ -0,0 +1,7 @@
package fr.dcproject.component.notification.email.content
interface NotificationEmailContent {
val subject: String
val content: String
val contentHtml: String
}

View File

@@ -0,0 +1,41 @@
package fr.dcproject.component.notification.push
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.component.follow.database.FollowArticleRepository
import fr.dcproject.component.follow.database.FollowCitizenRepository
import fr.dcproject.component.follow.database.FollowConstitutionRepository
import fr.dcproject.component.notification.NotificationConsumerAbstract
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class NotificationPushConsumer(
rabbitFactory: ConnectionFactory,
followConstitutionRepo: FollowConstitutionRepository,
followArticleRepo: FollowArticleRepository,
followCitizenRepo: FollowCitizenRepository,
redisClient: RedisClient,
private val exchangeName: String,
) : NotificationConsumerAbstract(rabbitFactory, followConstitutionRepo, followArticleRepo, followCitizenRepo) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
private val logger: Logger = LoggerFactory.getLogger(NotificationPushConsumer::class.qualifiedName)
fun start() {
/* Config Rabbit */
declareQueue(QUEUE_NAME, exchangeName)
consumeQueue(QUEUE_NAME) { message ->
redis.zadd(
"notification:${message.follow.createdBy.id}",
message.event.id,
message.rawMessage
)
logger.debug("Notification was transferred to the redis (follower: ${message.follow.createdBy.id})")
}
}
companion object {
private const val QUEUE_NAME = "push"
}
}

View File

@@ -1,8 +1,9 @@
package fr.dcproject.component.notification
package fr.dcproject.component.notification.push
import com.fasterxml.jackson.core.JsonProcessingException
import fr.dcproject.component.auth.citizen
import fr.dcproject.component.citizen.database.CitizenI
import fr.dcproject.component.notification.NotificationMessage
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.Frame.Text
import io.ktor.http.cio.websocket.readText
@@ -28,31 +29,42 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
class NotificationsPush(
/**
* Listen a custom flow to mark as read a message.
*
* And listen the redis subscription flow and call a callback when a new message arrives
*/
class NotificationPushListener(
private val redis: RedisAsyncCommands<String, String>,
private val redisConnectionPubSub: StatefulRedisPubSubConnection<String, String>,
citizen: CitizenI,
incoming: Flow<Notification>,
onReceive: suspend (Notification) -> Unit,
incoming: Flow<NotificationMessage>,
onReceive: suspend (NotificationMessage) -> Unit,
) {
class Builder(val redisClient: RedisClient) {
class Builder(redisClient: RedisClient) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redisConnectionPubSub = redisClient.connectPubSub() ?: error("Unable to connect to redis PubSub")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis Async")
/**
* Build Listener with citizen, incoming flow and set an outgoing callback
*/
fun build(
citizen: CitizenI,
incoming: Flow<Notification>,
onReceive: suspend (Notification) -> Unit,
): NotificationsPush = NotificationsPush(redis, redisConnectionPubSub, citizen, incoming, onReceive)
incoming: Flow<NotificationMessage>,
onReceive: suspend (NotificationMessage) -> Unit,
): NotificationPushListener = NotificationPushListener(redis, redisConnectionPubSub, citizen, incoming, onReceive)
/**
* Build NotificationPush with only a WebSocket session
*/
@ExperimentalCoroutinesApi
fun build(ws: DefaultWebSocketServerSession): NotificationsPush {
fun build(ws: DefaultWebSocketServerSession): NotificationPushListener {
/* Convert channel of string from websocket, to a flow of Notification object */
val incomingFlow: Flow<Notification> = ws.incoming.consumeAsFlow()
val incomingFlow: Flow<NotificationMessage> = ws.incoming.consumeAsFlow()
.mapNotNull<Frame, Text> { it as? Frame.Text }
.map { it.readText() }
.map { Notification.fromString(it) }
.map { NotificationMessage.fromString(it) }
return build(ws.call.citizen, incomingFlow) {
ws.outgoing.send(Text(it.toString()))
@@ -62,30 +74,42 @@ class NotificationsPush(
}
}
/**
* The key of the SortedSet in Redis which contains all the messages of a user
*/
private val key = "notification:${citizen.id}"
private var score: Double = 0.0
/**
* The last score (a kind of sorted ids) of message
*/
private var lastScore: Double = 0.0
/**
* Configure the listener to listen all new notifications
*/
private val listener = object : RedisPubSubAdapter<String, String>() {
/* On new key publish */
override fun message(pattern: String?, channel: String?, message: String?) {
runBlocking {
getNotifications().collect {
getNewUnreadNotifications().collect {
onReceive(it)
}
}
}
}
/**
* Init the listener and the callback
*/
init {
/* Mark as read all incoming notifications */
GlobalScope.launch {
incoming.collect {
markAsRead(it)
it.markAsRead()
}
}
/* Get old notification and sent it to websocket */
runBlocking {
getNotifications().collect {
getNewUnreadNotifications().collect {
onReceive(it)
}
}
@@ -99,34 +123,51 @@ class NotificationsPush(
}
}
/**
* Close the redis subscription
*/
fun close() {
redisConnectionPubSub.removeListener(listener)
}
/* Return flow with all new notifications */
private fun getNotifications() = flow<Notification> {
/**
* Get All new notification from redis and
* Return flow with notifications
*
* On start, on the first call, this method return all unread notification of the user
*
* Internally this method return all messages that greater of the lastScore,
* then define the lastScore with the score of the last message.
*/
private fun getNewUnreadNotifications() = flow<NotificationMessage> {
redis
.zrangebyscoreWithScores(
key,
Range.from(
Boundary.excluding(score),
Boundary.excluding(lastScore),
Boundary.including(Double.POSITIVE_INFINITY)
),
Limit.from(100)
)
.get().forEach {
emit(Notification.fromString(it.value))
if (it.score > score) score = it.score
/* Build message object from raw string and return it */
emit(NotificationMessage.fromString(it.value))
if (it.score > lastScore) lastScore = it.score
}
}
private suspend fun markAsRead(notificationMessage: Notification) = coroutineScope {
/**
* Mark one notification as read.
*
* Internally, this method remove the message of the SortedSet in redis
*/
private suspend fun NotificationMessage.markAsRead() = coroutineScope {
try {
redis.zremrangebyscore(
key,
Range.from(
Boundary.including(notificationMessage.id),
Boundary.including(notificationMessage.id)
Boundary.including(id),
Boundary.including(id)
)
)
} catch (e: JsonProcessingException) {

View File

@@ -1,6 +1,6 @@
package fr.dcproject.component.notification.routes
import fr.dcproject.component.notification.NotificationsPush
import fr.dcproject.component.notification.push.NotificationPushListener
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Route
import io.ktor.websocket.webSocket
@@ -13,8 +13,8 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
*/
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Route.notificationArticle(pushBuilder: NotificationsPush.Builder) {
fun Route.notificationArticle(pushListenerBuilder: NotificationPushListener.Builder) {
webSocket("/notifications") {
pushBuilder.build(this)
pushListenerBuilder.build(this)
}
}

View File

@@ -10,10 +10,11 @@ import fr.dcproject.component.citizen.database.CitizenCreator
import fr.dcproject.component.citizen.database.CitizenI
import fr.dcproject.component.follow.database.FollowArticleRepository
import fr.dcproject.component.follow.database.FollowForView
import fr.dcproject.component.notification.ArticleUpdateNotification
import fr.dcproject.component.notification.NotificationConsumer
import fr.dcproject.component.notification.NotificationEmailSender
import fr.dcproject.component.notification.Publisher
import fr.dcproject.component.notification.ArticleUpdateNotificationMessage
import fr.dcproject.component.notification.NotificationPublisherAsync
import fr.dcproject.component.notification.email.NotificationEmailConsumer
import fr.dcproject.component.notification.email.NotificationEmailSender
import fr.dcproject.component.notification.push.NotificationPushConsumer
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.RedisClient
@@ -88,9 +89,8 @@ class NotificationConsumerTest {
}
/* Config consumer */
val consumer = NotificationConsumer(
val emailConsumer = NotificationEmailConsumer(
rabbitFactory = rabbitFactory,
redisClient = redisClient,
followArticleRepo = followArticleRepo,
followConstitutionRepo = mockk(), // TODO test followConstitution
followCitizenRepo = mockk(), // TODO test followCitizen
@@ -98,12 +98,21 @@ class NotificationConsumerTest {
exchangeName = "notification",
).apply { start() }
val pushConsumer = NotificationPushConsumer(
rabbitFactory = rabbitFactory,
followArticleRepo = followArticleRepo,
followConstitutionRepo = mockk(), // TODO test followConstitution
followCitizenRepo = mockk(), // TODO test followCitizen
redisClient = redisClient,
exchangeName = "notification",
).apply { start() }
/* Push message */
Publisher(
NotificationPublisherAsync(
factory = rabbitFactory,
exchangeName = "notification",
).publish(
ArticleUpdateNotification(
).publishAsync(
ArticleUpdateNotificationMessage(
ArticleForView(
title = "MyTitle",
content = "myContent",
@@ -122,6 +131,7 @@ class NotificationConsumerTest {
verify(timeout = 2000) { emailSender.sendEmail(any()) }
verify(timeout = 2000) { asyncCommand.zadd(any<String>(), any<Double>(), any<String>()) }
consumer.close()
emailConsumer.close()
pushConsumer.close()
}
}

View File

@@ -6,9 +6,9 @@ import fr.dcproject.component.article.database.ArticleForView
import fr.dcproject.component.auth.database.UserCreator
import fr.dcproject.component.citizen.database.CitizenCreator
import fr.dcproject.component.citizen.database.CitizenI
import fr.dcproject.component.notification.ArticleUpdateNotification
import fr.dcproject.component.notification.Notification
import fr.dcproject.component.notification.NotificationsPush
import fr.dcproject.component.notification.ArticleUpdateNotificationMessage
import fr.dcproject.component.notification.NotificationMessage
import fr.dcproject.component.notification.push.NotificationPushListener
import io.lettuce.core.RedisClient
import io.mockk.every
import io.mockk.spyk
@@ -68,14 +68,14 @@ internal class NotificationsPushTest {
title = "Super Title",
)
/* Init two notification, one called before subscription, and the other after */
val notifBeforeSubscribe = ArticleUpdateNotification(article)
val notifBeforeSubscribe = ArticleUpdateNotificationMessage(article)
runBlocking {
delay(100)
}
val notifAfterSubscribe = ArticleUpdateNotification(article)
val notifAfterSubscribe = ArticleUpdateNotificationMessage(article)
/* init event for emulate incoming message from websocket */
val event = MutableSharedFlow<Notification>()
val event = MutableSharedFlow<NotificationMessage>()
val incomingFlow = event.asSharedFlow()
spyk(object { var counter = 0 }).run { /* Counter for count the callback of notification */
@@ -90,7 +90,7 @@ internal class NotificationsPushTest {
}
/* Init NotificationPush system, and set assertion in callback */
val notificationPush = NotificationsPush.Builder(redisClient).build(citizen, incomingFlow) {
val notificationPush = NotificationPushListener.Builder(redisClient).build(citizen, incomingFlow) {
counter++
if (counter == 1) it.id `should be equal to` notifBeforeSubscribe.id
else it.id `should be equal to` notifAfterSubscribe.id

View File

@@ -1,25 +1,19 @@
package integration
import fr.dcproject.common.utils.toUUID
import fr.dcproject.component.article.database.ArticleForView
import fr.dcproject.component.auth.database.UserCreator
import fr.dcproject.component.citizen.database.CitizenCreator
import fr.dcproject.component.citizen.database.CitizenI.Name
import fr.dcproject.component.notification.ArticleUpdateNotification
import fr.dcproject.component.notification.Notification
import fr.dcproject.component.notification.Publisher
import fr.dcproject.component.notification.ArticleUpdateNotificationMessage
import fr.dcproject.component.notification.NotificationMessage
import integration.steps.given.`Given I have article update notification`
import integration.steps.given.`Given I have article`
import integration.steps.given.`Given I have citizen`
import integration.steps.given.`Given I have follow on article`
import integration.steps.given.`authenticated in url as`
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readText
import kotlinx.coroutines.launch
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Tags
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.koin.test.get
import kotlin.test.assertEquals
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -31,26 +25,7 @@ class `Notification routes` : BaseTest() {
`Given I have citizen`("John", "Doe", id = "1a34191a-9cde-45ba-8ac1-230138a102d3")
`Given I have article`(id = "a06cbfb7-3094-4d64-aaa1-7486c0c292f4", createdBy = Name(firstName = "John", lastName = "Doe"))
`Given I have follow on article`("John", "Doe", article = "a06cbfb7-3094-4d64-aaa1-7486c0c292f4")
val notification = ArticleUpdateNotification(
ArticleForView(
id = "a06cbfb7-3094-4d64-aaa1-7486c0c292f4".toUUID(),
title = "MyTitle",
content = "myContent",
description = "myDescription",
createdBy = CitizenCreator(
id = "1a34191a-9cde-45ba-8ac1-230138a102d3".toUUID(),
name = Name(firstName = "John", lastName = "Doe"),
email = "john-doe@plop.com",
user = UserCreator(username = "john-doe"),
)
)
)
val publisher = get<Publisher>()
launch {
publisher
.publish(notification)
.await()
}
`Given I have article update notification`("a06cbfb7-3094-4d64-aaa1-7486c0c292f4")
Thread.sleep(1000)
@@ -62,7 +37,7 @@ class `Notification routes` : BaseTest() {
) { incoming, outgoing ->
incoming.receive().let {
when (it) {
is Frame.Text -> Notification.fromString<ArticleUpdateNotification>(it.readText()).let { notif ->
is Frame.Text -> NotificationMessage.fromString<ArticleUpdateNotificationMessage>(it.readText()).let { notif ->
assertEquals(
"a06cbfb7-3094-4d64-aaa1-7486c0c292f4",
notif.target.id.toString()

View File

@@ -0,0 +1,40 @@
package integration.steps.given
import fr.dcproject.common.utils.toUUID
import fr.dcproject.component.article.database.ArticleForView
import fr.dcproject.component.auth.database.UserCreator
import fr.dcproject.component.citizen.database.CitizenCreator
import fr.dcproject.component.citizen.database.CitizenI
import fr.dcproject.component.notification.ArticleUpdateNotificationMessage
import fr.dcproject.component.notification.NotificationPublisherAsync
import io.ktor.server.testing.TestApplicationEngine
import kotlinx.coroutines.launch
import org.koin.mp.KoinPlatformTools
import java.util.UUID
fun TestApplicationEngine.`Given I have article update notification`(
id: String = UUID.randomUUID().toString()
) {
val notification = ArticleUpdateNotificationMessage(
ArticleForView(
id = id.toUUID(),
title = "MyTitle",
content = "myContent",
description = "myDescription",
createdBy = CitizenCreator(
id = "1a34191a-9cde-45ba-8ac1-230138a102d3".toUUID(),
name = CitizenI.Name(firstName = "John", lastName = "Doe"),
email = "john-doe@plop.com",
user = UserCreator(username = "john-doe"),
)
)
)
launch {
KoinPlatformTools
.defaultContext()
.get()
.get<NotificationPublisherAsync>()
.publishAsync(notification)
.await()
}
}