diff --git a/src/main/kotlin/Application.kt b/src/main/kotlin/Application.kt index 3f05e38..dd8b391 100644 --- a/src/main/kotlin/Application.kt +++ b/src/main/kotlin/Application.kt @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.joda.JodaModule import com.github.jasync.sql.db.postgresql.exceptions.GenericDatabaseException import fr.dcproject.Env.PROD +import fr.dcproject.elasticsearch.configElasticIndexes import fr.dcproject.entity.* import fr.dcproject.event.EventNotification import fr.dcproject.event.EventSubscriber @@ -38,8 +39,6 @@ import io.ktor.routing.Routing import io.ktor.util.KtorExperimentalAPI import io.ktor.websocket.WebSockets import org.eclipse.jetty.util.log.Slf4jLog -import org.elasticsearch.client.Request -import org.elasticsearch.client.RestClient import org.koin.core.qualifier.named import org.koin.ktor.ext.Koin import org.koin.ktor.ext.get @@ -192,56 +191,7 @@ fun Application.module(env: Env = PROD) { } } - /* Create index if not exist */ - get().run { - if (performRequest(Request("HEAD", "/views?include_type_name=false")).statusLine.statusCode == 404) { - Request( - "PUT", - "/views?include_type_name=false" - ).apply { - //language=JSON - setJsonEntity( - """ - { - "settings": { - "number_of_shards": 5 - }, - "mappings": { - "properties": { - "logged": { - "type": "boolean" - }, - "type": { - "type": "keyword" - }, - "user_ref": { - "type": "keyword" - }, - "id": { - "type": "keyword" - }, - "version_id": { - "type": "keyword" - }, - "ip": { - "type": "keyword" - }, - "citizen_id": { - "type": "keyword" - }, - "view_at": { - "type": "date" - } - } - } - } - """.trimIndent() - ) - }.let { - performRequest(it) - } - } - } + configElasticIndexes(get()) install(WebSockets) { pingPeriod = Duration.ofSeconds(60) // Disabled (null) by default diff --git a/src/main/kotlin/elasticsearch/Config.kt b/src/main/kotlin/elasticsearch/Config.kt new file mode 100644 index 0000000..ecc2abf --- /dev/null +++ b/src/main/kotlin/elasticsearch/Config.kt @@ -0,0 +1,83 @@ +package fr.dcproject.elasticsearch + +import org.elasticsearch.client.Request +import org.elasticsearch.client.RestClient +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +fun waitElasticsearchIsUp(client: RestClient) { + val logger: Logger = LoggerFactory.getLogger("fr.dcproject.elasticsearch") + val request = Request("GET", "/_cluster/health") + repeat(40) { + runCatching { + client.performRequest(request).statusLine.statusCode + }.onSuccess { + if (it == 200) { + logger.debug("Elasticsearch is Ready! Continue...") + return + } else { + logger.debug("sleep 2s and retry...") + Thread.sleep(2000) + } + }.onFailure { + logger.debug("${it.message}, sleep 2s and retry...") + Thread.sleep(2000) + } + } + error("Elasticsearch is not ready") +} + +fun configElasticIndexes(client: RestClient) { + waitElasticsearchIsUp(client) + + /* Create index if not exist */ + client.run { + if (performRequest(Request("HEAD", "/views?include_type_name=false")).statusLine.statusCode == 404) { + Request( + "PUT", + "/views?include_type_name=false" + ).apply { + //language=JSON + setJsonEntity( + """ + { + "settings": { + "number_of_shards": 5 + }, + "mappings": { + "properties": { + "logged": { + "type": "boolean" + }, + "type": { + "type": "keyword" + }, + "user_ref": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "version_id": { + "type": "keyword" + }, + "ip": { + "type": "keyword" + }, + "citizen_id": { + "type": "keyword" + }, + "view_at": { + "type": "date" + } + } + } + } + """.trimIndent() + ) + }.let { + performRequest(it) + } + } + } +} \ No newline at end of file