diff --git a/build.gradle.kts b/build.gradle.kts index 84cee6a..8d825e5 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -70,6 +70,7 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}") implementation("io.ktor:ktor-server-jetty:$ktor_version") + implementation("io.ktor:ktor-client-jetty:$ktor_version") implementation("ch.qos.logback:logback-classic:$logback_version") implementation("io.ktor:ktor-server-core:$ktor_version") implementation("io.ktor:ktor-locations:$ktor_version") @@ -77,6 +78,7 @@ dependencies { implementation("io.ktor:ktor-auth-jwt:$ktor_version") implementation("io.ktor:ktor-gson:$ktor_version") implementation("io.ktor:ktor-auth-jwt:$ktor_version") + implementation("io.ktor:ktor-websockets:$ktor_version") implementation("org.koin:koin-ktor:$koinVersion") implementation("io.ktor:ktor-jackson:$ktor_version") implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jackson_version") diff --git a/src/main/kotlin/fr/dcproject/Application.kt b/src/main/kotlin/fr/dcproject/Application.kt index 006840d..c6e9cd7 100644 --- a/src/main/kotlin/fr/dcproject/Application.kt +++ b/src/main/kotlin/fr/dcproject/Application.kt @@ -26,26 +26,28 @@ import io.ktor.application.install import io.ktor.auth.Authentication import io.ktor.auth.authenticate import io.ktor.auth.jwt.jwt +import io.ktor.client.HttpClient +import io.ktor.client.engine.jetty.Jetty import io.ktor.features.* import io.ktor.http.HttpHeaders import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode +import io.ktor.http.auth.HttpAuthHeader import io.ktor.jackson.jackson import io.ktor.locations.KtorExperimentalLocationsAPI import io.ktor.locations.Locations import io.ktor.response.respond -import io.ktor.response.respondText import io.ktor.routing.Routing -import io.ktor.routing.get import io.ktor.util.KtorExperimentalAPI +import io.ktor.websocket.WebSockets import io.lettuce.core.api.async.RedisAsyncCommands import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.eclipse.jetty.util.log.Slf4jLog import org.joda.time.DateTime +import org.koin.core.qualifier.named import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get import org.slf4j.event.Level @@ -63,7 +65,6 @@ fun main(args: Array): Unit = io.ktor.server.jetty.EngineMain.main(args) enum class Env { PROD, TEST, CUCUMBER } -@InternalCoroutinesApi @KtorExperimentalAPI @KtorExperimentalLocationsAPI @Suppress("unused") // Referenced in application.conf @@ -167,6 +168,18 @@ fun Application.module(env: Env = PROD) { ) } + HttpClient(Jetty) { + engine { + } + } + + install(WebSockets) { + pingPeriod = Duration.ofSeconds(5) // Disabled (null) by default + timeout = Duration.ofSeconds(3) + maxFrameSize = Long.MAX_VALUE // Disabled (max value). The connection will be closed if surpassed this length. + masking = false + } + install(EventNotification) { /* Config Rabbit */ val exchangeName = config.exchangeNotificationName @@ -258,6 +271,21 @@ fun Application.module(env: Env = PROD) { } } } + + jwt("url") { + verifier(JwtConfig.verifier) + realm = "dc-project.fr" + authHeader { call -> + call.request.queryParameters.get("token")?.let { + HttpAuthHeader.Single("Bearer", it) + } + } + validate { + it.payload.getClaim("id").asString()?.let { id -> + get().findById(UUID.fromString(id)) + } + } + } } install(AutoHeadResponse) @@ -295,10 +323,10 @@ fun Application.module(env: Env = PROD) { opinionArticle(get()) opinionChoice(get()) definition() - get("/sse") { + } - call.respondText("OK") - } + authenticate("url") { + notificationArticle(get(), get(named("ws"))) } } diff --git a/src/main/kotlin/fr/dcproject/Module.kt b/src/main/kotlin/fr/dcproject/Module.kt index 87f604b..6f30cf5 100644 --- a/src/main/kotlin/fr/dcproject/Module.kt +++ b/src/main/kotlin/fr/dcproject/Module.kt @@ -13,9 +13,12 @@ import fr.dcproject.messages.SsoManager import fr.postgresjson.connexion.Connection import fr.postgresjson.connexion.Requester import fr.postgresjson.migration.Migrations +import io.ktor.client.HttpClient +import io.ktor.client.features.websocket.WebSockets import io.ktor.util.KtorExperimentalAPI import io.lettuce.core.RedisClient import io.lettuce.core.api.async.RedisAsyncCommands +import org.koin.core.qualifier.named import org.koin.dsl.module import fr.dcproject.repository.Article as ArticleRepository import fr.dcproject.repository.Citizen as CitizenRepository @@ -39,6 +42,7 @@ val Module = module { single { config } + // SQL connection single { Connection( host = config.host, @@ -49,16 +53,20 @@ val Module = module { ) } + // Launch Database migration single { Migrations(connection = get(), directory = config.sqlFiles) } + // Redis client single> { RedisClient.create(config.redis).connect()?.async() ?: error("Unable to connect to redis") } + // RabbitMQ single { ConnectionFactory().apply { setUri(config.rabbitmq) } } + // JsonSerializer single { jacksonObjectMapper().apply { registerModule(SimpleModule()) @@ -70,6 +78,14 @@ val Module = module { } } + // Client HTTP for WebSockets + single(named("ws")) { + HttpClient { + install(WebSockets) + } + } + + // SQL Requester (postgresJson) single { Requester.RequesterFactory( connection = get(), @@ -77,7 +93,7 @@ val Module = module { ).createRequester() } - // TODO: create generic declaration + // Repositories single { UserRepository(get()) } single { ArticleRepository(get()) } single { CitizenRepository(get()) } @@ -93,6 +109,9 @@ val Module = module { single { OpinionChoiceRepository(get()) } single { OpinionArticleRepository(get()) } + // Mailler single { Mailer(config.sendGridKey) } + + // SSO Manager for connection single { SsoManager(get(), config.domain, get()) } } diff --git a/src/main/kotlin/fr/dcproject/event/EventNotification.kt b/src/main/kotlin/fr/dcproject/event/EventNotification.kt index b0ebcc2..e4aea7c 100644 --- a/src/main/kotlin/fr/dcproject/event/EventNotification.kt +++ b/src/main/kotlin/fr/dcproject/event/EventNotification.kt @@ -9,15 +9,16 @@ import io.ktor.util.KtorExperimentalAPI import kotlinx.coroutines.DisposableHandle import org.joda.time.DateTime -abstract class Notification( +open class Notification( val type: String, val createdAt: DateTime = DateTime.now() -) +) : Serializable + open class EntityEvent( val target: UuidEntity, type: String, val action: String -) : Notification(type), Serializable { +) : Notification(type) { enum class Type(val event: EventDefinition) { UPDATE_ARTICLE(EventDefinition()) } diff --git a/src/main/kotlin/fr/dcproject/routes/Notification.kt b/src/main/kotlin/fr/dcproject/routes/Notification.kt new file mode 100644 index 0000000..3bb2e64 --- /dev/null +++ b/src/main/kotlin/fr/dcproject/routes/Notification.kt @@ -0,0 +1,52 @@ +package fr.dcproject.routes + +import fr.dcproject.citizen +import io.ktor.client.HttpClient +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.readText +import io.ktor.locations.KtorExperimentalLocationsAPI +import io.ktor.routing.Route +import io.ktor.websocket.webSocket +import io.lettuce.core.Range +import io.lettuce.core.api.async.RedisAsyncCommands +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.launch + +@ExperimentalCoroutinesApi +@KtorExperimentalLocationsAPI +fun Route.notificationArticle(redis: RedisAsyncCommands, client: HttpClient) { + webSocket("/notifications") { + val citizenId = call.citizen.id + + launch { + var score = 0.0 + while (true) { + val result = redis.zrangebyscoreWithScores( + "notification:$citizenId", + Range.from( + Range.Boundary.excluding(score), + Range.Boundary.including(Double.POSITIVE_INFINITY) + ) + ) + + result.get().forEach { + outgoing.send(Frame.Text(it.value)) + if (it.score > score) score = it.score + } + delay(1000) + // TODO terminate coroutine after connection close ! + } + } + + // TODO mark notification as read + incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect { + val text = it.readText() + outgoing.send(Frame.Text(text)) + delay(100) + } + } +}