Publish message into rabbitmq on create article

Create Redis and Rabbit in docker-compose
This commit is contained in:
2020-02-22 02:26:52 +01:00
parent 471013984c
commit af33ed9ec3
12 changed files with 206 additions and 4 deletions

View File

@@ -7,8 +7,12 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.Env.PROD
import fr.dcproject.entity.*
import fr.dcproject.event.EntityEvent
import fr.dcproject.event.EventNotification
import fr.dcproject.event.publisher.Publisher
import fr.dcproject.routes.*
import fr.dcproject.security.voter.*
import fr.postgresjson.migration.Migrations
@@ -27,7 +31,9 @@ import io.ktor.jackson.jackson
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.locations.Locations
import io.ktor.response.respond
import io.ktor.response.respondText
import io.ktor.routing.Routing
import io.ktor.routing.get
import io.ktor.util.KtorExperimentalAPI
import org.eclipse.jetty.util.log.Slf4jLog
import org.koin.ktor.ext.Koin
@@ -148,6 +154,25 @@ fun Application.module(env: Env = PROD) {
)
}
install(EventNotification) {
/* Config Rabbit */
val exchangeName = config.exchangeNotificationName
get<ConnectionFactory>().newConnection().use { connection -> connection.createChannel().use { channel ->
channel.queueDeclare("sse", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, "direct")
channel.queueBind("sse", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}}
/* Declare publisher on event */
val publisher = Publisher(get(), get())
subscribe(EntityEvent.Type.UPDATE_ARTICLE.event) {
println("Article is updated ${it.target.id}")
publisher.publish(it)
}
}
install(Authentication) {
/**
* Setup the JWT authentication to be used in [Routing].
@@ -200,6 +225,20 @@ fun Application.module(env: Env = PROD) {
opinionArticle(get())
opinionChoice(get())
definition()
get("/sse") {
// environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(ArticleRef()))
// val redis = this@authenticate.getKoin().get<RedisReactiveCommands<String, String>>()
// redis.set("key", "test").awaitSingle()
// redis.lpush("list", "test2").asFlow().map {
// println(it)
// }.collect()
// redis.get("key").asFlow().collect { println(it) }
// redis.rpop("list").asFlow().collect {
// println(it)
// call.respondText { it }
// }
call.respondText("OK")
}
}
}

View File

@@ -26,7 +26,9 @@ class Config {
var username: String = config.getString("db.username")
var password: String = config.getString("db.password")
val port: Int = config.getInt("db.port")
val redis: String = config.getString("redis.connection")
val rabbitmq: String = config.getString("rabbitmq.connection")
val exchangeNotificationName = "notification"
val sendGridKey: String = config.getString("mail.sendGrid.key")
}

View File

@@ -1,11 +1,21 @@
package fr.dcproject
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.messages.Mailer
import fr.dcproject.messages.SsoManager
import fr.postgresjson.connexion.Connection
import fr.postgresjson.connexion.Requester
import fr.postgresjson.migration.Migrations
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.RedisClient
import io.lettuce.core.api.reactive.RedisReactiveCommands
import org.koin.dsl.module
import fr.dcproject.repository.Article as ArticleRepository
import fr.dcproject.repository.Citizen as CitizenRepository
@@ -39,6 +49,25 @@ val Module = module {
)
}
single<RedisReactiveCommands<String, String>> {
RedisClient.create(config.redis).connect()?.reactive() ?: error("Unable to connect to redis")
}
single<ConnectionFactory> {
ConnectionFactory().apply { setUri(config.rabbitmq) }
}
single<ObjectMapper> {
jacksonObjectMapper().apply {
registerModule(SimpleModule())
propertyNamingStrategy = PropertyNamingStrategy.SNAKE_CASE
registerModule(JodaModule())
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
}
}
single {
Requester.RequesterFactory(
connection = get(),

View File

@@ -0,0 +1,54 @@
package fr.dcproject.event
import fr.dcproject.entity.Article
import fr.postgresjson.entity.immutable.UuidEntity
import io.ktor.application.*
import io.ktor.util.AttributeKey
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.DisposableHandle
import org.joda.time.DateTime
abstract class Notification(
val type: String,
val createdAt: DateTime = DateTime.now()
)
abstract class EntityEvent(
val target: UuidEntity,
type: String,
val action: String
) : Notification(type) {
enum class Type(val event: EventDefinition<ArticleUpdate>) {
UPDATE_ARTICLE(EventDefinition<ArticleUpdate>())
}
}
class ArticleUpdate(
target: Article
) : EntityEvent(target, "article", "update")
/**
* Installation Class
*/
class EventNotification {
class Configuration(private val monitor: ApplicationEvents) {
private val subscribers = mutableListOf<DisposableHandle>()
fun <T: Notification> subscribe(definition: EventDefinition<T>, handler: EventHandler<T>): DisposableHandle {
return monitor.subscribe(definition, handler).also {
subscribers.add(it)
}
}
}
companion object Feature : ApplicationFeature<Application, Configuration, EventNotification> {
override val key = AttributeKey<EventNotification>("EventNotification")
@KtorExperimentalAPI
override fun install(
pipeline: Application,
configure: Configuration.() -> Unit
): EventNotification {
Configuration(pipeline.environment.monitor).apply(configure)
return EventNotification()
}
}
}

View File

@@ -0,0 +1,30 @@
package fr.dcproject.event.publisher
import com.fasterxml.jackson.databind.ObjectMapper
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.config
import fr.dcproject.event.EntityEvent
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class Publisher(
private val mapper: ObjectMapper,
private val factory: ConnectionFactory,
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
) {
fun <T: EntityEvent>publish(it: T): Job {
return GlobalScope.launch {
factory.newConnection().use { connection -> connection.createChannel().use { channel ->
channel.basicPublish(config.exchangeNotificationName, "", null, it.serialize().toByteArray())
logger.debug("Publish message ${it.target.id}")
} }
}
}
private fun EntityEvent.serialize(): String {
return mapper.writeValueAsString(this) ?: error("Unable tu serialize message")
}
}

View File

@@ -1,11 +1,14 @@
package fr.dcproject.routes
import fr.dcproject.citizen
import fr.dcproject.event.ArticleUpdate
import fr.dcproject.event.EntityEvent
import fr.dcproject.repository.Article.Filter
import fr.dcproject.security.voter.ArticleVoter.Action.CREATE
import fr.dcproject.security.voter.ArticleVoter.Action.VIEW
import fr.dcproject.security.voter.assertCan
import fr.postgresjson.repository.RepositoryI
import io.ktor.application.application
import io.ktor.application.call
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.locations.Location
@@ -83,6 +86,7 @@ fun Route.article(repo: ArticleRepository) {
assertCan(CREATE, article)
repo.upsert(article)
application.environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(article))
call.respond(article)
}