create GameListRepositoryInRedis
This commit is contained in:
@@ -0,0 +1,57 @@
|
|||||||
|
package eventDemo.adapter.infrastructureLayer.event.projection
|
||||||
|
|
||||||
|
import eventDemo.business.entity.GameId
|
||||||
|
import eventDemo.business.event.GameEventBus
|
||||||
|
import eventDemo.business.event.GameEventStore
|
||||||
|
import eventDemo.business.event.projection.GameProjectionBus
|
||||||
|
import eventDemo.business.event.projection.gameList.GameList
|
||||||
|
import eventDemo.business.event.projection.gameList.GameListRepository
|
||||||
|
import eventDemo.business.event.projection.gameList.apply
|
||||||
|
import eventDemo.business.event.projection.gameState.GameState
|
||||||
|
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
|
||||||
|
import eventDemo.libs.event.projection.SnapshotConfig
|
||||||
|
import io.github.oshai.kotlinlogging.withLoggingContext
|
||||||
|
import kotlinx.serialization.json.Json
|
||||||
|
import redis.clients.jedis.UnifiedJedis
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
|
||||||
|
*/
|
||||||
|
class GameListRepositoryInRedis(
|
||||||
|
eventStore: GameEventStore,
|
||||||
|
jedis: UnifiedJedis,
|
||||||
|
snapshotConfig: SnapshotConfig = SnapshotConfig(),
|
||||||
|
) : GameListRepository {
|
||||||
|
private val projectionsSnapshot =
|
||||||
|
ProjectionSnapshotRepositoryInRedis(
|
||||||
|
eventStore = eventStore,
|
||||||
|
snapshotCacheConfig = snapshotConfig,
|
||||||
|
initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) },
|
||||||
|
projectionClass = GameList::class,
|
||||||
|
projectionToJson = { Json.encodeToString(GameList.serializer(), it) },
|
||||||
|
jsonToProjection = { Json.decodeFromString(GameList.serializer(), it) },
|
||||||
|
applyToProjection = GameList::apply,
|
||||||
|
jedis = jedis,
|
||||||
|
)
|
||||||
|
|
||||||
|
fun subscribeToBus(
|
||||||
|
projectionBus: GameProjectionBus,
|
||||||
|
eventBus: GameEventBus,
|
||||||
|
) {
|
||||||
|
eventBus.subscribe { event ->
|
||||||
|
withLoggingContext("event" to event.toString()) {
|
||||||
|
projectionsSnapshot
|
||||||
|
.applyAndPutToCache(event)
|
||||||
|
.also { projectionBus.publish(it) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the last version of the [GameState] from the all eventStream.
|
||||||
|
*
|
||||||
|
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||||
|
*/
|
||||||
|
override fun getList(): List<GameList> =
|
||||||
|
projectionsSnapshot.getList()
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ import eventDemo.configuration.route.declareHttpGameRoute
|
|||||||
import eventDemo.configuration.route.declareWebSocketsGameRoute
|
import eventDemo.configuration.route.declareWebSocketsGameRoute
|
||||||
import io.ktor.server.application.Application
|
import io.ktor.server.application.Application
|
||||||
import org.koin.ktor.ext.get
|
import org.koin.ktor.ext.get
|
||||||
|
import org.koin.ktor.ext.getKoin
|
||||||
|
|
||||||
fun Application.configure() {
|
fun Application.configure() {
|
||||||
configureKoin()
|
configureKoin()
|
||||||
@@ -24,5 +25,5 @@ fun Application.configure() {
|
|||||||
configureHttpRouting()
|
configureHttpRouting()
|
||||||
declareHttpGameRoute()
|
declareHttpGameRoute()
|
||||||
|
|
||||||
configureGameListener()
|
getKoin().configureGameListener()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.zaxxer.hikari.HikariConfig
|
|||||||
import com.zaxxer.hikari.HikariDataSource
|
import com.zaxxer.hikari.HikariDataSource
|
||||||
import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory
|
import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory
|
||||||
import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInPostgresql
|
import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInPostgresql
|
||||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInMemory
|
import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInRedis
|
||||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory
|
import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory
|
||||||
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis
|
import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis
|
||||||
import eventDemo.business.event.GameEventBus
|
import eventDemo.business.event.GameEventBus
|
||||||
@@ -21,7 +21,7 @@ import redis.clients.jedis.UnifiedJedis
|
|||||||
import javax.sql.DataSource
|
import javax.sql.DataSource
|
||||||
|
|
||||||
fun Module.configureDIInfrastructure(config: Configuration) {
|
fun Module.configureDIInfrastructure(config: Configuration) {
|
||||||
factory {
|
single {
|
||||||
JedisPooled(config.redisUrl)
|
JedisPooled(config.redisUrl)
|
||||||
} bind UnifiedJedis::class
|
} bind UnifiedJedis::class
|
||||||
|
|
||||||
@@ -47,6 +47,6 @@ fun Module.configureDIInfrastructure(config: Configuration) {
|
|||||||
} bind GameStateRepository::class
|
} bind GameStateRepository::class
|
||||||
|
|
||||||
single {
|
single {
|
||||||
GameListRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig())
|
GameListRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig())
|
||||||
} bind GameListRepository::class
|
} bind GameListRepository::class
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import eventDemo.libs.toRanges
|
|||||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||||
import io.github.oshai.kotlinlogging.withLoggingContext
|
import io.github.oshai.kotlinlogging.withLoggingContext
|
||||||
import redis.clients.jedis.UnifiedJedis
|
import redis.clients.jedis.UnifiedJedis
|
||||||
|
import redis.clients.jedis.params.ScanParams
|
||||||
import redis.clients.jedis.params.SortingParams
|
import redis.clients.jedis.params.SortingParams
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
@@ -55,13 +56,16 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
offset: Int,
|
offset: Int,
|
||||||
): List<P> =
|
): List<P> =
|
||||||
jedis
|
jedis
|
||||||
.sort(
|
.scan(
|
||||||
projectionClass.redisKeySearchList,
|
offset.toString(),
|
||||||
SortingParams()
|
ScanParams()
|
||||||
.desc()
|
.match(projectionClass.redisKeySearchList)
|
||||||
.by("score")
|
.count(limit),
|
||||||
.limit(limit, offset),
|
).result
|
||||||
).map { jsonToProjection(it) }
|
.mapNotNull { key ->
|
||||||
|
getLastByKey(key)
|
||||||
|
?.let(jsonToProjection)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the last version of the [Projection] from the cache.
|
* Get the last version of the [Projection] from the cache.
|
||||||
@@ -71,16 +75,19 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
* 3. apply the missing events to the snapshot
|
* 3. apply the missing events to the snapshot
|
||||||
*/
|
*/
|
||||||
override fun getLast(aggregateId: ID): P =
|
override fun getLast(aggregateId: ID): P =
|
||||||
|
getLastByKey(projectionClass.redisKey(aggregateId))
|
||||||
|
?.let(jsonToProjection)
|
||||||
|
?: initialStateBuilder(aggregateId)
|
||||||
|
|
||||||
|
private fun getLastByKey(key: String): String? =
|
||||||
jedis
|
jedis
|
||||||
.sort(
|
.sort(
|
||||||
projectionClass.redisKey(aggregateId),
|
key,
|
||||||
SortingParams()
|
SortingParams()
|
||||||
.desc()
|
.desc()
|
||||||
.by("score")
|
.by("score")
|
||||||
.limit(0, 1),
|
.limit(0, 1),
|
||||||
).firstOrNull()
|
).firstOrNull()
|
||||||
?.let(jsonToProjection)
|
|
||||||
?: initialStateBuilder(aggregateId)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the [Projection] to the specific [event][Event].
|
* Build the [Projection] to the specific [event][Event].
|
||||||
|
|||||||
Reference in New Issue
Block a user