Rename event to notification
This commit is contained in:
@@ -3,7 +3,7 @@ package fr.dcproject.application
|
||||
import com.fasterxml.jackson.core.util.DefaultIndenter
|
||||
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategy
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategies
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.datatype.joda.JodaModule
|
||||
import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException
|
||||
@@ -33,7 +33,7 @@ import fr.dcproject.component.vote.routes.installVoteRoutes
|
||||
import fr.dcproject.component.vote.voteKoinModule
|
||||
import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes
|
||||
import fr.dcproject.component.workgroup.workgroupKoinModule
|
||||
import fr.dcproject.event.EventNotification
|
||||
import fr.dcproject.notification.EventNotification
|
||||
import fr.dcproject.routes.definition
|
||||
import fr.dcproject.routes.notificationArticle
|
||||
import fr.dcproject.security.AccessDeniedException
|
||||
@@ -123,7 +123,7 @@ fun Application.module(env: Env = PROD) {
|
||||
masking = false
|
||||
}
|
||||
|
||||
EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName, get()).config()
|
||||
EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config()
|
||||
|
||||
install(Authentication, jwtInstallation(get()))
|
||||
|
||||
@@ -131,7 +131,7 @@ fun Application.module(env: Env = PROD) {
|
||||
|
||||
install(ContentNegotiation) {
|
||||
jackson {
|
||||
propertyNamingStrategy = PropertyNamingStrategy.SNAKE_CASE
|
||||
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE
|
||||
|
||||
registerModule(JodaModule())
|
||||
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
|
||||
@@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.datatype.joda.JodaModule
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.notification.publisher.Publisher
|
||||
import fr.dcproject.messages.Mailer
|
||||
import fr.dcproject.messages.NotificationEmailSender
|
||||
import fr.postgresjson.connexion.Connection
|
||||
@@ -78,7 +78,7 @@ val KoinModule = module {
|
||||
// Mailer
|
||||
single { Mailer(Configuration.sendGridKey) }
|
||||
|
||||
single { Publisher(get(), get(), exchangeName = Configuration.exchangeNotificationName) }
|
||||
single { Publisher(factory = get(), exchangeName = Configuration.exchangeNotificationName) }
|
||||
|
||||
single { NotificationEmailSender(get<Mailer>(), Configuration.domain, get(), get()) }
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ import fr.dcproject.component.article.routes.UpsertArticle.UpsertArticleRequest.
|
||||
import fr.dcproject.component.auth.citizen
|
||||
import fr.dcproject.component.auth.citizenOrNull
|
||||
import fr.dcproject.component.workgroup.WorkgroupRef
|
||||
import fr.dcproject.event.ArticleUpdate
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.notification.ArticleUpdateNotification
|
||||
import fr.dcproject.notification.publisher.Publisher
|
||||
import fr.dcproject.security.assert
|
||||
import io.ktor.application.ApplicationCall
|
||||
import io.ktor.application.call
|
||||
@@ -59,7 +59,7 @@ object UpsertArticle {
|
||||
ac.assert { canUpsert(article, citizenOrNull) }
|
||||
val newArticle: ArticleForView = repo.upsert(article) ?: error("Article not updated")
|
||||
call.respond(newArticle)
|
||||
publisher.publish(ArticleUpdate(newArticle))
|
||||
publisher.publish(ArticleUpdateNotification(newArticle))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
package fr.dcproject.event
|
||||
|
||||
import fr.postgresjson.entity.Serializable
|
||||
import fr.postgresjson.entity.UuidEntity
|
||||
import io.ktor.application.Application
|
||||
import io.ktor.application.ApplicationEvents
|
||||
import io.ktor.application.ApplicationFeature
|
||||
import io.ktor.application.EventDefinition
|
||||
import io.ktor.application.EventHandler
|
||||
import io.ktor.util.AttributeKey
|
||||
import io.ktor.util.KtorExperimentalAPI
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
import org.joda.time.DateTime
|
||||
import kotlin.random.Random.Default.nextInt
|
||||
|
||||
open class Event(
|
||||
val type: String,
|
||||
val createdAt: DateTime = DateTime.now()
|
||||
) : Serializable {
|
||||
val id: Double = randId(createdAt.millis)
|
||||
|
||||
private fun randId(time: Long): Double {
|
||||
return (time.toString() + nextInt(1000, 9999).toString()).toDouble()
|
||||
}
|
||||
}
|
||||
|
||||
open class EntityEvent(
|
||||
val target: UuidEntity,
|
||||
type: String,
|
||||
val action: String
|
||||
) : Event(type)
|
||||
|
||||
/**
|
||||
* Installation Class
|
||||
*/
|
||||
class EventSubscriber {
|
||||
class Configuration(private val monitor: ApplicationEvents) {
|
||||
private val subscribers = mutableListOf<DisposableHandle>()
|
||||
fun <T : Event> subscribe(definition: EventDefinition<T>, handler: EventHandler<T>): DisposableHandle {
|
||||
return monitor.subscribe(definition, handler).also {
|
||||
subscribers.add(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object Feature : ApplicationFeature<Application, Configuration, EventSubscriber> {
|
||||
override val key = AttributeKey<EventSubscriber>("EventSubscriber")
|
||||
|
||||
@KtorExperimentalAPI
|
||||
override fun install(
|
||||
pipeline: Application,
|
||||
configure: Configuration.() -> Unit
|
||||
): EventSubscriber {
|
||||
Configuration(pipeline.environment.monitor).apply(configure)
|
||||
return EventSubscriber()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,11 @@
|
||||
package fr.dcproject.event
|
||||
package fr.dcproject.notification
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategies
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.datatype.joda.JodaModule
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
|
||||
@@ -15,24 +19,56 @@ import fr.dcproject.component.citizen.CitizenRef
|
||||
import fr.dcproject.component.follow.FollowArticleRepository
|
||||
import fr.dcproject.component.follow.FollowConstitutionRepository
|
||||
import fr.dcproject.component.follow.FollowSimple
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.notification.publisher.Publisher
|
||||
import fr.dcproject.messages.NotificationEmailSender
|
||||
import io.ktor.application.EventDefinition
|
||||
import fr.postgresjson.entity.UuidEntity
|
||||
import io.ktor.utils.io.errors.IOException
|
||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.joda.time.DateTime
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import kotlin.random.Random
|
||||
|
||||
open class Notification(
|
||||
val type: String,
|
||||
val createdAt: DateTime = DateTime.now()
|
||||
) {
|
||||
val id: Double = randId(createdAt.millis)
|
||||
|
||||
private fun randId(time: Long): Double {
|
||||
return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble()
|
||||
}
|
||||
|
||||
fun serialize(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
|
||||
|
||||
fun toByteArray() = serialize().toByteArray()
|
||||
|
||||
class ArticleUpdate(
|
||||
target: ArticleForView
|
||||
) : EntityEvent(target, "article", "update") {
|
||||
companion object {
|
||||
val event = EventDefinition<ArticleUpdate>()
|
||||
val mapper = jacksonObjectMapper().apply {
|
||||
registerModule(SimpleModule())
|
||||
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE
|
||||
|
||||
registerModule(JodaModule())
|
||||
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
}
|
||||
|
||||
inline fun <reified T : Notification> deserialize(raw: String): T = mapper.readValue(raw)
|
||||
}
|
||||
}
|
||||
|
||||
open class EntityNotification(
|
||||
val target: UuidEntity,
|
||||
type: String,
|
||||
val action: String
|
||||
) : Notification(type)
|
||||
|
||||
class ArticleUpdateNotification(
|
||||
target: ArticleForView
|
||||
) : EntityNotification(target, "article", "update")
|
||||
|
||||
class EventNotification(
|
||||
private val rabbitFactory: ConnectionFactory,
|
||||
private val redis: RedisAsyncCommands<String, String>,
|
||||
@@ -40,11 +76,7 @@ class EventNotification(
|
||||
private val followArticleRepo: FollowArticleRepository,
|
||||
private val notificationEmailSender: NotificationEmailSender,
|
||||
private val exchangeName: String,
|
||||
mapper: ObjectMapper,
|
||||
) {
|
||||
private val mapper: ObjectMapper = mapper.copy()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
|
||||
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
|
||||
|
||||
fun config() {
|
||||
@@ -70,11 +102,11 @@ class EventNotification(
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) = runBlocking {
|
||||
decodeEvent(body) {
|
||||
decodeMessage(body) {
|
||||
redis.zadd(
|
||||
"notification:${it.follow.createdBy.id}",
|
||||
it.event.id,
|
||||
it.rawEvent
|
||||
it.rawMessage
|
||||
)
|
||||
}
|
||||
|
||||
@@ -91,7 +123,7 @@ class EventNotification(
|
||||
body: ByteArray
|
||||
) {
|
||||
runBlocking {
|
||||
decodeEvent(body) {
|
||||
decodeMessage(body) {
|
||||
notificationEmailSender.sendEmail(it.follow)
|
||||
logger.debug("EmailSend to: ${it.follow.createdBy.id}")
|
||||
}
|
||||
@@ -104,21 +136,21 @@ class EventNotification(
|
||||
rabbitChannel.basicConsume("email", false, consumerEmail)
|
||||
}
|
||||
|
||||
private suspend fun decodeEvent(body: ByteArray, action: suspend (Msg) -> Unit) {
|
||||
val rawEvent: String = body.toString(Charsets.UTF_8)
|
||||
val event: EntityEvent = mapper.readValue(rawEvent) ?: error("Unable to deserialize event message from rabbit")
|
||||
val targets = when (event.type) {
|
||||
"article" -> followArticleRepo.findFollowsByTarget(event.target)
|
||||
"constitution" -> followConstitutionRepo.findFollowsByTarget(event.target)
|
||||
else -> error("event '${event.type}' not implemented")
|
||||
private suspend fun decodeMessage(body: ByteArray, action: suspend (DecodedMessage) -> Unit) {
|
||||
val rawMessage: String = body.toString(Charsets.UTF_8)
|
||||
val notification: EntityNotification = Notification.deserialize<EntityNotification>(rawMessage) ?: error("Unable to deserialize notification message from rabbit")
|
||||
val follows = when (notification.type) {
|
||||
"article" -> followArticleRepo.findFollowsByTarget(notification.target)
|
||||
"constitution" -> followConstitutionRepo.findFollowsByTarget(notification.target)
|
||||
else -> error("event '${notification.type}' not implemented")
|
||||
}
|
||||
|
||||
targets.collect { action(Msg(event, rawEvent, it)) }
|
||||
follows.collect { action(DecodedMessage(notification, rawMessage, it)) }
|
||||
}
|
||||
|
||||
private class Msg(
|
||||
val event: EntityEvent,
|
||||
val rawEvent: String,
|
||||
private class DecodedMessage(
|
||||
val event: EntityNotification,
|
||||
val rawMessage: String,
|
||||
val follow: FollowSimple<out TargetRef, CitizenRef>
|
||||
)
|
||||
}
|
||||
@@ -1,8 +1,7 @@
|
||||
package fr.dcproject.event.publisher
|
||||
package fr.dcproject.notification.publisher
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
import fr.dcproject.event.EntityEvent
|
||||
import fr.dcproject.notification.EntityNotification
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
@@ -10,12 +9,11 @@ import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
class Publisher(
|
||||
private val mapper: ObjectMapper,
|
||||
private val factory: ConnectionFactory,
|
||||
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName),
|
||||
private val exchangeName: String,
|
||||
) {
|
||||
suspend fun <T : EntityEvent> publish(it: T): Deferred<Unit> = coroutineScope {
|
||||
suspend fun <T : EntityNotification> publish(it: T): Deferred<Unit> = coroutineScope {
|
||||
async {
|
||||
factory.newConnection().use { connection ->
|
||||
connection.createChannel().use { channel ->
|
||||
@@ -25,8 +23,4 @@ class Publisher(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun EntityEvent.serialize(): String {
|
||||
return mapper.writeValueAsString(this) ?: error("Unable tu serialize message")
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,7 @@
|
||||
package fr.dcproject.routes
|
||||
|
||||
import fr.dcproject.component.auth.citizen
|
||||
import fr.dcproject.event.Event
|
||||
import fr.postgresjson.serializer.deserialize
|
||||
import io.ktor.client.HttpClient
|
||||
import fr.dcproject.notification.Notification
|
||||
import io.ktor.http.cio.websocket.Frame
|
||||
import io.ktor.http.cio.websocket.readText
|
||||
import io.ktor.locations.KtorExperimentalLocationsAPI
|
||||
@@ -17,6 +15,7 @@ import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.flow.mapNotNull
|
||||
import kotlinx.coroutines.launch
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@ExperimentalCoroutinesApi
|
||||
@KtorExperimentalLocationsAPI
|
||||
@@ -26,15 +25,19 @@ fun Route.notificationArticle(redis: RedisAsyncCommands<String, String>, client:
|
||||
|
||||
launch {
|
||||
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
|
||||
val notificationMessage = it.readText().deserialize<Event>() ?: error("unable to deserialize message")
|
||||
|
||||
redis.zremrangebyscore(
|
||||
"notification:$citizenId",
|
||||
Range.from(
|
||||
Range.Boundary.including(notificationMessage.id),
|
||||
Range.Boundary.including(notificationMessage.id)
|
||||
try {
|
||||
val notificationMessage: Notification = Notification.deserialize(it.readText())
|
||||
redis.zremrangebyscore(
|
||||
"notification:$citizenId",
|
||||
Range.from(
|
||||
Range.Boundary.including(notificationMessage.id),
|
||||
Range.Boundary.including(notificationMessage.id)
|
||||
)
|
||||
)
|
||||
)
|
||||
} catch (e: Throwable) {
|
||||
LoggerFactory.getLogger(Route::class.qualifiedName)
|
||||
.error("Unable to deserialize notification")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user