Move Notifications config from Application.kt to ConfigNotification.kt

This commit is contained in:
2020-03-03 12:23:42 +01:00
parent 0421f3cb55
commit 4a4e9651fd
7 changed files with 126 additions and 117 deletions

View File

@@ -21,7 +21,7 @@
</list> </list>
</option> </option>
<method v="2"> <method v="2">
<option name="RunConfigurationTask" enabled="true" run_configuration_name="Reset database" run_configuration_type="RunSql" /> <option name="RunConfigurationTask" enabled="true" run_configuration_name="Reset Test database" run_configuration_type="RunSql" />
</method> </method>
</configuration> </configuration>
</component> </component>

View File

@@ -23,7 +23,7 @@
</list> </list>
</option> </option>
<method v="2"> <method v="2">
<option name="RunConfigurationTask" enabled="true" run_configuration_name="Reset database" run_configuration_type="RunSql" /> <option name="RunConfigurationTask" enabled="true" run_configuration_name="Reset Test database" run_configuration_type="RunSql" />
</method> </method>
</configuration> </configuration>
</component> </component>

View File

@@ -7,20 +7,13 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.joda.JodaModule import com.fasterxml.jackson.datatype.joda.JodaModule
import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException
import com.rabbitmq.client.*
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import fr.dcproject.Env.PROD import fr.dcproject.Env.PROD
import fr.dcproject.entity.* import fr.dcproject.entity.*
import fr.dcproject.event.EntityEvent import fr.dcproject.event.EventSubscriber
import fr.dcproject.event.EventNotification import fr.dcproject.event.configEvent
import fr.dcproject.event.Notification
import fr.dcproject.event.publisher.Publisher
import fr.dcproject.repository.Follow
import fr.dcproject.repository.FollowArticle
import fr.dcproject.routes.* import fr.dcproject.routes.*
import fr.dcproject.security.voter.* import fr.dcproject.security.voter.*
import fr.postgresjson.migration.Migrations import fr.postgresjson.migration.Migrations
import fr.postgresjson.serializer.deserialize
import io.ktor.application.Application import io.ktor.application.Application
import io.ktor.application.ApplicationCall import io.ktor.application.ApplicationCall
import io.ktor.application.call import io.ktor.application.call
@@ -42,16 +35,11 @@ import io.ktor.response.respond
import io.ktor.routing.Routing import io.ktor.routing.Routing
import io.ktor.util.KtorExperimentalAPI import io.ktor.util.KtorExperimentalAPI
import io.ktor.websocket.WebSockets import io.ktor.websocket.WebSockets
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.eclipse.jetty.util.log.Slf4jLog import org.eclipse.jetty.util.log.Slf4jLog
import org.koin.core.qualifier.named import org.koin.core.qualifier.named
import org.koin.ktor.ext.Koin import org.koin.ktor.ext.Koin
import org.koin.ktor.ext.get import org.koin.ktor.ext.get
import org.slf4j.event.Level import org.slf4j.event.Level
import java.io.IOException
import java.time.Duration import java.time.Duration
import java.util.* import java.util.*
import java.util.concurrent.CompletionException import java.util.concurrent.CompletionException
@@ -188,76 +176,8 @@ fun Application.module(env: Env = PROD) {
masking = false masking = false
} }
install(EventNotification) { install(EventSubscriber) {
/* Config Rabbit */ configEvent(get(), get(), get(), get())
val exchangeName = config.exchangeNotificationName
get<ConnectionFactory>().newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare("push", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind("push", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}
}
/* Declare publisher on event */
val publisher = Publisher(get(), get())
subscribe(EntityEvent.Type.UPDATE_ARTICLE.event) {
publisher.publish(it)
}
/* Launch Consumer */
launch {
val rabbitChannel = get<ConnectionFactory>().newConnection().createChannel()
val redis = get<RedisAsyncCommands<String, String>>()
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) = runBlocking {
val message = body.toString(Charsets.UTF_8)
val msg = message.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
let {
when (msg.type) {
Notification.Type.ARTICLE -> get<FollowArticle>()
} as Follow<*,*>
}
.findFollowsByTarget(msg.target)
.collect { follow ->
redis.zadd(
"notification:${follow.createdBy.id}",
msg.id,
message
)
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
val message = body.toString(Charsets.UTF_8)
println("The message is receive for send email: $message")
// TODO implement email sender
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
rabbitChannel.basicConsume("email", false, consumerEmail)
}
} }
install(Authentication) { install(Authentication) {

View File

@@ -0,0 +1,105 @@
package fr.dcproject.event
import com.fasterxml.jackson.databind.ObjectMapper
import com.rabbitmq.client.*
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import fr.dcproject.config
import fr.dcproject.entity.Article
import fr.dcproject.event.publisher.Publisher
import fr.dcproject.repository.Follow
import fr.postgresjson.serializer.deserialize
import io.ktor.application.EventDefinition
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.io.errors.IOException
import fr.dcproject.repository.FollowArticle as FollowArticleRepository
class ArticleUpdate(
target: Article
) : EntityEvent(target, "article", "update") {
companion object {
val event = EventDefinition<ArticleUpdate>()
}
}
fun EventSubscriber.Configuration.configEvent(
rabbitFactory: ConnectionFactory,
redis: RedisAsyncCommands<String, String>,
followRepo: FollowArticleRepository,
serialiser: ObjectMapper
) {
/* Config Rabbit */
val exchangeName = config.exchangeNotificationName
rabbitFactory.newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare("push", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind("push", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}
}
/* Declare publisher on event */
val publisher = Publisher(serialiser, rabbitFactory)
subscribe(ArticleUpdate.event) {
publisher.publish(it)
}
/* Launch Consumer */
GlobalScope.launch {
val rabbitChannel = rabbitFactory.newConnection().createChannel()
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) = runBlocking {
val message = body.toString(Charsets.UTF_8)
val msg =
message.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
let {
when (msg.type) {
"article" -> followRepo
else -> error("event '${msg.type}' not implemented")
} as Follow<*, *>
}
.findFollowsByTarget(msg.target)
.collect { follow ->
redis.zadd(
"notification:${follow.createdBy.id}",
msg.id,
message
)
}
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
val message = body.toString(Charsets.UTF_8)
println("The message is receive for send email: $message")
// TODO implement email sender
rabbitChannel.basicAck(envelope.deliveryTag, false)
}
}
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
rabbitChannel.basicConsume("email", false, consumerEmail)
}
}

View File

@@ -1,7 +1,5 @@
package fr.dcproject.event package fr.dcproject.event
import com.fasterxml.jackson.annotation.JsonValue
import fr.dcproject.entity.Article
import fr.postgresjson.entity.Serializable import fr.postgresjson.entity.Serializable
import fr.postgresjson.entity.immutable.UuidEntity import fr.postgresjson.entity.immutable.UuidEntity
import io.ktor.application.* import io.ktor.application.*
@@ -11,59 +9,46 @@ import kotlinx.coroutines.DisposableHandle
import org.joda.time.DateTime import org.joda.time.DateTime
import kotlin.random.Random.Default.nextInt import kotlin.random.Random.Default.nextInt
sealed class NotificationS abstract class Event(
val type: String,
open class Notification(
val type: Type,
val createdAt: DateTime = DateTime.now() val createdAt: DateTime = DateTime.now()
) : NotificationS(), Serializable { ) : Serializable {
val id: Double = randId(createdAt.millis) val id: Double = randId(createdAt.millis)
enum class Type(@JsonValue val type: String) {
ARTICLE("article");
}
private fun randId(time: Long): Double { private fun randId(time: Long): Double {
return (time.toString() + nextInt(1000, 9999).toString()).toDouble() return (time.toString() + nextInt(1000, 9999).toString()).toDouble()
} }
} }
open class EntityEvent( abstract class EntityEvent(
val target: UuidEntity, val target: UuidEntity,
type: Notification.Type, type: String,
val action: String val action: String
) : Notification(type) { ) : Event(type)
enum class Type(val event: EventDefinition<ArticleUpdate>) {
UPDATE_ARTICLE(EventDefinition<ArticleUpdate>())
}
}
class ArticleUpdate(
target: Article
) : EntityEvent(target, Notification.Type.ARTICLE, "update")
/** /**
* Installation Class * Installation Class
*/ */
class EventNotification { class EventSubscriber {
class Configuration(private val monitor: ApplicationEvents) { class Configuration(private val monitor: ApplicationEvents) {
private val subscribers = mutableListOf<DisposableHandle>() private val subscribers = mutableListOf<DisposableHandle>()
fun <T: Notification> subscribe(definition: EventDefinition<T>, handler: EventHandler<T>): DisposableHandle { fun <T: Event> subscribe(definition: EventDefinition<T>, handler: EventHandler<T>): DisposableHandle {
return monitor.subscribe(definition, handler).also { return monitor.subscribe(definition, handler).also {
subscribers.add(it) subscribers.add(it)
} }
} }
} }
companion object Feature : ApplicationFeature<Application, Configuration, EventNotification> { companion object Feature : ApplicationFeature<Application, Configuration, EventSubscriber> {
override val key = AttributeKey<EventNotification>("EventNotification") override val key = AttributeKey<EventSubscriber>("EventSubscriber")
@KtorExperimentalAPI @KtorExperimentalAPI
override fun install( override fun install(
pipeline: Application, pipeline: Application,
configure: Configuration.() -> Unit configure: Configuration.() -> Unit
): EventNotification { ): EventSubscriber {
Configuration(pipeline.environment.monitor).apply(configure) Configuration(pipeline.environment.monitor).apply(configure)
return EventNotification() return EventSubscriber()
} }
} }
} }

View File

@@ -2,7 +2,6 @@ package fr.dcproject.routes
import fr.dcproject.citizen import fr.dcproject.citizen
import fr.dcproject.event.ArticleUpdate import fr.dcproject.event.ArticleUpdate
import fr.dcproject.event.EntityEvent
import fr.dcproject.repository.Article.Filter import fr.dcproject.repository.Article.Filter
import fr.dcproject.security.voter.ArticleVoter.Action.CREATE import fr.dcproject.security.voter.ArticleVoter.Action.CREATE
import fr.dcproject.security.voter.ArticleVoter.Action.VIEW import fr.dcproject.security.voter.ArticleVoter.Action.VIEW
@@ -86,7 +85,7 @@ fun Route.article(repo: ArticleRepository) {
assertCan(CREATE, article) assertCan(CREATE, article)
repo.upsert(article) repo.upsert(article)
application.environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(article)) application.environment.monitor.raise(ArticleUpdate.event, ArticleUpdate(article))
call.respond(article) call.respond(article)
} }

View File

@@ -1,7 +1,7 @@
package fr.dcproject.routes package fr.dcproject.routes
import fr.dcproject.citizen import fr.dcproject.citizen
import fr.dcproject.event.Notification import fr.dcproject.event.Event
import fr.postgresjson.serializer.deserialize import fr.postgresjson.serializer.deserialize
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.Frame
@@ -26,7 +26,7 @@ fun Route.notificationArticle(redis: RedisAsyncCommands<String, String>, client:
launch { launch {
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
val notificationMessage = it.readText().deserialize<Notification>() ?: error("unable to deserialize message") val notificationMessage = it.readText().deserialize<Event>() ?: error("unable to deserialize message")
redis.zremrangebyscore( redis.zremrangebyscore(
"notification:$citizenId", "notification:$citizenId",