Rename NotificationConsumer
This commit is contained in:
@@ -33,7 +33,7 @@ import fr.dcproject.component.vote.routes.installVoteRoutes
|
||||
import fr.dcproject.component.vote.voteKoinModule
|
||||
import fr.dcproject.component.workgroup.routes.installWorkgroupRoutes
|
||||
import fr.dcproject.component.workgroup.workgroupKoinModule
|
||||
import fr.dcproject.notification.EventNotification
|
||||
import fr.dcproject.notification.NotificationConsumer
|
||||
import fr.dcproject.routes.definition
|
||||
import fr.dcproject.routes.notificationArticle
|
||||
import fr.dcproject.security.AccessDeniedException
|
||||
@@ -122,7 +122,7 @@ fun Application.module(env: Env = PROD) {
|
||||
masking = false
|
||||
}
|
||||
|
||||
EventNotification(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config()
|
||||
NotificationConsumer(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config()
|
||||
|
||||
install(Authentication, jwtInstallation(get()))
|
||||
|
||||
|
||||
51
src/main/kotlin/notification/Notification.kt
Normal file
51
src/main/kotlin/notification/Notification.kt
Normal file
@@ -0,0 +1,51 @@
|
||||
package fr.dcproject.notification
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategies
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.datatype.joda.JodaModule
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import fr.dcproject.component.article.ArticleForView
|
||||
import fr.postgresjson.entity.UuidEntity
|
||||
import org.joda.time.DateTime
|
||||
import kotlin.random.Random
|
||||
|
||||
open class Notification(
|
||||
val type: String,
|
||||
val createdAt: DateTime = DateTime.now()
|
||||
) {
|
||||
val id: Double = randId(createdAt.millis)
|
||||
|
||||
private fun randId(time: Long): Double {
|
||||
return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble()
|
||||
}
|
||||
|
||||
override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
|
||||
|
||||
fun toByteArray() = toString().toByteArray()
|
||||
|
||||
companion object {
|
||||
val mapper = jacksonObjectMapper().apply {
|
||||
registerModule(SimpleModule())
|
||||
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE
|
||||
|
||||
registerModule(JodaModule())
|
||||
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
}
|
||||
|
||||
inline fun <reified T : Notification> fromString(raw: String): T = mapper.readValue(raw)
|
||||
}
|
||||
}
|
||||
|
||||
open class EntityNotification(
|
||||
val target: UuidEntity,
|
||||
type: String,
|
||||
val action: String
|
||||
) : Notification(type)
|
||||
|
||||
class ArticleUpdateNotification(
|
||||
target: ArticleForView
|
||||
) : EntityNotification(target, "article", "update")
|
||||
@@ -1,76 +1,27 @@
|
||||
package fr.dcproject.notification
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategies
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.datatype.joda.JodaModule
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.AMQP.BasicProperties
|
||||
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
|
||||
import com.rabbitmq.client.ConnectionFactory
|
||||
import com.rabbitmq.client.Consumer
|
||||
import com.rabbitmq.client.DefaultConsumer
|
||||
import com.rabbitmq.client.Envelope
|
||||
import fr.dcproject.common.entity.TargetRef
|
||||
import fr.dcproject.component.article.ArticleForView
|
||||
import fr.dcproject.component.citizen.CitizenRef
|
||||
import fr.dcproject.component.follow.FollowArticleRepository
|
||||
import fr.dcproject.component.follow.FollowConstitutionRepository
|
||||
import fr.dcproject.component.follow.FollowSimple
|
||||
import fr.dcproject.notification.publisher.Publisher
|
||||
import fr.dcproject.messages.NotificationEmailSender
|
||||
import fr.postgresjson.entity.UuidEntity
|
||||
import fr.dcproject.notification.publisher.Publisher
|
||||
import io.ktor.utils.io.errors.IOException
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.joda.time.DateTime
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import kotlin.random.Random
|
||||
|
||||
open class Notification(
|
||||
val type: String,
|
||||
val createdAt: DateTime = DateTime.now()
|
||||
) {
|
||||
val id: Double = randId(createdAt.millis)
|
||||
|
||||
private fun randId(time: Long): Double {
|
||||
return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble()
|
||||
}
|
||||
|
||||
override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
|
||||
|
||||
fun toByteArray() = toString().toByteArray()
|
||||
|
||||
companion object {
|
||||
val mapper = jacksonObjectMapper().apply {
|
||||
registerModule(SimpleModule())
|
||||
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE
|
||||
|
||||
registerModule(JodaModule())
|
||||
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
}
|
||||
|
||||
inline fun <reified T : Notification> fromString(raw: String): T = mapper.readValue(raw)
|
||||
}
|
||||
}
|
||||
|
||||
open class EntityNotification(
|
||||
val target: UuidEntity,
|
||||
type: String,
|
||||
val action: String
|
||||
) : Notification(type)
|
||||
|
||||
class ArticleUpdateNotification(
|
||||
target: ArticleForView
|
||||
) : EntityNotification(target, "article", "update")
|
||||
|
||||
class EventNotification(
|
||||
class NotificationConsumer(
|
||||
private val rabbitFactory: ConnectionFactory,
|
||||
private val redisClient: RedisClient,
|
||||
private val followConstitutionRepo: FollowConstitutionRepository,
|
||||
@@ -101,7 +52,7 @@ class EventNotification(
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
properties: BasicProperties,
|
||||
body: ByteArray
|
||||
) = runBlocking {
|
||||
followersFromMessage(body) {
|
||||
@@ -121,7 +72,7 @@ class EventNotification(
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
properties: BasicProperties,
|
||||
body: ByteArray
|
||||
) {
|
||||
runBlocking {
|
||||
@@ -155,4 +106,4 @@ class EventNotification(
|
||||
val rawMessage: String,
|
||||
val follow: FollowSimple<out TargetRef, CitizenRef>
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user