diff --git a/build.gradle.kts b/build.gradle.kts index f4b3284..76cdd3d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -118,6 +118,7 @@ dependencies { implementation("redis.clients:jedis:5.2.0") implementation("org.postgresql:postgresql:42.7.5") implementation("com.zaxxer:HikariCP:6.3.0") + implementation("com.rabbitmq:amqp-client:5.25.0") // Force version of sub library (for security) implementation("commons-codec:commons-codec:1.13") diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt new file mode 100644 index 0000000..6486468 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt @@ -0,0 +1,63 @@ +package eventDemo.libs.bus + +import com.rabbitmq.client.CancelCallback +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.DeliverCallback +import com.rabbitmq.client.Delivery +import io.ktor.utils.io.core.toByteArray +import kotlinx.coroutines.runBlocking + +class BusInRabbitMQ( + private val connectionFactory: ConnectionFactory, + private val queueName: String, + private val objectToString: (E) -> String, + private val stringToObject: (String) -> E, +) : Bus { + init { + connectionFactory + .newConnection() + .createChannel() + .use { + it.queueDeclare( + queueName, + true, + false, + false, + emptyMap(), + ) + } + } + + override suspend fun publish(item: E) { + connectionFactory + .newConnection() + .createChannel() + .use { + it.basicPublish( + "", + queueName, + null, + objectToString(item).toByteArray(), + ) + } + } + + override fun subscribe( + priority: Int, + block: suspend (E) -> Unit, + ) { + connectionFactory + .newConnection() + .createChannel() + .basicConsume( + queueName, + true, + DeliverCallback { _: String, message: Delivery -> + runBlocking { + block(stringToObject(message.body.toString(Charsets.UTF_8))) + } + }, + CancelCallback {}, + ) + } +} diff --git a/src/test/kotlin/eventDemo/libs/bus/BusTest.kt b/src/test/kotlin/eventDemo/libs/bus/BusTest.kt new file mode 100644 index 0000000..fd4c597 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/bus/BusTest.kt @@ -0,0 +1,53 @@ +package eventDemo.libs.bus + +import com.rabbitmq.client.ConnectionFactory +import io.kotest.assertions.nondeterministic.until +import io.kotest.core.spec.style.FunSpec +import io.kotest.datatest.withData +import io.kotest.matchers.equals.shouldBeEqual +import kotlin.random.Random +import kotlin.time.Duration.Companion.seconds + +private data class ObjTest( + val value: String, +) + +class BusTest : + FunSpec({ + context("Pub/sub") { + val factory = + ConnectionFactory().apply { + host = "localhost" + port = 5672 + virtualHost = virtualHost + username = "event-demo" + password = "changeit" + } + val list: Map> = + mapOf( + BusInMemory::class.java.simpleName to BusInMemory(), + BusInRabbitMQ::class.java.simpleName to + BusInRabbitMQ( + factory, + "testQueue", + { it.value }, + { ObjTest(it) }, + ), + ) + + withData(list) { bus -> + val value = "hello${Random.nextInt()}" + var isCalled = false + + bus.subscribe { obj -> + isCalled = true + obj.value shouldBeEqual value + } + bus.publish(ObjTest(value)) + + until(3.seconds) { + isCalled shouldBeEqual true + } + } + } + })