#60 Can follow citizen #96
@@ -42,9 +42,9 @@ class NotificationPushListener(
|
|||||||
onReceive: suspend (NotificationMessage) -> Unit,
|
onReceive: suspend (NotificationMessage) -> Unit,
|
||||||
) {
|
) {
|
||||||
class Builder(redisClient: RedisClient) {
|
class Builder(redisClient: RedisClient) {
|
||||||
private val redisConnection = redisClient.connect() ?: error("Unable to connect to redis")
|
private val redisConnection = redisClient.connect()
|
||||||
private val redisConnectionPubSub = redisClient.connectPubSub() ?: error("Unable to connect to redis PubSub")
|
private val redisConnectionPubSub = redisClient.connectPubSub()
|
||||||
private val redis: RedisAsyncCommands<String, String> = redisConnection.async() ?: error("Unable to connect to redis Async")
|
private val redis: RedisAsyncCommands<String, String> = redisConnection.async()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build Listener with citizen, incoming flow and set an outgoing callback
|
* Build Listener with citizen, incoming flow and set an outgoing callback
|
||||||
@@ -62,7 +62,7 @@ class NotificationPushListener(
|
|||||||
fun build(ws: DefaultWebSocketServerSession): NotificationPushListener {
|
fun build(ws: DefaultWebSocketServerSession): NotificationPushListener {
|
||||||
/* Convert channel of string from websocket, to a flow of Notification object */
|
/* Convert channel of string from websocket, to a flow of Notification object */
|
||||||
val incomingFlow: Flow<NotificationMessage> = ws.incoming.consumeAsFlow()
|
val incomingFlow: Flow<NotificationMessage> = ws.incoming.consumeAsFlow()
|
||||||
.mapNotNull<Frame, Text> { it as? Frame.Text }
|
.mapNotNull<Frame, Text> { it as? Text }
|
||||||
.map { it.readText() }
|
.map { it.readText() }
|
||||||
.map { NotificationMessage.fromString(it) }
|
.map { NotificationMessage.fromString(it) }
|
||||||
|
|
||||||
@@ -119,7 +119,7 @@ class NotificationPushListener(
|
|||||||
addListener(listener)
|
addListener(listener)
|
||||||
|
|
||||||
/* Register to the events */
|
/* Register to the events */
|
||||||
async()?.psubscribe("__key*__:$key") ?: error("Unable to subscribe to redis events")
|
async()?.psubscribe("__key*__:$key")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user