Add consumer for Notification

This commit is contained in:
2020-02-24 20:44:37 +01:00
parent af33ed9ec3
commit b678f7f2cc
14 changed files with 192 additions and 47 deletions

View File

@@ -7,15 +7,18 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.*
import com.rabbitmq.client.BuiltinExchangeType.DIRECT
import fr.dcproject.Env.PROD
import fr.dcproject.entity.*
import fr.dcproject.event.EntityEvent
import fr.dcproject.event.EventNotification
import fr.dcproject.event.publisher.Publisher
import fr.dcproject.repository.FollowArticle
import fr.dcproject.routes.*
import fr.dcproject.security.voter.*
import fr.postgresjson.migration.Migrations
import fr.postgresjson.serializer.deserialize
import io.ktor.application.Application
import io.ktor.application.ApplicationCall
import io.ktor.application.call
@@ -35,10 +38,18 @@ import io.ktor.response.respondText
import io.ktor.routing.Routing
import io.ktor.routing.get
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.eclipse.jetty.util.log.Slf4jLog
import org.joda.time.DateTime
import org.koin.ktor.ext.Koin
import org.koin.ktor.ext.get
import org.slf4j.event.Level
import java.io.IOException
import java.time.Duration
import java.util.*
import java.util.concurrent.CompletionException
@@ -52,6 +63,7 @@ fun main(args: Array<String>): Unit = io.ktor.server.jetty.EngineMain.main(args)
enum class Env { PROD, TEST, CUCUMBER }
@InternalCoroutinesApi
@KtorExperimentalAPI
@KtorExperimentalLocationsAPI
@Suppress("unused") // Referenced in application.conf
@@ -133,7 +145,8 @@ fun Application.module(env: Env = PROD) {
decode { values, _ ->
val id = values.singleOrNull()?.let { UUID.fromString(it) }
?: throw InternalError("Cannot convert $values to UUID")
get<OpinionChoiceRepository>().findOpinionChoiceById(id) ?: throw NotFoundException("OpinionChoice $values not found")
get<OpinionChoiceRepository>().findOpinionChoiceById(id)
?: throw NotFoundException("OpinionChoice $values not found")
}
}
}
@@ -157,13 +170,15 @@ fun Application.module(env: Env = PROD) {
install(EventNotification) {
/* Config Rabbit */
val exchangeName = config.exchangeNotificationName
get<ConnectionFactory>().newConnection().use { connection -> connection.createChannel().use { channel ->
channel.queueDeclare("sse", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, "direct")
channel.queueBind("sse", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}}
get<ConnectionFactory>().newConnection().use { connection ->
connection.createChannel().use { channel ->
channel.queueDeclare("sse", true, false, false, null)
channel.queueDeclare("email", true, false, false, null)
channel.exchangeDeclare(exchangeName, DIRECT, true)
channel.queueBind("sse", exchangeName, "")
channel.queueBind("email", exchangeName, "")
}
}
/* Declare publisher on event */
val publisher = Publisher(get(), get())
@@ -171,6 +186,61 @@ fun Application.module(env: Env = PROD) {
println("Article is updated ${it.target.id}")
publisher.publish(it)
}
/* Launch Consumer */
GlobalScope.launch {
val connection = get<ConnectionFactory>().newConnection()
val channel = connection.createChannel()
val redis = get<RedisAsyncCommands<String, String>>()
val consumerSSE: Consumer = object : DefaultConsumer(channel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
val message = body.toString(Charsets.UTF_8)
val event =
message.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
val followRepo = when (event.type) {
"article" -> get<FollowArticle>()
else -> error("type of event not supported")
}
runBlocking {
followRepo
.findFollowsByTarget(event.target)
.collect { follow ->
redis.zadd(
"notification:${follow.createdBy.id}",
DateTime.now().millis.toDouble(),
message
)
}
}
channel.basicAck(envelope.deliveryTag, false)
}
}
val consumerEmail: Consumer = object : DefaultConsumer(channel) {
@Throws(IOException::class)
override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
val message = body.toString(Charsets.UTF_8)
println("The message is receive for send email: $message")
// TODO implement email sender
channel.basicAck(envelope.deliveryTag, false)
}
}
channel.basicConsume("sse", false, consumerSSE)
channel.basicConsume("email", false, consumerEmail)
}
}
install(Authentication) {
@@ -226,17 +296,7 @@ fun Application.module(env: Env = PROD) {
opinionChoice(get())
definition()
get("/sse") {
// environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(ArticleRef()))
// val redis = this@authenticate.getKoin().get<RedisReactiveCommands<String, String>>()
// redis.set("key", "test").awaitSingle()
// redis.lpush("list", "test2").asFlow().map {
// println(it)
// }.collect()
// redis.get("key").asFlow().collect { println(it) }
// redis.rpop("list").asFlow().collect {
// println(it)
// call.respondText { it }
// }
call.respondText("OK")
}
}

View File

@@ -15,7 +15,7 @@ import fr.postgresjson.connexion.Requester
import fr.postgresjson.migration.Migrations
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.RedisClient
import io.lettuce.core.api.reactive.RedisReactiveCommands
import io.lettuce.core.api.async.RedisAsyncCommands
import org.koin.dsl.module
import fr.dcproject.repository.Article as ArticleRepository
import fr.dcproject.repository.Citizen as CitizenRepository
@@ -49,8 +49,10 @@ val Module = module {
)
}
single<RedisReactiveCommands<String, String>> {
RedisClient.create(config.redis).connect()?.reactive() ?: error("Unable to connect to redis")
single { Migrations(connection = get(), directory = config.sqlFiles) }
single<RedisAsyncCommands<String, String>> {
RedisClient.create(config.redis).connect()?.async() ?: error("Unable to connect to redis")
}
single<ConnectionFactory> {
@@ -91,8 +93,6 @@ val Module = module {
single { OpinionChoiceRepository(get()) }
single { OpinionArticleRepository(get()) }
single { Migrations(connection = get(), directory = config.sqlFiles) }
single { Mailer(config.sendGridKey) }
single { SsoManager(get<Mailer>(), config.domain, get()) }
}

View File

@@ -37,15 +37,19 @@ open class CitizenSimple(
id: UUID = UUID.randomUUID(),
var name: Name,
user: UserRef
) : CitizenRef(id, user)
) : CitizenRefWithUser(id, user)
open class CitizenRef(
open class CitizenRefWithUser(
id: UUID = UUID.randomUUID(),
open val user: UserRef
) : UuidEntity(id),
CitizenI,
) : CitizenRef(id),
EntityDeletedAt by EntityDeletedAtImp()
open class CitizenRef(
id: UUID = UUID.randomUUID()
) : UuidEntity(id),
CitizenI
interface CitizenI : UuidEntityI {
data class Name(
var firstName: String,

View File

@@ -14,7 +14,7 @@ open class Comment<T : TargetI>(
var parent: Comment<T>? = null,
val parentsIds: List<UUID>? = null,
val childrenCount: Int? = null
) : ExtraI<T>,
) : ExtraI<T, CitizenBasicI>,
CommentRef(id),
EntityCreatedAt by EntityCreatedAtImp(),
EntityCreatedBy<CitizenBasicI> by EntityCreatedByImp(createdBy),

View File

@@ -31,7 +31,7 @@ class Constitution(
) : ConstitutionSimple.TitleSimple<ArticleSimple>(id, name, rank)
}
open class ConstitutionSimple<Cr : CitizenRef, T : ConstitutionSimple.TitleSimple<*>>(
open class ConstitutionSimple<Cr : CitizenRefWithUser, T : ConstitutionSimple.TitleSimple<*>>(
id: UUID = UUID.randomUUID(),
var title: String,
var anonymous: Boolean = true,

View File

@@ -8,18 +8,21 @@ import java.util.*
import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf
interface ExtraI<T : TargetI> :
interface ExtraI<T : TargetI, C: CitizenI> :
UuidEntityI,
EntityCreatedAt,
EntityCreatedBy<CitizenBasicI> {
EntityCreatedBy<C> {
val target: T
}
open class TargetRef(id: UUID = UUID.randomUUID()) : TargetI, UuidEntity(id) {
override val reference: String = ""
get() {
return if (field != "") field else TargetI.getReference(this)
}
open class TargetRef(id: UUID = UUID.randomUUID(), reference: String = "") : TargetI, UuidEntity(id) {
final override val reference: String
get() = if (field != "") field else TargetI.getReference(this)
init {
this.reference = reference
}
}
interface TargetI : UuidEntityI {
@@ -43,7 +46,7 @@ interface TargetI : UuidEntityI {
fun getReference(t: TargetI): String {
val ref = this.getReference(t::class)
return if (t is ExtraI<*>) {
return if (t is ExtraI<*, *>) {
"${ref}_on_${t.target.reference}"
} else {
ref

View File

@@ -7,7 +7,14 @@ class Follow<T : TargetI>(
id: UUID = UUID.randomUUID(),
override val createdBy: CitizenBasic,
override var target: T
) : ExtraI<T>,
) : ExtraI<T, CitizenBasicI>,
FollowSimple<T, CitizenBasicI>(id, createdBy, target)
open class FollowSimple<T : TargetI, C: CitizenI>(
id: UUID = UUID.randomUUID(),
override val createdBy: C,
override var target: T
) : ExtraI<T, C>,
UuidEntity(id),
EntityCreatedAt by EntityCreatedAtImp(),
EntityCreatedBy<CitizenBasicI> by EntityCreatedByImp(createdBy)
EntityCreatedBy<C> by EntityCreatedByImp(createdBy)

View File

@@ -11,7 +11,7 @@ open class Opinion<T : TargetI>(
override val createdBy: CitizenBasic,
override val target: T,
val choice: OpinionChoice
) : ExtraI<T>,
) : ExtraI<T,CitizenBasicI>,
TargetRef(id),
EntityCreatedAt by EntityCreatedAtImp(),
EntityCreatedBy<CitizenBasicI> by EntityCreatedByImp(createdBy) {

View File

@@ -9,7 +9,7 @@ open class Vote<T : TargetI>(
override var target: T,
var note: Int,
var anonymous: Boolean = true
) : ExtraI<T>,
) : ExtraI<T, CitizenBasicI>,
UuidEntity(id),
EntityCreatedAt by EntityCreatedAtImp(),
EntityCreatedBy<CitizenBasicI> by EntityCreatedByImp(createdBy),

View File

@@ -1,6 +1,7 @@
package fr.dcproject.event
import fr.dcproject.entity.Article
import fr.postgresjson.entity.Serializable
import fr.postgresjson.entity.immutable.UuidEntity
import io.ktor.application.*
import io.ktor.util.AttributeKey
@@ -12,11 +13,11 @@ abstract class Notification(
val type: String,
val createdAt: DateTime = DateTime.now()
)
abstract class EntityEvent(
open class EntityEvent(
val target: UuidEntity,
type: String,
val action: String
) : Notification(type) {
) : Notification(type), Serializable {
enum class Type(val event: EventDefinition<ArticleUpdate>) {
UPDATE_ARTICLE(EventDefinition<ArticleUpdate>())
}

View File

@@ -1,10 +1,12 @@
package fr.dcproject.repository
import fr.dcproject.entity.CitizenI
import fr.dcproject.entity.TargetI
import fr.dcproject.entity.*
import fr.postgresjson.connexion.Paginated
import fr.postgresjson.connexion.Requester
import fr.postgresjson.entity.immutable.UuidEntity
import fr.postgresjson.repository.RepositoryI
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import java.util.*
import fr.dcproject.entity.Article as ArticleEntity
import fr.dcproject.entity.Constitution as ConstitutionEntity
@@ -67,6 +69,32 @@ class FollowArticle(requester: Requester) : Follow<ArticleEntity>(requester) {
)
}
}
fun findFollowsByTarget(
target: UuidEntity,
page: Int = 1,
limit: Int = 300
): Paginated<FollowSimple<ArticleRef, CitizenRef>> {
return requester
.getFunction("find_follows_article_by_target")
.select(page, limit,
"target_id" to target.id
)
}
fun findFollowsByTarget(
target: UuidEntity,
limit: Int = 300
): Flow<FollowSimple<ArticleRef, CitizenRef>> = flow {
var nextPage = 1
do {
val paginate = findFollowsByTarget(target, nextPage, limit)
paginate.result.forEach {
emit(it)
}
nextPage = paginate.currentPage+1
} while (!paginate.isLastPage())
}
}
class FollowConstitution(requester: Requester) : Follow<ConstitutionEntity>(requester) {

View File

@@ -0,0 +1,37 @@
create or replace function find_follows_article_by_target(
_target_id uuid,
"limit" int default 50,
"offset" int default 0,
out resource json,
out total int
) language plpgsql as
$$
declare
_version_id uuid = (select version_id from article where id = _target_id);
begin
select json_agg(t), (
select count(f.id)
from follow f
join article a on f.target_id = a.id
where a.version_id = _version_id)
into resource, total
from (
select
f.id,
f.created_at,
f.target_reference,
json_build_object('id', f.target_id) as target,
json_build_object('id', f.created_by_id) as created_by
from follow_article as f
join article a on f.target_id = a.id
where a.version_id = _version_id
order by f.created_at
limit "limit" offset "offset"
) as t;
end
$$;
-- drop function if exists find_follows_article_by_target(uuid, int, int);
-- select * from find_follows_article_by_target('32518c76-5c58-3cd1-00cd-7f9d0bb872cd', 20, 0);
-- select * from find_follows_article_by_target('24a373f4-c321-4006-8d05-3c50f95a561b', 100, 0);
-- SELECT * FROM find_follows_article_by_target ("_target_id" := '24a373f4-c321-4006-8d05-3c50f95a561b'::uuid, "offset" := 0::int, "limit" := 300::int)

View File

@@ -7,6 +7,7 @@ import fr.dcproject.module
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.server.testing.withTestApplication
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.InternalCoroutinesApi
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
@@ -18,6 +19,7 @@ import org.koin.test.get
@KtorExperimentalAPI
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MailerTest : KoinTest, AutoCloseKoinTest() {
@InternalCoroutinesApi
@Test
@Tag("online")
fun `can be send an email`() {

View File

@@ -13,6 +13,7 @@ import io.cucumber.junit.CucumberOptions
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.server.testing.withTestApplication
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.InternalCoroutinesApi
import org.junit.runner.RunWith
import org.koin.test.KoinTest
import org.koin.test.get
@@ -20,6 +21,7 @@ import org.slf4j.Logger
var unitialized: Boolean = false
@InternalCoroutinesApi
@KtorExperimentalAPI
@KtorExperimentalLocationsAPI
@RunWith(Cucumber::class)
@@ -27,6 +29,7 @@ var unitialized: Boolean = false
class RunCucumberTest : En, KoinTest {
private val logger: Logger? by LoggerDelegate()
@InternalCoroutinesApi
val ktorContext = KtorServerContext {
module(CUCUMBER)
}