create ProjectionSnapshotRepositoryInRedis

This commit is contained in:
2025-03-25 01:05:52 +01:00
parent 23b304fdbd
commit 9670a000f0
5 changed files with 458 additions and 176 deletions

View File

@@ -11,15 +11,35 @@ import eventDemo.business.event.projection.GameProjectionBus
import eventDemo.business.event.projection.gameList.GameListRepository
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.libs.event.projection.SnapshotConfig
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf
import org.koin.dsl.bind
import redis.clients.jedis.JedisPooled
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.json.JsonObjectMapper
fun Module.configureDIInfrastructure(redisUrl: String) {
single {
JedisPooled(redisUrl)
}
factory {
JedisPooled(redisUrl).apply {
setJsonObjectMapper(
object : JsonObjectMapper {
override fun <T> fromJson(
value: String,
valueType: Class<T>,
): T {
val s: KSerializer<T> = serializer(valueType) as KSerializer<T>
return Json.decodeFromString(s, value)
}
override fun toJson(value: Any): String =
Json.encodeToString(value)
},
)
}
} bind UnifiedJedis::class
singleOf(::GameEventBusInMemory) bind GameEventBus::class
singleOf(::GameEventStoreInMemory) bind GameEventStore::class

View File

@@ -0,0 +1,11 @@
package eventDemo.libs
fun List<Int>.toRanges(): List<IntRange> =
fold(listOf()) { acc, i ->
val last = acc.lastOrNull()
if (last != null && last.max() + 1 == i) {
(acc - setOf(last)) + setOf(IntRange(last.min(), i))
} else {
acc + setOf(IntRange(i, i))
}
}

View File

@@ -0,0 +1,194 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.toRanges
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SortingParams
import kotlin.reflect.KClass
class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val eventStore: EventStore<E, ID>,
private val jedis: UnifiedJedis,
private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val projectionClass: KClass<P>,
private val projectionToJson: (P) -> String,
private val jsonToProjection: (String) -> P,
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
/**
* Create a snapshot for the event
*
* 1. get the last snapshot with a version lower than that of the event
* 2. get the events with a greater version of the snapshot
* 3. apply the event to the snapshot
* 4. apply the new event to the projection
* 5. save it
* 6. remove old one
*/
override fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
save(it)
removeOldSnapshot(it.aggregateId, event.version)
}
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(): List<P> =
jedis
.scan(
"0",
ScanParams()
.match(projectionClass.redisKeySearchListLatest)
.count(100),
).result
.map { jsonToProjection(it) }
/**
* Get the last version of the [Projection] from the cache.
*
* 1. get the last snapshot
* 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot
*/
override fun getLast(aggregateId: ID): P =
jedis
.get(projectionClass.redisKeyLatest(aggregateId))
.let(jsonToProjection)
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*
* 1. get the last snapshot before the event
* 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot
*/
override fun getUntil(event: E): P {
val lastSnapshot =
jedis
.sort(
projectionClass.redisKey(event.aggregateId),
SortingParams()
.desc()
.by("score")
.limit(0, 1),
).firstOrNull()
?.let(jsonToProjection)
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
.readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
} else {
lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot)
}
}
private fun save(projection: P) {
jedis.zadd(projection.redisKeyVersion, projection.lastEventVersion.toDouble(), projectionToJson(projection))
jedis.expire(projection.redisKeyVersion, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
jedis.set(projection.redisKeyLatest, projectionToJson(projection))
}
/**
* Apply events to the projection.
*/
private fun P?.applyEvents(
aggregateId: ID,
eventsToApply: Set<E>,
): P =
eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure)
/**
* Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event.
*/
private val applyToProjectionSecure: P.(event: E) -> P = { event ->
withLoggingContext("event" to event.toString(), "projection" to this.toString()) {
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
}
}
}
private fun removeOldSnapshot(
aggregateId: AggregateId,
lastVersion: Int,
) {
removeByModulo(aggregateId, lastVersion)
removeTheHeadBySize(aggregateId)
}
private fun removeByModulo(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - snapshotCacheConfig.maxSnapshotCacheSize)
.let { if (it < 0) 0 else it }
.let { IntRange(it, lastVersion - 1) }
.filter { (it % snapshotCacheConfig.modulo) != 1 }
.toRanges()
.map {
jedis.zremrangeByScore(
projectionClass.redisKey(aggregateId),
it.min().toDouble(),
it.max().toDouble(),
)
}
}
private fun removeTheHeadBySize(aggregateId: AggregateId) {
val size =
jedis.zcount(
projectionClass.redisKey(aggregateId),
Double.MIN_VALUE,
Double.MAX_VALUE,
)
LongRange((size - snapshotCacheConfig.maxSnapshotCacheSize), size)
.let {
jedis.zremrangeByRank(
projectionClass.redisKey(aggregateId),
1,
it.max(),
)
}
}
}
val <P : Projection<*>> KClass<P>.redisKeySearchListLatest: String get() {
return "projection:$simpleName:*:latest"
}
val <P : Projection<*>> P.redisKeyVersion: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}:$lastEventVersion"
}
val <P : Projection<*>> P.redisKeyLatest: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}:latest"
}
fun <A : AggregateId, P : Projection<*>> KClass<P>.redisKeyLatest(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}:latest"
fun <P : Projection<*>, A : AggregateId> KClass<P>.redisKey(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}"

View File

@@ -1,173 +0,0 @@
package eventDemo.business.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.libs.event.projection.Projection
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.test.assertNotNull
@OptIn(DelicateCoroutinesApi::class)
class ProjectionSnapshotRepositoryInMemoryTest :
FunSpec({
test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") {
val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStore)
val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther)
assertNotNull(repo.getUntil(eventOther)).also {
assertNotNull(it.value) shouldBeEqual "valOther"
}
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1)
repo.applyAndPutToCache(event1)
assertNotNull(repo.getLast(event1.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2)
repo.applyAndPutToCache(event2)
assertNotNull(repo.getLast(event2.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
}
test("ProjectionSnapshotRepositoryInMemory should be thread safe") {
val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStore)
val aggregateId = IdTest()
val versionBuilder = VersionBuilderLocal()
val lock = ReentrantLock()
(0..9)
.map {
GlobalScope.launch {
(1..10).map {
val eventX =
lock.withLock {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
}
repo.applyAndPutToCache(eventX)
}
}
}.joinAll()
assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100
}
test("removeOldSnapshot") {
val versionBuilder = VersionBuilderLocal()
val eventStore: EventStore<TestEvents, IdTest> = EventStoreInMemory()
val repo = getSnapshotRepoTest(eventStore, SnapshotConfig(2))
val aggregateId = IdTest()
fun buildEndSendEventX() {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
.also { repo.applyAndPutToCache(it) }
}
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 1
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 2
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 3
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 4
}
})
@JvmInline
private value class IdTest(
override val id: UUID = UUID.randomUUID(),
) : AggregateId
private data class ProjectionTest(
override val aggregateId: IdTest,
override val lastEventVersion: Int = 0,
var value: String? = null,
var num: Int = 0,
) : Projection<IdTest>
private sealed interface TestEvents : Event<IdTest>
private data class Event1Test(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val value1: String,
) : TestEvents
private data class Event2Test(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val value2: String,
) : TestEvents
private data class EventXTest(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val num: Int,
) : TestEvents
private fun getSnapshotRepoTest(
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
): ProjectionSnapshotRepositoryInMemory<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory(
eventStore = eventStore,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
) { event ->
this.let { projection ->
when (event) {
is Event1Test -> {
projection.copy(value = (projection.value ?: "") + event.value1)
}
is Event2Test -> {
projection.copy(value = (projection.value ?: "") + event.value2)
}
is EventXTest -> {
projection.copy(num = projection.num + event.num)
}
}.copy(
lastEventVersion = event.version,
)
}
}

View File

@@ -0,0 +1,230 @@
package eventDemo.business.event.projection
import eventDemo.configuration.serializer.UUIDSerializer
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.libs.event.projection.Projection
import eventDemo.libs.event.projection.ProjectionSnapshotRepository
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.WithDataTestName
import io.kotest.datatest.withData
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import redis.clients.jedis.JedisPooled
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.test.assertNotNull
@OptIn(DelicateCoroutinesApi::class)
class ProjectionSnapshotRepositoryTest :
FunSpec({
data class TestData(
val store: EventStore<TestEvents, IdTest>,
val snapshotRepo: ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest>,
) : WithDataTestName {
override fun dataTestName(): String =
"${snapshotRepo::class.simpleName} with ${store::class.simpleName}"
}
val eventStores =
listOf(
EventStoreInMemory<TestEvents, IdTest>(),
)
val projectionRepo =
listOf(
::getSnapshotRepoInMemoryTest,
::getSnapshotRepoInRedisTest,
)
val list =
eventStores.flatMap { store ->
projectionRepo.map {
TestData(store, it(store, SnapshotConfig(2000)))
}
}
val nameFn: (TestData) -> String = { (eventStore, repo) ->
"${repo::class.simpleName} with ${eventStore::class.simpleName}"
}
context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") {
withData(nameFn = nameFn, list) { (eventStore, repo) ->
val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
eventStore.publish(eventOther)
repo.applyAndPutToCache(eventOther)
assertNotNull(repo.getUntil(eventOther)).also {
assertNotNull(it.value) shouldBeEqual "valOther"
}
val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId)
eventStore.publish(event1)
repo.applyAndPutToCache(event1)
assertNotNull(repo.getLast(event1.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId)
eventStore.publish(event2)
repo.applyAndPutToCache(event2)
assertNotNull(repo.getLast(event2.aggregateId)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
assertNotNull(repo.getUntil(event1)).also {
assertNotNull(it.value) shouldBeEqual "val1"
}
assertNotNull(repo.getUntil(event2)).also {
assertNotNull(it.value) shouldBeEqual "val1val2"
}
}
}
context("ProjectionSnapshotRepositoryInMemory should be thread safe") {
withData(list) { (eventStore, repo) ->
val aggregateId = IdTest()
val versionBuilder = VersionBuilderLocal()
val lock = ReentrantLock()
(0..9)
.map {
GlobalScope.launch {
(1..10).map {
val eventX =
lock.withLock {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
}
repo.applyAndPutToCache(eventX)
}
}
}.joinAll()
assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100
}
}
context("removeOldSnapshot") {
withData(list) { (eventStore, repo) ->
val versionBuilder = VersionBuilderLocal()
val aggregateId = IdTest()
fun buildEndSendEventX() {
EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId)
.also { eventStore.publish(it) }
.also { repo.applyAndPutToCache(it) }
}
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 1
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 2
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 3
buildEndSendEventX()
repo.getLast(aggregateId).num shouldBeEqual 4
}
}
})
@JvmInline
@Serializable
private value class IdTest(
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID(),
) : AggregateId
@Serializable
private data class ProjectionTest(
override val aggregateId: IdTest,
override val lastEventVersion: Int = 0,
var value: String? = null,
var num: Int = 0,
) : Projection<IdTest>
private sealed interface TestEvents : Event<IdTest>
private data class Event1Test(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val value1: String,
) : TestEvents
private data class Event2Test(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val value2: String,
) : TestEvents
private data class EventXTest(
override val eventId: UUID = UUID.randomUUID(),
override val aggregateId: IdTest,
override val createdAt: Instant = Clock.System.now(),
override val version: Int,
val num: Int,
) : TestEvents
private fun getSnapshotRepoInMemoryTest(
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory(
eventStore = eventStore,
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
applyToProjection = apply,
)
private fun getSnapshotRepoInRedisTest(
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInRedis(
eventStore = eventStore,
jedis = JedisPooled("redis://localhost:6379"),
initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) },
snapshotCacheConfig = snapshotConfig,
projectionClass = ProjectionTest::class,
projectionToJson = { Json.encodeToString(it) },
jsonToProjection = { Json.decodeFromString(it) },
applyToProjection = apply,
)
private val apply: ProjectionTest.(TestEvents) -> ProjectionTest = { event ->
this.let { projection ->
when (event) {
is Event1Test -> {
projection.copy(value = (projection.value ?: "") + event.value1)
}
is Event2Test -> {
projection.copy(value = (projection.value ?: "") + event.value2)
}
is EventXTest -> {
projection.copy(num = projection.num + event.num)
}
}.copy(
lastEventVersion = event.version,
)
}
}