Can delete Notification
Add ID to notification
This commit is contained in:
@@ -13,7 +13,9 @@ import fr.dcproject.Env.PROD
|
||||
import fr.dcproject.entity.*
|
||||
import fr.dcproject.event.EntityEvent
|
||||
import fr.dcproject.event.EventNotification
|
||||
import fr.dcproject.event.Notification
|
||||
import fr.dcproject.event.publisher.Publisher
|
||||
import fr.dcproject.repository.Follow
|
||||
import fr.dcproject.repository.FollowArticle
|
||||
import fr.dcproject.routes.*
|
||||
import fr.dcproject.security.voter.*
|
||||
@@ -41,12 +43,10 @@ import io.ktor.routing.Routing
|
||||
import io.ktor.util.KtorExperimentalAPI
|
||||
import io.ktor.websocket.WebSockets
|
||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.eclipse.jetty.util.log.Slf4jLog
|
||||
import org.joda.time.DateTime
|
||||
import org.koin.core.qualifier.named
|
||||
import org.koin.ktor.ext.Koin
|
||||
import org.koin.ktor.ext.get
|
||||
@@ -193,10 +193,10 @@ fun Application.module(env: Env = PROD) {
|
||||
val exchangeName = config.exchangeNotificationName
|
||||
get<ConnectionFactory>().newConnection().use { connection ->
|
||||
connection.createChannel().use { channel ->
|
||||
channel.queueDeclare("sse", true, false, false, null)
|
||||
channel.queueDeclare("push", true, false, false, null)
|
||||
channel.queueDeclare("email", true, false, false, null)
|
||||
channel.exchangeDeclare(exchangeName, DIRECT, true)
|
||||
channel.queueBind("sse", exchangeName, "")
|
||||
channel.queueBind("push", exchangeName, "")
|
||||
channel.queueBind("email", exchangeName, "")
|
||||
}
|
||||
}
|
||||
@@ -208,43 +208,40 @@ fun Application.module(env: Env = PROD) {
|
||||
}
|
||||
|
||||
/* Launch Consumer */
|
||||
GlobalScope.launch {
|
||||
val connection = get<ConnectionFactory>().newConnection()
|
||||
val channel = connection.createChannel()
|
||||
launch {
|
||||
val rabbitChannel = get<ConnectionFactory>().newConnection().createChannel()
|
||||
val redis = get<RedisAsyncCommands<String, String>>()
|
||||
val consumerSSE: Consumer = object : DefaultConsumer(channel) {
|
||||
|
||||
val consumerPush: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
envelope: Envelope,
|
||||
properties: AMQP.BasicProperties,
|
||||
body: ByteArray
|
||||
) {
|
||||
) = runBlocking {
|
||||
val message = body.toString(Charsets.UTF_8)
|
||||
val event =
|
||||
message.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
|
||||
val msg = message.deserialize<EntityEvent>() ?: error("Unable to unserialise event message from rabbit")
|
||||
|
||||
val followRepo = when (event.type) {
|
||||
"article" -> get<FollowArticle>()
|
||||
else -> error("type of event not supported")
|
||||
let {
|
||||
when (msg.type) {
|
||||
Notification.Type.ARTICLE -> get<FollowArticle>()
|
||||
} as Follow<*,*>
|
||||
}
|
||||
.findFollowsByTarget(msg.target)
|
||||
.collect { follow ->
|
||||
redis.zadd(
|
||||
"notification:${follow.createdBy.id}",
|
||||
msg.id,
|
||||
message
|
||||
)
|
||||
}
|
||||
|
||||
runBlocking {
|
||||
followRepo
|
||||
.findFollowsByTarget(event.target)
|
||||
.collect { follow ->
|
||||
redis.zadd(
|
||||
"notification:${follow.createdBy.id}",
|
||||
DateTime.now().millis.toDouble(),
|
||||
message
|
||||
)
|
||||
}
|
||||
}
|
||||
channel.basicAck(envelope.deliveryTag, false)
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
}
|
||||
|
||||
val consumerEmail: Consumer = object : DefaultConsumer(channel) {
|
||||
val consumerEmail: Consumer = object : DefaultConsumer(rabbitChannel) {
|
||||
@Throws(IOException::class)
|
||||
override fun handleDelivery(
|
||||
consumerTag: String,
|
||||
@@ -255,11 +252,11 @@ fun Application.module(env: Env = PROD) {
|
||||
val message = body.toString(Charsets.UTF_8)
|
||||
println("The message is receive for send email: $message")
|
||||
// TODO implement email sender
|
||||
channel.basicAck(envelope.deliveryTag, false)
|
||||
rabbitChannel.basicAck(envelope.deliveryTag, false)
|
||||
}
|
||||
}
|
||||
channel.basicConsume("sse", false, consumerSSE)
|
||||
channel.basicConsume("email", false, consumerEmail)
|
||||
rabbitChannel.basicConsume("push", false, consumerPush) // The front consume the redis via Websocket
|
||||
rabbitChannel.basicConsume("email", false, consumerEmail)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package fr.dcproject.event
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonValue
|
||||
import fr.dcproject.entity.Article
|
||||
import fr.postgresjson.entity.Serializable
|
||||
import fr.postgresjson.entity.immutable.UuidEntity
|
||||
@@ -8,15 +9,27 @@ import io.ktor.util.AttributeKey
|
||||
import io.ktor.util.KtorExperimentalAPI
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
import org.joda.time.DateTime
|
||||
import kotlin.random.Random.Default.nextInt
|
||||
|
||||
sealed class NotificationS
|
||||
|
||||
open class Notification(
|
||||
val type: String,
|
||||
val type: Type,
|
||||
val createdAt: DateTime = DateTime.now()
|
||||
) : Serializable
|
||||
) : NotificationS(), Serializable {
|
||||
val id: Double = randId(createdAt.millis)
|
||||
enum class Type(@JsonValue val type: String) {
|
||||
ARTICLE("article");
|
||||
}
|
||||
|
||||
private fun randId(time: Long): Double {
|
||||
return (time.toString() + nextInt(1000, 9999).toString()).toDouble()
|
||||
}
|
||||
}
|
||||
|
||||
open class EntityEvent(
|
||||
val target: UuidEntity,
|
||||
type: String,
|
||||
type: Notification.Type,
|
||||
val action: String
|
||||
) : Notification(type) {
|
||||
enum class Type(val event: EventDefinition<ArticleUpdate>) {
|
||||
@@ -26,7 +39,7 @@ open class EntityEvent(
|
||||
|
||||
class ArticleUpdate(
|
||||
target: Article
|
||||
) : EntityEvent(target, "article", "update")
|
||||
) : EntityEvent(target, Notification.Type.ARTICLE, "update")
|
||||
|
||||
/**
|
||||
* Installation Class
|
||||
|
||||
@@ -12,7 +12,7 @@ import fr.dcproject.entity.Article as ArticleEntity
|
||||
import fr.dcproject.entity.Constitution as ConstitutionEntity
|
||||
import fr.dcproject.entity.Follow as FollowEntity
|
||||
|
||||
open class Follow<IN : TargetRef, OUT : TargetRef>(override var requester: Requester) : RepositoryI {
|
||||
sealed class Follow<IN : TargetRef, OUT : TargetRef>(override var requester: Requester) : RepositoryI {
|
||||
open fun findByCitizen(
|
||||
citizen: CitizenI,
|
||||
page: Int = 1,
|
||||
@@ -63,6 +63,26 @@ open class Follow<IN : TargetRef, OUT : TargetRef>(override var requester: Reque
|
||||
"citizen_id" to citizen.id,
|
||||
"target_id" to target.id
|
||||
)
|
||||
|
||||
fun findFollowsByTarget(
|
||||
target: UuidEntity,
|
||||
bulkSize: Int = 300
|
||||
): Flow<FollowSimple<IN, CitizenRef>> = flow {
|
||||
var nextPage = 1
|
||||
do {
|
||||
val paginate = findFollowsByTarget(target, nextPage, bulkSize)
|
||||
paginate.result.forEach {
|
||||
emit(it)
|
||||
}
|
||||
nextPage = paginate.currentPage+1
|
||||
} while (!paginate.isLastPage())
|
||||
}
|
||||
|
||||
abstract fun findFollowsByTarget(
|
||||
target: UuidEntity,
|
||||
page: Int = 1,
|
||||
limit: Int = 300
|
||||
): Paginated<FollowSimple<IN, CitizenRef>>
|
||||
}
|
||||
|
||||
class FollowArticle(requester: Requester) : Follow<ArticleRef, ArticleEntity>(requester) {
|
||||
@@ -80,10 +100,10 @@ class FollowArticle(requester: Requester) : Follow<ArticleRef, ArticleEntity>(re
|
||||
}
|
||||
}
|
||||
|
||||
fun findFollowsByTarget(
|
||||
override fun findFollowsByTarget(
|
||||
target: UuidEntity,
|
||||
page: Int = 1,
|
||||
limit: Int = 300
|
||||
page: Int,
|
||||
limit: Int
|
||||
): Paginated<FollowSimple<ArticleRef, CitizenRef>> {
|
||||
return requester
|
||||
.getFunction("find_follows_article_by_target")
|
||||
@@ -91,20 +111,6 @@ class FollowArticle(requester: Requester) : Follow<ArticleRef, ArticleEntity>(re
|
||||
"target_id" to target.id
|
||||
)
|
||||
}
|
||||
|
||||
fun findFollowsByTarget(
|
||||
target: UuidEntity,
|
||||
limit: Int = 300
|
||||
): Flow<FollowSimple<ArticleRef, CitizenRef>> = flow {
|
||||
var nextPage = 1
|
||||
do {
|
||||
val paginate = findFollowsByTarget(target, nextPage, limit)
|
||||
paginate.result.forEach {
|
||||
emit(it)
|
||||
}
|
||||
nextPage = paginate.currentPage+1
|
||||
} while (!paginate.isLastPage())
|
||||
}
|
||||
}
|
||||
|
||||
class FollowConstitution(requester: Requester) : Follow<ConstitutionRef, ConstitutionEntity>(requester) {
|
||||
@@ -121,4 +127,12 @@ class FollowConstitution(requester: Requester) : Follow<ConstitutionRef, Constit
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun findFollowsByTarget(
|
||||
target: UuidEntity,
|
||||
page: Int,
|
||||
limit: Int
|
||||
): Paginated<FollowSimple<ConstitutionRef, CitizenRef>> {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package fr.dcproject.routes
|
||||
|
||||
import fr.dcproject.citizen
|
||||
import fr.dcproject.event.Notification
|
||||
import fr.postgresjson.serializer.deserialize
|
||||
import io.ktor.client.HttpClient
|
||||
import io.ktor.http.cio.websocket.Frame
|
||||
import io.ktor.http.cio.websocket.readText
|
||||
@@ -21,32 +23,36 @@ import kotlinx.coroutines.launch
|
||||
fun Route.notificationArticle(redis: RedisAsyncCommands<String, String>, client: HttpClient) {
|
||||
webSocket("/notifications") {
|
||||
val citizenId = call.citizen.id
|
||||
val job = launch {
|
||||
var score = 0.0
|
||||
while (!outgoing.isClosedForSend) {
|
||||
val result = redis.zrangebyscoreWithScores(
|
||||
|
||||
launch {
|
||||
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
|
||||
val notificationMessage = it.readText().deserialize<Notification>() ?: error("unable to deserialize message")
|
||||
|
||||
redis.zremrangebyscore(
|
||||
"notification:$citizenId",
|
||||
Range.from(
|
||||
Range.Boundary.excluding(score),
|
||||
Range.Boundary.including(Double.POSITIVE_INFINITY)
|
||||
Range.Boundary.including(notificationMessage.id),
|
||||
Range.Boundary.including(notificationMessage.id)
|
||||
)
|
||||
)
|
||||
|
||||
result.get().forEach {
|
||||
outgoing.send(Frame.Text(it.value))
|
||||
if (it.score > score) score = it.score
|
||||
}
|
||||
delay(1000)
|
||||
// TODO terminate coroutine after connection close !
|
||||
}
|
||||
}
|
||||
job.join()
|
||||
|
||||
// TODO mark notification as read
|
||||
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
|
||||
val text = it.readText()
|
||||
outgoing.send(Frame.Text(text))
|
||||
delay(100)
|
||||
var score = 0.0
|
||||
while (!outgoing.isClosedForSend) {
|
||||
val result = redis.zrangebyscoreWithScores(
|
||||
"notification:$citizenId",
|
||||
Range.from(
|
||||
Range.Boundary.excluding(score),
|
||||
Range.Boundary.including(Double.POSITIVE_INFINITY)
|
||||
)
|
||||
)
|
||||
|
||||
result.get().forEach {
|
||||
outgoing.send(Frame.Text(it.value))
|
||||
if (it.score > score) score = it.score
|
||||
}
|
||||
delay(1000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user