move notification to component

This commit is contained in:
2021-02-04 23:34:20 +01:00
parent 89c15eb1cf
commit bb212f9c6c
14 changed files with 55 additions and 34 deletions

View File

@@ -7,9 +7,9 @@ import fr.dcproject.component.article.ArticleRepository
import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest.Input
import fr.dcproject.component.auth.citizen
import fr.dcproject.component.auth.citizenOrNull
import fr.dcproject.component.notification.ArticleUpdateNotification
import fr.dcproject.component.notification.Publisher
import fr.dcproject.component.workgroup.WorkgroupRef
import fr.dcproject.notification.ArticleUpdateNotification
import fr.dcproject.notification.publisher.Publisher
import fr.dcproject.security.assert
import io.ktor.application.ApplicationCall
import io.ktor.application.call

View File

@@ -25,6 +25,7 @@ fun jwtInstallation(userRepo: UserRepository): Authentication.Configuration.() -
}
}
/* Token in URL */
jwt("url") {
verifier(JwtConfig.verifier)
realm = "dc-project.fr"

View File

@@ -0,0 +1,18 @@
package fr.dcproject.component.doc.routes
import fr.dcproject.utils.readResource
import io.ktor.application.call
import io.ktor.http.ContentType
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.response.respondText
import io.ktor.routing.Route
import io.ktor.routing.get
import io.ktor.util.KtorExperimentalAPI
@KtorExperimentalLocationsAPI
@KtorExperimentalAPI
fun Route.definition() {
get("/") {
call.respondText("/openapi.yaml".readResource(), ContentType("text", "yaml"))
}
}

View File

@@ -0,0 +1,16 @@
package fr.dcproject.component.doc.routes
import io.ktor.auth.authenticate
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Routing
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.ExperimentalCoroutinesApi
@KtorExperimentalAPI
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Routing.installDocRoutes() {
authenticate(optional = true) {
definition()
}
}

View File

@@ -0,0 +1,57 @@
package fr.dcproject.component.notification
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.PropertyNamingStrategies
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import fr.dcproject.component.article.ArticleForView
import fr.postgresjson.entity.UuidEntity
import org.joda.time.DateTime
import java.util.concurrent.atomic.AtomicInteger
open class Notification(
val type: String,
val createdAt: DateTime = DateTime.now()
) {
val id: Double = nextId()
private fun nextId(): Double {
return (createdAt.millis.toString() + nextInt().toString()).toDouble()
}
override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
fun toByteArray() = toString().toByteArray()
companion object {
private val counter: AtomicInteger = AtomicInteger(1000)
fun nextInt(): Int {
counter.compareAndSet(9999, 1000)
return counter.incrementAndGet()
}
val mapper = jacksonObjectMapper().apply {
registerModule(SimpleModule())
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE
registerModule(JodaModule())
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}
inline fun <reified T : Notification> fromString(raw: String): T = mapper.readValue(raw)
}
}
open class EntityNotification(
val target: UuidEntity,
type: String,
val action: String
) : Notification(type)
class ArticleUpdateNotification(
target: ArticleForView
) : EntityNotification(target, "article", "update")

View File

@@ -0,0 +1,114 @@
package fr.dcproject.component.notification
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import fr.dcproject.common.entity.TargetRef
import fr.dcproject.component.citizen.CitizenRef
import fr.dcproject.component.follow.FollowArticleRepository
import fr.dcproject.component.follow.FollowConstitutionRepository
import fr.dcproject.component.follow.FollowSimple
import fr.dcproject.messages.NotificationEmailSender
import io.ktor.utils.io.errors.IOException
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class NotificationConsumer(
private val rabbitFactory: ConnectionFactory,
private val redisClient: RedisClient,
private val followConstitutionRepo: FollowConstitutionRepository,
private val followArticleRepo: FollowArticleRepository,
private val notificationEmailSender: NotificationEmailSender,
private val exchangeName: String,
) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
private val rabbitConnection = rabbitFactory.newConnection()
private val rabbitChannel = rabbitConnection.createChannel()
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
fun close() {
rabbitChannel.close()
rabbitConnection.close()
}
fun start() {
/* Config Rabbit */
rabbitFactory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare("push", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind("push", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}
}
/* Define Consumer */
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: ByteArray
) = runBlocking {
followersFromMessage(body) {
redis.zadd(
"notification:${it.follow.createdBy.id}",
it.event.id,
it.rawMessage
)
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: ByteArray
) {
runBlocking {
followersFromMessage(body) {
notificationEmailSender.sendEmail(it.follow)
logger.debug("EmailSend to: ${it.follow.createdBy.id}")
}
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
/* Launch Consumer */
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
rabbitChannel.basicConsume("email", false, consumerEmail)
}
private suspend fun followersFromMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) {
val rawMessage: String = body.toString(Charsets.UTF_8)
val notification: EntityNotification = Notification.fromString<EntityNotification>(rawMessage) ?: error("Unable to deserialize notification message from rabbit")
val follows = when (notification.type) {
"article" -> followArticleRepo.findFollowsByTarget(notification.target)
"constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target)
else -> error("event '${notification.type}' not implemented")
}
follows.collect { action(DecodedMessage(notification, rawMessage, it)) }
}
private class DecodedMessage(
val event: EntityNotification,
val rawMessage: String,
val follow: FollowSimple<out TargetRef, CitizenRef>
)
}

View File

@@ -0,0 +1,135 @@
package fr.dcproject.component.notification
import com.fasterxml.jackson.core.JsonProcessingException
import fr.dcproject.component.auth.citizen
import fr.dcproject.component.citizen.CitizenI
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.Frame.Text
import io.ktor.http.cio.websocket.readText
import io.ktor.routing.Route
import io.ktor.websocket.DefaultWebSocketServerSession
import io.lettuce.core.Limit
import io.lettuce.core.Range
import io.lettuce.core.Range.Boundary
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.pubsub.RedisPubSubAdapter
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
class NotificationsPush private constructor(
private val redis: RedisAsyncCommands<String, String>,
private val redisConnectionPubSub: StatefulRedisPubSubConnection<String, String>,
citizen: CitizenI,
incoming: Flow<Notification>,
onRecieve: suspend (Notification) -> Unit,
) {
class Builder(val redisClient: RedisClient) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redisConnectionPubSub = redisClient.connectPubSub() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
fun build(
citizen: CitizenI,
incoming: Flow<Notification>,
onRecieve: suspend (Notification) -> Unit,
): NotificationsPush = NotificationsPush(redis, redisConnectionPubSub, citizen, incoming, onRecieve)
@ExperimentalCoroutinesApi
fun build(ws: DefaultWebSocketServerSession): NotificationsPush {
/* Convert channel of string from websocket, to a flow of Notification object */
val incomingFlow: Flow<Notification> = ws.incoming.consumeAsFlow()
.mapNotNull<Frame, Text> { it as? Frame.Text }
.map { it.readText() }
.map { Notification.fromString(it) }
return build(ws.call.citizen, incomingFlow) {
ws.outgoing.send(Text(it.toString()))
}.apply {
ws.outgoing.invokeOnClose { close() }
}
}
}
private val key = "notification:${citizen.id}"
private var score: Double = 0.0
private val listener = object : RedisPubSubAdapter<String, String>() {
/* On new key publish */
override fun message(pattern: String?, channel: String?, message: String?) {
runBlocking {
getNotifications().collect {
onRecieve(it)
}
}
}
}
init {
/* Mark as read all incoming notifications */
GlobalScope.launch {
incoming.collect {
markAsRead(it)
}
}
/* Get old notification and sent it to websocket */
runBlocking {
getNotifications().collect { onRecieve(it) }
}
/* Lisen redis event, and sent the new notification into websocket */
redisConnectionPubSub.run {
addListener(listener)
/* Register to the events */
async()?.psubscribe("__key*__:$key") ?: error("Unable to connect to redis")
}
}
fun close() {
redisConnectionPubSub.removeListener(listener)
}
/* Return flow with all new notifications */
private fun getNotifications() = flow<Notification> {
redis
.zrangebyscoreWithScores(
key,
Range.from(
Boundary.excluding(score),
Boundary.including(Double.POSITIVE_INFINITY)
),
Limit.from(100)
)
.get().forEach {
emit(Notification.fromString(it.value))
if (it.score > score) score = it.score
}
}
private suspend fun markAsRead(notificationMessage: Notification) = coroutineScope {
try {
redis.zremrangebyscore(
key,
Range.from(
Boundary.including(notificationMessage.id),
Boundary.including(notificationMessage.id)
)
)
} catch (e: JsonProcessingException) {
LoggerFactory.getLogger(Route::class.qualifiedName)
.error("Unable to deserialize notification")
}
}
}

View File

@@ -0,0 +1,25 @@
package fr.dcproject.component.notification
import com.rabbitmq.client.ConnectionFactory
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class Publisher(
private val factory: ConnectionFactory,
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName),
private val exchangeName: String,
) {
suspend fun <T : EntityNotification> publish(it: T): Deferred<Unit> = coroutineScope {
async {
factory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.basicPublish(exchangeName, "", null, it.toString().toByteArray())
logger.debug("Publish message ${it.target.id}")
}
}
}
}
}

View File

@@ -0,0 +1,20 @@
package fr.dcproject.component.notification.routes
import fr.dcproject.component.notification.NotificationsPush
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Route
import io.ktor.websocket.webSocket
import kotlinx.coroutines.ExperimentalCoroutinesApi
/**
* Consume Websocket, then remove notification in redis.
*
* Sent all notification to websocket.
*/
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Route.notificationArticle(pushBuilder: NotificationsPush.Builder) {
webSocket("/notifications") {
pushBuilder.build(this)
}
}

View File

@@ -0,0 +1,15 @@
package fr.dcproject.component.notification.routes
import io.ktor.auth.authenticate
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Routing
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.koin.ktor.ext.get
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Routing.installNotificationsRoutes() {
authenticate("url") {
notificationArticle(get())
}
}