Implement Websocket for push Notification

create auth with jwt in query string
This commit is contained in:
2020-02-27 01:38:34 +01:00
parent b678f7f2cc
commit 1418dd95bc
5 changed files with 113 additions and 11 deletions

View File

@@ -26,26 +26,28 @@ import io.ktor.application.install
import io.ktor.auth.Authentication
import io.ktor.auth.authenticate
import io.ktor.auth.jwt.jwt
import io.ktor.client.HttpClient
import io.ktor.client.engine.jetty.Jetty
import io.ktor.features.*
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.auth.HttpAuthHeader
import io.ktor.jackson.jackson
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.locations.Locations
import io.ktor.response.respond
import io.ktor.response.respondText
import io.ktor.routing.Routing
import io.ktor.routing.get
import io.ktor.util.KtorExperimentalAPI
import io.ktor.websocket.WebSockets
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.eclipse.jetty.util.log.Slf4jLog
import org.joda.time.DateTime
import org.koin.core.qualifier.named
import org.koin.ktor.ext.Koin
import org.koin.ktor.ext.get
import org.slf4j.event.Level
@@ -63,7 +65,6 @@ fun main(args: Array<String>): Unit = io.ktor.server.jetty.EngineMain.main(args)
enum class Env { PROD, TEST, CUCUMBER }
@InternalCoroutinesApi
@KtorExperimentalAPI
@KtorExperimentalLocationsAPI
@Suppress("unused") // Referenced in application.conf
@@ -167,6 +168,18 @@ fun Application.module(env: Env = PROD) {
)
}
HttpClient(Jetty) {
engine {
}
}
install(WebSockets) {
pingPeriod = Duration.ofSeconds(5) // Disabled (null) by default
timeout = Duration.ofSeconds(3)
maxFrameSize = Long.MAX_VALUE // Disabled (max value). The connection will be closed if surpassed this length.
masking = false
}
install(EventNotification) {
/* Config Rabbit */
val exchangeName = config.exchangeNotificationName
@@ -258,6 +271,21 @@ fun Application.module(env: Env = PROD) {
}
}
}
jwt("url") {
verifier(JwtConfig.verifier)
realm = "dc-project.fr"
authHeader { call ->
call.request.queryParameters.get("token")?.let {
HttpAuthHeader.Single("Bearer", it)
}
}
validate {
it.payload.getClaim("id").asString()?.let { id ->
get<UserRepository>().findById(UUID.fromString(id))
}
}
}
}
install(AutoHeadResponse)
@@ -295,10 +323,10 @@ fun Application.module(env: Env = PROD) {
opinionArticle(get())
opinionChoice(get())
definition()
get("/sse") {
}
call.respondText("OK")
}
authenticate("url") {
notificationArticle(get(), get(named("ws")))
}
}

View File

@@ -13,9 +13,12 @@ import fr.dcproject.messages.SsoManager
import fr.postgresjson.connexion.Connection
import fr.postgresjson.connexion.Requester
import fr.postgresjson.migration.Migrations
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.lettuce.core.RedisClient
import io.lettuce.core.api.async.RedisAsyncCommands
import org.koin.core.qualifier.named
import org.koin.dsl.module
import fr.dcproject.repository.Article as ArticleRepository
import fr.dcproject.repository.Citizen as CitizenRepository
@@ -39,6 +42,7 @@ val Module = module {
single { config }
// SQL connection
single {
Connection(
host = config.host,
@@ -49,16 +53,20 @@ val Module = module {
)
}
// Launch Database migration
single { Migrations(connection = get(), directory = config.sqlFiles) }
// Redis client
single<RedisAsyncCommands<String, String>> {
RedisClient.create(config.redis).connect()?.async() ?: error("Unable to connect to redis")
}
// RabbitMQ
single<ConnectionFactory> {
ConnectionFactory().apply { setUri(config.rabbitmq) }
}
// JsonSerializer
single<ObjectMapper> {
jacksonObjectMapper().apply {
registerModule(SimpleModule())
@@ -70,6 +78,14 @@ val Module = module {
}
}
// Client HTTP for WebSockets
single(named("ws")) {
HttpClient {
install(WebSockets)
}
}
// SQL Requester (postgresJson)
single {
Requester.RequesterFactory(
connection = get(),
@@ -77,7 +93,7 @@ val Module = module {
).createRequester()
}
// TODO: create generic declaration
// Repositories
single { UserRepository(get()) }
single { ArticleRepository(get()) }
single { CitizenRepository(get()) }
@@ -93,6 +109,9 @@ val Module = module {
single { OpinionChoiceRepository(get()) }
single { OpinionArticleRepository(get()) }
// Mailler
single { Mailer(config.sendGridKey) }
// SSO Manager for connection
single { SsoManager(get<Mailer>(), config.domain, get()) }
}

View File

@@ -9,15 +9,16 @@ import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.DisposableHandle
import org.joda.time.DateTime
abstract class Notification(
open class Notification(
val type: String,
val createdAt: DateTime = DateTime.now()
)
) : Serializable
open class EntityEvent(
val target: UuidEntity,
type: String,
val action: String
) : Notification(type), Serializable {
) : Notification(type) {
enum class Type(val event: EventDefinition<ArticleUpdate>) {
UPDATE_ARTICLE(EventDefinition<ArticleUpdate>())
}

View File

@@ -0,0 +1,52 @@
package fr.dcproject.routes
import fr.dcproject.citizen
import io.ktor.client.HttpClient
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readText
import io.ktor.locations.KtorExperimentalLocationsAPI
import io.ktor.routing.Route
import io.ktor.websocket.webSocket
import io.lettuce.core.Range
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
@ExperimentalCoroutinesApi
@KtorExperimentalLocationsAPI
fun Route.notificationArticle(redis: RedisAsyncCommands<String, String>, client: HttpClient) {
webSocket("/notifications") {
val citizenId = call.citizen.id
launch {
var score = 0.0
while (true) {
val result = redis.zrangebyscoreWithScores(
"notification:$citizenId",
Range.from(
Range.Boundary.excluding(score),
Range.Boundary.including(Double.POSITIVE_INFINITY)
)
)
result.get().forEach {
outgoing.send(Frame.Text(it.value))
if (it.score > score) score = it.score
}
delay(1000)
// TODO terminate coroutine after connection close !
}
}
// TODO mark notification as read
incoming.consumeAsFlow().mapNotNull { it as? Frame.Text }.collect {
val text = it.readText()
outgoing.send(Frame.Text(text))
delay(100)
}
}
}