cleanup and refactoring of notification

close rabbit and redis connexion on application close
Refactoring of Configuration class
fix notification id increment
Add builder for NotificationPush
Add close to notificationPush to remove listener
Clean tags of tests
purge queue before functional tests
This commit is contained in:
2021-02-04 02:36:02 +01:00
parent a05b5edc86
commit 89c15eb1cf
26 changed files with 289 additions and 149 deletions

View File

@@ -39,6 +39,7 @@ import fr.dcproject.routes.notificationArticle
import fr.dcproject.security.AccessDeniedException
import fr.postgresjson.migration.Migrations
import io.ktor.application.Application
import io.ktor.application.ApplicationStopped
import io.ktor.application.call
import io.ktor.application.install
import io.ktor.auth.Authentication
@@ -122,7 +123,12 @@ fun Application.module(env: Env = PROD) {
masking = false
}
NotificationConsumer(get(), get(), get(), get(), get(), Configuration.exchangeNotificationName).config()
get<NotificationConsumer>().run {
start()
environment.monitor.subscribe(ApplicationStopped) {
close()
}
}
install(Authentication, jwtInstallation(get()))

View File

@@ -1,23 +1,39 @@
package fr.dcproject.application
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import java.net.URI
object Configuration {
private var config = ConfigFactory.load()
class Configuration(val config: Config) {
constructor(resourceBasename: String? = null) : this(if (resourceBasename == null) ConfigFactory.load() else ConfigFactory.load(resourceBasename))
object Sql {
val migrationFiles: URI = this::class.java.getResource("/sql/migrations")?.toURI() ?: error("No migrations found")
val functionFiles: URI = this::class.java.getResource("/sql/functions")?.toURI() ?: error("No sql function found")
val fixtureFiles: URI = this::class.java.getResource("/sql/fixtures")?.toURI() ?: error("No sql fixture found")
interface Sql {
val migrationFiles: URI
val functionFiles: URI
val fixtureFiles: URI
}
object Database {
val host: String = config.getString("db.host")
val port: Int = config.getInt("db.port")
var database: String = config.getString("db.database")
var username: String = config.getString("db.username")
var password: String = config.getString("db.password")
val sql
get() = object : Sql {
override val migrationFiles: URI = this::class.java.getResource("/sql/migrations")?.toURI() ?: error("No migrations found")
override val functionFiles: URI = this::class.java.getResource("/sql/functions")?.toURI() ?: error("No sql function found")
override val fixtureFiles: URI = this::class.java.getResource("/sql/fixtures")?.toURI() ?: error("No sql fixture found")
}
interface Database {
val host: String
val port: Int
var database: String
var username: String
var password: String
}
val database
get() = object : Database {
override val host: String = config.getString("db.host")
override val port: Int = config.getInt("db.port")
override var database: String = config.getString("db.database")
override var username: String = config.getString("db.username")
override var password: String = config.getString("db.password")
}
val envName: String = config.getString("app.envName")
val domain: String = config.getString("app.domain")

View File

@@ -8,9 +8,10 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.rabbitmq.client.ConnectionFactory
import fr.dcproject.notification.publisher.Publisher
import fr.dcproject.messages.Mailer
import fr.dcproject.messages.NotificationEmailSender
import fr.dcproject.notification.NotificationConsumer
import fr.dcproject.notification.publisher.Publisher
import fr.postgresjson.connexion.Connection
import fr.postgresjson.connexion.Requester
import fr.postgresjson.migration.Migrations
@@ -18,36 +19,52 @@ import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import notification.NotificationsPush
import org.koin.core.qualifier.named
import org.koin.dsl.module
import org.koin.ktor.ext.get
@KtorExperimentalAPI
val KoinModule = module {
single { Configuration() }
// SQL connection
single {
val config: Configuration = get()
Connection(
host = Configuration.Database.host,
port = Configuration.Database.port,
database = Configuration.Database.database,
username = Configuration.Database.username,
password = Configuration.Database.password
host = config.database.host,
port = config.database.port,
database = config.database.database,
username = config.database.username,
password = config.database.password
)
}
// Launch Database migration
single { Migrations(get(), Configuration.Sql.migrationFiles, Configuration.Sql.functionFiles) }
single {
val config: Configuration = get()
Migrations(get(), config.sql.migrationFiles, config.sql.functionFiles)
}
// Redis client
single<RedisClient> {
RedisClient.create(Configuration.redis).apply {
val config: Configuration = get()
RedisClient.create(config.redis).apply {
connect().sync().configSet("notify-keyspace-events", "KEA")
}
}
single { NotificationsPush.Builder(get()) }
single {
val config: Configuration = get()
NotificationConsumer(get(), get(), get(), get(), get(), config.exchangeNotificationName)
}
// RabbitMQ
single<ConnectionFactory> {
ConnectionFactory().apply { setUri(Configuration.rabbitmq) }
val config: Configuration = get()
ConnectionFactory().apply { setUri(config.rabbitmq) }
}
// JsonSerializer
@@ -71,16 +88,26 @@ val KoinModule = module {
// SQL Requester (postgresJson)
single {
val config: Configuration = get()
Requester.RequesterFactory(
connection = get(),
functionsDirectory = Configuration.Sql.functionFiles
functionsDirectory = config.sql.functionFiles
).createRequester()
}
// Mailer
single { Mailer(Configuration.sendGridKey) }
single {
val config: Configuration = get()
Mailer(config.sendGridKey)
}
single { Publisher(factory = get(), exchangeName = Configuration.exchangeNotificationName) }
single {
val config: Configuration = get()
Publisher(factory = get(), exchangeName = config.exchangeNotificationName)
}
single { NotificationEmailSender(get<Mailer>(), Configuration.domain, get(), get()) }
single {
val config: Configuration = get()
NotificationEmailSender(get<Mailer>(), config.domain, get(), get())
}
}

View File

@@ -7,5 +7,8 @@ import org.koin.dsl.module
val authKoinModule = module {
single { UserRepository(get()) }
// Used to send a connexion link by email
single { PasswordlessAuth(get<Mailer>(), Configuration.domain, get()) }
single {
val config: Configuration = get()
PasswordlessAuth(get<Mailer>(), config.domain, get())
}
}

View File

@@ -8,12 +8,15 @@ import org.elasticsearch.client.RestClient
import org.koin.dsl.module
val viewKoinModule = module {
// Elasticsearch Client
val esClient = RestClient.builder(
HttpHost.create(Configuration.elasticsearch)
).build().apply {
createEsIndexForViews()
}
single { ArticleViewManager<ArticleForView>(esClient) }
single {
val config: Configuration = get()
// Elasticsearch Client
val esClient = RestClient.builder(
HttpHost.create(config.elasticsearch)
).build().apply {
createEsIndexForViews()
}
ArticleViewManager<ArticleForView>(esClient)
}
}

View File

@@ -10,16 +10,16 @@ import com.fasterxml.jackson.module.kotlin.readValue
import fr.dcproject.component.article.ArticleForView
import fr.postgresjson.entity.UuidEntity
import org.joda.time.DateTime
import kotlin.random.Random
import java.util.concurrent.atomic.AtomicInteger
open class Notification(
val type: String,
val createdAt: DateTime = DateTime.now()
) {
val id: Double = randId(createdAt.millis)
val id: Double = nextId()
private fun randId(time: Long): Double {
return (time.toString() + Random.nextInt(1000, 9999).toString()).toDouble()
private fun nextId(): Double {
return (createdAt.millis.toString() + nextInt().toString()).toDouble()
}
override fun toString(): String = mapper.writeValueAsString(this) ?: error("Unable to serialize notification")
@@ -27,6 +27,12 @@ open class Notification(
fun toByteArray() = toString().toByteArray()
companion object {
private val counter: AtomicInteger = AtomicInteger(1000)
fun nextInt(): Int {
counter.compareAndSet(9999, 1000)
return counter.incrementAndGet()
}
val mapper = jacksonObjectMapper().apply {
registerModule(SimpleModule())
propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE

View File

@@ -29,10 +29,18 @@ class NotificationConsumer(
private val notificationEmailSender: NotificationEmailSender,
private val exchangeName: String,
) {
val redis: RedisAsyncCommands<String, String> = redisClient.connect()?.async() ?: error("Unable to connect to redis")
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
private val rabbitConnection = rabbitFactory.newConnection()
private val rabbitChannel = rabbitConnection.createChannel()
private val logger: Logger = LoggerFactory.getLogger(Publisher::class.qualifiedName)
fun config() {
fun close() {
rabbitChannel.close()
rabbitConnection.close()
}
fun start() {
/* Config Rabbit */
rabbitFactory.newConnection().use { connection ->
connection.createChannel().use { channel ->
@@ -45,8 +53,6 @@ class NotificationConsumer(
}
/* Define Consumer */
val rabbitChannel = rabbitFactory.newConnection().createChannel()
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
@Throws(IOException::class)
override fun handleDelivery(
@@ -106,4 +112,4 @@ class NotificationConsumer(
val rawMessage: String,
val follow: FollowSimple<out TargetRef, CitizenRef>
)
}
}

View File

@@ -1,34 +1,80 @@
package notification
import com.fasterxml.jackson.core.JsonProcessingException
import fr.dcproject.component.auth.citizen
import fr.dcproject.component.citizen.CitizenI
import fr.dcproject.notification.Notification
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.Frame.Text
import io.ktor.http.cio.websocket.readText
import io.ktor.routing.Route
import io.ktor.websocket.DefaultWebSocketServerSession
import io.lettuce.core.Limit
import io.lettuce.core.Range
import io.lettuce.core.Range.Boundary
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.pubsub.RedisPubSubAdapter
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
class NotificationsPush (
val redisClient: RedisClient,
class NotificationsPush private constructor(
private val redis: RedisAsyncCommands<String, String>,
private val redisConnectionPubSub: StatefulRedisPubSubConnection<String, String>,
citizen: CitizenI,
incoming: Flow<Notification>,
onRecieve: suspend (Notification) -> Unit,
)
{
val redis: RedisAsyncCommands<String, String> = redisClient.connect()?.async() ?: error("Unable to connect to redis")
val key = "notification:${citizen.id}"
) {
class Builder(val redisClient: RedisClient) {
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
private val redisConnectionPubSub = redisClient.connectPubSub() ?: error("Unable to connect to redis")
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis")
fun build(
citizen: CitizenI,
incoming: Flow<Notification>,
onRecieve: suspend (Notification) -> Unit,
): NotificationsPush = NotificationsPush(redis, redisConnectionPubSub, citizen, incoming, onRecieve)
@ExperimentalCoroutinesApi
fun build(ws: DefaultWebSocketServerSession): NotificationsPush {
/* Convert channel of string from websocket, to a flow of Notification object */
val incomingFlow: Flow<Notification> = ws.incoming.consumeAsFlow()
.mapNotNull<Frame, Text> { it as? Frame.Text }
.map { it.readText() }
.map { Notification.fromString(it) }
return build(ws.call.citizen, incomingFlow) {
ws.outgoing.send(Text(it.toString()))
}.apply {
ws.outgoing.invokeOnClose { close() }
}
}
}
private val key = "notification:${citizen.id}"
private var score: Double = 0.0
private val listener = object : RedisPubSubAdapter<String, String>() {
/* On new key publish */
override fun message(pattern: String?, channel: String?, message: String?) {
runBlocking {
getNotifications().collect {
onRecieve(it)
}
}
}
}
init {
/* Mark as read all incoming notifications */
@@ -44,21 +90,16 @@ class NotificationsPush (
}
/* Lisen redis event, and sent the new notification into websocket */
redisClient.connectPubSub()?.run {
addListener(object : RedisPubSubAdapter<String, String>() {
/* On new key publish */
override fun message(pattern: String?, channel: String?, message: String?) {
runBlocking {
getNotifications().collect {
onRecieve(it)
}
}
}
})
redisConnectionPubSub.run {
addListener(listener)
/* Register to the events */
async()?.psubscribe("__key*__:$key") ?: error("Unable to connect to redis")
} ?: error("PubSub Fail")
}
}
fun close() {
redisConnectionPubSub.removeListener(listener)
}
/* Return flow with all new notifications */
@@ -92,4 +133,4 @@ class NotificationsPush (
.error("Unable to deserialize notification")
}
}
}
}

View File

@@ -1,19 +1,9 @@
package fr.dcproject.routes
import fr.dcproject.component.auth.citizen
import fr.dcproject.notification.Notification
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.Frame.Text
import io.ktor.http.cio.websocket.readText
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Route
import io.ktor.websocket.webSocket
import io.lettuce.core.RedisClient
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import notification.NotificationsPush
/**
@@ -23,18 +13,8 @@ import notification.NotificationsPush
*/
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Route.notificationArticle(redisClient: RedisClient) {
fun Route.notificationArticle(pushBuilder: NotificationsPush.Builder) {
webSocket("/notifications") {
/* Convert channel of string from websocket, to a flow of Notification object */
val incomingFlow: Flow<Notification> = incoming.consumeAsFlow()
.mapNotNull<Frame, Text> { it as? Frame.Text }
.map { it.readText() }
.map { Notification.fromString(it) }
/* Read user notifications in redis then sent it to the websocket */
NotificationsPush(redisClient, call.citizen, incomingFlow) {
outgoing.send(Text(it.toString()))
}
pushBuilder.build(this)
}
}