diff --git a/src/main/kotlin/fr/dcproject/Application.kt b/src/main/kotlin/fr/dcproject/Application.kt index d1ac9a9..006840d 100644 --- a/src/main/kotlin/fr/dcproject/Application.kt +++ b/src/main/kotlin/fr/dcproject/Application.kt @@ -7,15 +7,18 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.joda.JodaModule import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException -import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.* +import com.rabbitmq.client.BuiltinExchangeType.DIRECT import fr.dcproject.Env.PROD import fr.dcproject.entity.* import fr.dcproject.event.EntityEvent import fr.dcproject.event.EventNotification import fr.dcproject.event.publisher.Publisher +import fr.dcproject.repository.FollowArticle import fr.dcproject.routes.* import fr.dcproject.security.voter.* import fr.postgresjson.migration.Migrations +import fr.postgresjson.serializer.deserialize import io.ktor.application.Application import io.ktor.application.ApplicationCall import io.ktor.application.call @@ -35,10 +38,18 @@ import io.ktor.response.respondText import io.ktor.routing.Routing import io.ktor.routing.get import io.ktor.util.KtorExperimentalAPI +import io.lettuce.core.api.async.RedisAsyncCommands +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.eclipse.jetty.util.log.Slf4jLog +import org.joda.time.DateTime import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get import org.slf4j.event.Level +import java.io.IOException import java.time.Duration import java.util.* import java.util.concurrent.CompletionException @@ -52,6 +63,7 @@ fun main(args: Array): Unit = io.ktor.server.jetty.EngineMain.main(args) enum class Env { PROD, TEST, CUCUMBER } +@InternalCoroutinesApi @KtorExperimentalAPI @KtorExperimentalLocationsAPI @Suppress("unused") // Referenced in application.conf @@ -133,7 +145,8 @@ fun Application.module(env: Env = PROD) { decode { values, _ -> val id = values.singleOrNull()?.let { UUID.fromString(it) } ?: throw InternalError("Cannot convert $values to UUID") - get().findOpinionChoiceById(id) ?: throw NotFoundException("OpinionChoice $values not found") + get().findOpinionChoiceById(id) + ?: throw NotFoundException("OpinionChoice $values not found") } } } @@ -157,13 +170,15 @@ fun Application.module(env: Env = PROD) { install(EventNotification) { /* Config Rabbit */ val exchangeName = config.exchangeNotificationName - get().newConnection().use { connection -> connection.createChannel().use { channel -> - channel.queueDeclare("sse", true, false, false, null) - channel.queueDeclare("email", true, false, false, null) - channel.exchangeDeclare(exchangeName, "direct") - channel.queueBind("sse", exchangeName, "") - channel.queueBind("email", exchangeName, "") - }} + get().newConnection().use { connection -> + connection.createChannel().use { channel -> + channel.queueDeclare("sse", true, false, false, null) + channel.queueDeclare("email", true, false, false, null) + channel.exchangeDeclare(exchangeName, DIRECT, true) + channel.queueBind("sse", exchangeName, "") + channel.queueBind("email", exchangeName, "") + } + } /* Declare publisher on event */ val publisher = Publisher(get(), get()) @@ -171,6 +186,61 @@ fun Application.module(env: Env = PROD) { println("Article is updated ${it.target.id}") publisher.publish(it) } + + /* Launch Consumer */ + GlobalScope.launch { + val connection = get().newConnection() + val channel = connection.createChannel() + val redis = get>() + val consumerSSE: Consumer = object : DefaultConsumer(channel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) { + val message = body.toString(Charsets.UTF_8) + val event = + message.deserialize() ?: error("Unable to unserialise event message from rabbit") + + val followRepo = when (event.type) { + "article" -> get() + else -> error("type of event not supported") + } + + runBlocking { + followRepo + .findFollowsByTarget(event.target) + .collect { follow -> + redis.zadd( + "notification:${follow.createdBy.id}", + DateTime.now().millis.toDouble(), + message + ) + } + } + channel.basicAck(envelope.deliveryTag, false) + } + } + + val consumerEmail: Consumer = object : DefaultConsumer(channel) { + @Throws(IOException::class) + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) { + val message = body.toString(Charsets.UTF_8) + println("The message is receive for send email: $message") + // TODO implement email sender + channel.basicAck(envelope.deliveryTag, false) + } + } + channel.basicConsume("sse", false, consumerSSE) + channel.basicConsume("email", false, consumerEmail) + } } install(Authentication) { @@ -226,17 +296,7 @@ fun Application.module(env: Env = PROD) { opinionChoice(get()) definition() get("/sse") { -// environment.monitor.raise(EntityEvent.Type.UPDATE_ARTICLE.event, ArticleUpdate(ArticleRef())) -// val redis = this@authenticate.getKoin().get>() -// redis.set("key", "test").awaitSingle() -// redis.lpush("list", "test2").asFlow().map { -// println(it) -// }.collect() -// redis.get("key").asFlow().collect { println(it) } -// redis.rpop("list").asFlow().collect { -// println(it) -// call.respondText { it } -// } + call.respondText("OK") } } diff --git a/src/main/kotlin/fr/dcproject/Module.kt b/src/main/kotlin/fr/dcproject/Module.kt index 63c0fde..87f604b 100644 --- a/src/main/kotlin/fr/dcproject/Module.kt +++ b/src/main/kotlin/fr/dcproject/Module.kt @@ -15,7 +15,7 @@ import fr.postgresjson.connexion.Requester import fr.postgresjson.migration.Migrations import io.ktor.util.KtorExperimentalAPI import io.lettuce.core.RedisClient -import io.lettuce.core.api.reactive.RedisReactiveCommands +import io.lettuce.core.api.async.RedisAsyncCommands import org.koin.dsl.module import fr.dcproject.repository.Article as ArticleRepository import fr.dcproject.repository.Citizen as CitizenRepository @@ -49,8 +49,10 @@ val Module = module { ) } - single> { - RedisClient.create(config.redis).connect()?.reactive() ?: error("Unable to connect to redis") + single { Migrations(connection = get(), directory = config.sqlFiles) } + + single> { + RedisClient.create(config.redis).connect()?.async() ?: error("Unable to connect to redis") } single { @@ -91,8 +93,6 @@ val Module = module { single { OpinionChoiceRepository(get()) } single { OpinionArticleRepository(get()) } - single { Migrations(connection = get(), directory = config.sqlFiles) } - single { Mailer(config.sendGridKey) } single { SsoManager(get(), config.domain, get()) } } diff --git a/src/main/kotlin/fr/dcproject/entity/Citizen.kt b/src/main/kotlin/fr/dcproject/entity/Citizen.kt index e60adab..44b12cb 100644 --- a/src/main/kotlin/fr/dcproject/entity/Citizen.kt +++ b/src/main/kotlin/fr/dcproject/entity/Citizen.kt @@ -37,15 +37,19 @@ open class CitizenSimple( id: UUID = UUID.randomUUID(), var name: Name, user: UserRef -) : CitizenRef(id, user) +) : CitizenRefWithUser(id, user) -open class CitizenRef( +open class CitizenRefWithUser( id: UUID = UUID.randomUUID(), open val user: UserRef -) : UuidEntity(id), - CitizenI, +) : CitizenRef(id), EntityDeletedAt by EntityDeletedAtImp() +open class CitizenRef( + id: UUID = UUID.randomUUID() +) : UuidEntity(id), + CitizenI + interface CitizenI : UuidEntityI { data class Name( var firstName: String, diff --git a/src/main/kotlin/fr/dcproject/entity/Comment.kt b/src/main/kotlin/fr/dcproject/entity/Comment.kt index ec3362c..cb3c1e0 100644 --- a/src/main/kotlin/fr/dcproject/entity/Comment.kt +++ b/src/main/kotlin/fr/dcproject/entity/Comment.kt @@ -14,7 +14,7 @@ open class Comment( var parent: Comment? = null, val parentsIds: List? = null, val childrenCount: Int? = null -) : ExtraI, +) : ExtraI, CommentRef(id), EntityCreatedAt by EntityCreatedAtImp(), EntityCreatedBy by EntityCreatedByImp(createdBy), diff --git a/src/main/kotlin/fr/dcproject/entity/Constitution.kt b/src/main/kotlin/fr/dcproject/entity/Constitution.kt index 1f51674..d1d335d 100644 --- a/src/main/kotlin/fr/dcproject/entity/Constitution.kt +++ b/src/main/kotlin/fr/dcproject/entity/Constitution.kt @@ -31,7 +31,7 @@ class Constitution( ) : ConstitutionSimple.TitleSimple(id, name, rank) } -open class ConstitutionSimple>( +open class ConstitutionSimple>( id: UUID = UUID.randomUUID(), var title: String, var anonymous: Boolean = true, diff --git a/src/main/kotlin/fr/dcproject/entity/Extra.kt b/src/main/kotlin/fr/dcproject/entity/Extra.kt index d80a472..ce24a38 100644 --- a/src/main/kotlin/fr/dcproject/entity/Extra.kt +++ b/src/main/kotlin/fr/dcproject/entity/Extra.kt @@ -8,18 +8,21 @@ import java.util.* import kotlin.reflect.KClass import kotlin.reflect.full.isSubclassOf -interface ExtraI : +interface ExtraI : UuidEntityI, EntityCreatedAt, - EntityCreatedBy { + EntityCreatedBy { val target: T } -open class TargetRef(id: UUID = UUID.randomUUID()) : TargetI, UuidEntity(id) { - override val reference: String = "" - get() { - return if (field != "") field else TargetI.getReference(this) - } +open class TargetRef(id: UUID = UUID.randomUUID(), reference: String = "") : TargetI, UuidEntity(id) { + + final override val reference: String + get() = if (field != "") field else TargetI.getReference(this) + + init { + this.reference = reference + } } interface TargetI : UuidEntityI { @@ -43,7 +46,7 @@ interface TargetI : UuidEntityI { fun getReference(t: TargetI): String { val ref = this.getReference(t::class) - return if (t is ExtraI<*>) { + return if (t is ExtraI<*, *>) { "${ref}_on_${t.target.reference}" } else { ref diff --git a/src/main/kotlin/fr/dcproject/entity/Follow.kt b/src/main/kotlin/fr/dcproject/entity/Follow.kt index 13eb5d5..9b1f3bd 100644 --- a/src/main/kotlin/fr/dcproject/entity/Follow.kt +++ b/src/main/kotlin/fr/dcproject/entity/Follow.kt @@ -7,7 +7,14 @@ class Follow( id: UUID = UUID.randomUUID(), override val createdBy: CitizenBasic, override var target: T -) : ExtraI, +) : ExtraI, + FollowSimple(id, createdBy, target) + +open class FollowSimple( + id: UUID = UUID.randomUUID(), + override val createdBy: C, + override var target: T +) : ExtraI, UuidEntity(id), EntityCreatedAt by EntityCreatedAtImp(), - EntityCreatedBy by EntityCreatedByImp(createdBy) + EntityCreatedBy by EntityCreatedByImp(createdBy) diff --git a/src/main/kotlin/fr/dcproject/entity/Opinion.kt b/src/main/kotlin/fr/dcproject/entity/Opinion.kt index 23fd3d6..0950e07 100644 --- a/src/main/kotlin/fr/dcproject/entity/Opinion.kt +++ b/src/main/kotlin/fr/dcproject/entity/Opinion.kt @@ -11,7 +11,7 @@ open class Opinion( override val createdBy: CitizenBasic, override val target: T, val choice: OpinionChoice -) : ExtraI, +) : ExtraI, TargetRef(id), EntityCreatedAt by EntityCreatedAtImp(), EntityCreatedBy by EntityCreatedByImp(createdBy) { diff --git a/src/main/kotlin/fr/dcproject/entity/Vote.kt b/src/main/kotlin/fr/dcproject/entity/Vote.kt index e5e70ed..644e451 100644 --- a/src/main/kotlin/fr/dcproject/entity/Vote.kt +++ b/src/main/kotlin/fr/dcproject/entity/Vote.kt @@ -9,7 +9,7 @@ open class Vote( override var target: T, var note: Int, var anonymous: Boolean = true -) : ExtraI, +) : ExtraI, UuidEntity(id), EntityCreatedAt by EntityCreatedAtImp(), EntityCreatedBy by EntityCreatedByImp(createdBy), diff --git a/src/main/kotlin/fr/dcproject/event/EventNotification.kt b/src/main/kotlin/fr/dcproject/event/EventNotification.kt index f042dd3..b0ebcc2 100644 --- a/src/main/kotlin/fr/dcproject/event/EventNotification.kt +++ b/src/main/kotlin/fr/dcproject/event/EventNotification.kt @@ -1,6 +1,7 @@ package fr.dcproject.event import fr.dcproject.entity.Article +import fr.postgresjson.entity.Serializable import fr.postgresjson.entity.immutable.UuidEntity import io.ktor.application.* import io.ktor.util.AttributeKey @@ -12,11 +13,11 @@ abstract class Notification( val type: String, val createdAt: DateTime = DateTime.now() ) -abstract class EntityEvent( +open class EntityEvent( val target: UuidEntity, type: String, val action: String -) : Notification(type) { +) : Notification(type), Serializable { enum class Type(val event: EventDefinition) { UPDATE_ARTICLE(EventDefinition()) } diff --git a/src/main/kotlin/fr/dcproject/repository/Follow.kt b/src/main/kotlin/fr/dcproject/repository/Follow.kt index 475e70f..5946ea3 100644 --- a/src/main/kotlin/fr/dcproject/repository/Follow.kt +++ b/src/main/kotlin/fr/dcproject/repository/Follow.kt @@ -1,10 +1,12 @@ package fr.dcproject.repository -import fr.dcproject.entity.CitizenI -import fr.dcproject.entity.TargetI +import fr.dcproject.entity.* import fr.postgresjson.connexion.Paginated import fr.postgresjson.connexion.Requester +import fr.postgresjson.entity.immutable.UuidEntity import fr.postgresjson.repository.RepositoryI +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import java.util.* import fr.dcproject.entity.Article as ArticleEntity import fr.dcproject.entity.Constitution as ConstitutionEntity @@ -67,6 +69,32 @@ class FollowArticle(requester: Requester) : Follow(requester) { ) } } + + fun findFollowsByTarget( + target: UuidEntity, + page: Int = 1, + limit: Int = 300 + ): Paginated> { + return requester + .getFunction("find_follows_article_by_target") + .select(page, limit, + "target_id" to target.id + ) + } + + fun findFollowsByTarget( + target: UuidEntity, + limit: Int = 300 + ): Flow> = flow { + var nextPage = 1 + do { + val paginate = findFollowsByTarget(target, nextPage, limit) + paginate.result.forEach { + emit(it) + } + nextPage = paginate.currentPage+1 + } while (!paginate.isLastPage()) + } } class FollowConstitution(requester: Requester) : Follow(requester) { diff --git a/src/main/resources/sql/functions/follow/find_follows_article_by_target.sql b/src/main/resources/sql/functions/follow/find_follows_article_by_target.sql new file mode 100644 index 0000000..99408c0 --- /dev/null +++ b/src/main/resources/sql/functions/follow/find_follows_article_by_target.sql @@ -0,0 +1,37 @@ +create or replace function find_follows_article_by_target( + _target_id uuid, + "limit" int default 50, + "offset" int default 0, + out resource json, + out total int +) language plpgsql as +$$ +declare + _version_id uuid = (select version_id from article where id = _target_id); +begin + select json_agg(t), ( + select count(f.id) + from follow f + join article a on f.target_id = a.id + where a.version_id = _version_id) + into resource, total + from ( + select + f.id, + f.created_at, + f.target_reference, + json_build_object('id', f.target_id) as target, + json_build_object('id', f.created_by_id) as created_by + from follow_article as f + join article a on f.target_id = a.id + where a.version_id = _version_id + order by f.created_at + limit "limit" offset "offset" + ) as t; +end +$$; + +-- drop function if exists find_follows_article_by_target(uuid, int, int); +-- select * from find_follows_article_by_target('32518c76-5c58-3cd1-00cd-7f9d0bb872cd', 20, 0); +-- select * from find_follows_article_by_target('24a373f4-c321-4006-8d05-3c50f95a561b', 100, 0); +-- SELECT * FROM find_follows_article_by_target ("_target_id" := '24a373f4-c321-4006-8d05-3c50f95a561b'::uuid, "offset" := 0::int, "limit" := 300::int) \ No newline at end of file diff --git a/src/test/kotlin/MailerTest.kt b/src/test/kotlin/MailerTest.kt index 962f83a..8118293 100644 --- a/src/test/kotlin/MailerTest.kt +++ b/src/test/kotlin/MailerTest.kt @@ -7,6 +7,7 @@ import fr.dcproject.module import io.ktor.locations.KtorExperimentalLocationsAPI import io.ktor.server.testing.withTestApplication import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.InternalCoroutinesApi import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance @@ -18,6 +19,7 @@ import org.koin.test.get @KtorExperimentalAPI @TestInstance(TestInstance.Lifecycle.PER_CLASS) class MailerTest : KoinTest, AutoCloseKoinTest() { + @InternalCoroutinesApi @Test @Tag("online") fun `can be send an email`() { diff --git a/src/test/kotlin/RunCucumberTest.kt b/src/test/kotlin/RunCucumberTest.kt index 6c113a6..79f3e0a 100644 --- a/src/test/kotlin/RunCucumberTest.kt +++ b/src/test/kotlin/RunCucumberTest.kt @@ -13,6 +13,7 @@ import io.cucumber.junit.CucumberOptions import io.ktor.locations.KtorExperimentalLocationsAPI import io.ktor.server.testing.withTestApplication import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.InternalCoroutinesApi import org.junit.runner.RunWith import org.koin.test.KoinTest import org.koin.test.get @@ -20,6 +21,7 @@ import org.slf4j.Logger var unitialized: Boolean = false +@InternalCoroutinesApi @KtorExperimentalAPI @KtorExperimentalLocationsAPI @RunWith(Cucumber::class) @@ -27,6 +29,7 @@ var unitialized: Boolean = false class RunCucumberTest : En, KoinTest { private val logger: Logger? by LoggerDelegate() + @InternalCoroutinesApi val ktorContext = KtorServerContext { module(CUCUMBER) }