* Thread Safety: This class is immutable and thread-safe. All fields are final
* and the class provides no methods to modify its state after construction.
- *
+ *
+ *
* @since 2.7.0
*/
public final class ScriptMetadata {
@@ -40,10 +43,10 @@ public final class ScriptMetadata {
private final String contentHash;
private final long creationTime;
private final Class extends Script> compiledClass;
-
+
/**
* Constructs a new ScriptMetadata instance.
- *
+ *
* @param actionName the unique name/identifier of the action
* @param scriptContent the raw Groovy script content
* @param compiledClass the compiled Groovy script class
@@ -59,17 +62,17 @@ public ScriptMetadata(String actionName, String scriptContent, Class extends S
if (compiledClass == null) {
throw new IllegalArgumentException("Compiled class cannot be null");
}
-
+
this.actionName = actionName;
this.scriptContent = scriptContent;
this.contentHash = calculateHash(scriptContent);
this.creationTime = System.currentTimeMillis();
this.compiledClass = compiledClass;
}
-
+
/**
* Calculates SHA-256 hash of the given content.
- *
+ *
* @param content the content to hash
* @return Base64 encoded SHA-256 hash
* @throws RuntimeException if SHA-256 algorithm is not available
@@ -83,13 +86,14 @@ private String calculateHash(String content) {
throw new RuntimeException("SHA-256 algorithm not available", e);
}
}
-
+
/**
* Determines if the script content has changed compared to new content.
*
* This method uses SHA-256 hash comparison for efficient change detection
* without storing or comparing the full script content.
- *
+ *
+ *
* @param newContent the new script content to compare against
* @return {@code true} if content has changed, {@code false} if unchanged
* @throws IllegalArgumentException if newContent is null
@@ -100,53 +104,53 @@ public boolean hasChanged(String newContent) {
}
return !contentHash.equals(calculateHash(newContent));
}
-
+
/**
* Returns the action name/identifier.
- *
+ *
* @return the action name, never null
*/
- public String getActionName() {
- return actionName;
+ public String getActionName() {
+ return actionName;
}
-
+
/**
* Returns the original script content.
- *
+ *
* @return the script content, never null
*/
- public String getScriptContent() {
- return scriptContent;
+ public String getScriptContent() {
+ return scriptContent;
}
-
+
/**
* Returns the SHA-256 hash of the script content.
- *
+ *
* @return Base64 encoded content hash, never null
*/
- public String getContentHash() {
- return contentHash;
+ public String getContentHash() {
+ return contentHash;
}
-
+
/**
* Returns the timestamp when this metadata was created.
- *
+ *
* @return creation timestamp in milliseconds since epoch
*/
- public long getCreationTime() {
- return creationTime;
+ public long getCreationTime() {
+ return creationTime;
}
-
+
/**
* Returns the compiled Groovy script class.
*
* This class can be used to create new script instances for execution
* without requiring recompilation.
- *
- *
+ *
+ *
* @return the compiled script class, never null
*/
- public Class extends Script> getCompiledClass() {
- return compiledClass;
+ public Class extends Script> getCompiledClass() {
+ return compiledClass;
}
-}
+}
\ No newline at end of file
diff --git a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/GroovyActionsService.java b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/GroovyActionsService.java
index ed2c2f2130..1b4bf14be2 100644
--- a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/GroovyActionsService.java
+++ b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/GroovyActionsService.java
@@ -27,6 +27,8 @@
* This service provides functionality to load, compile, cache, and execute
* Groovy scripts as actions within the Apache Unomi framework. It implements
* optimized compilation and caching strategies to achieve high performance.
+ *
+ *
*
* Key features:
*
@@ -35,9 +37,12 @@
*
Thread-safe compilation and execution
*
Unified caching architecture for compiled scripts
*
+ *
+ *
*
* Thread Safety: Implementations must be thread-safe as this service
* is accessed concurrently during script execution.
+ *
*
* @see GroovyAction
* @see ScriptMetadata
@@ -51,6 +56,7 @@ public interface GroovyActionsService {
* This method compiles the script, validates it has the required
* annotations, persists it, and updates the internal cache.
* If the script content hasn't changed, recompilation is skipped.
+ *
*
* @param actionName the unique identifier for the action
* @param groovyScript the Groovy script source code
@@ -64,6 +70,7 @@ public interface GroovyActionsService {
*
* This method removes the action from both the cache and persistent storage,
* and cleans up any registered action types in the definitions service.
+ *
*
* @param actionName the unique identifier of the action to remove
* @throws IllegalArgumentException if id is null
@@ -76,6 +83,7 @@ public interface GroovyActionsService {
* This is the preferred method for script execution as it returns
* pre-compiled classes without any compilation overhead. Returns
* {@code null} if the script is not found in the cache.
+ *
*
* @param actionName the unique identifier of the action
* @return the compiled script class, or {@code null} if not found in cache
@@ -89,6 +97,7 @@ public interface GroovyActionsService {
* The returned metadata includes content hash, compilation timestamp,
* and the compiled class reference. This is useful for monitoring
* tools and debugging.
+ *
*
* @param actionName the unique identifier of the action
* @return the script metadata, or {@code null} if not found
diff --git a/kar/pom.xml b/kar/pom.xml
index bc5bca789e..1b012cd55d 100644
--- a/kar/pom.xml
+++ b/kar/pom.xml
@@ -71,7 +71,7 @@
org.apache.unomi
- unomi-persistence-elasticsearch-core
+ unomi-persistence-elasticsearch-9-core
@@ -152,7 +152,6 @@
httpclient-osgi
-
diff --git a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml
index 4c64d920f8..9680067188 100644
--- a/kar/src/main/feature/feature.xml
+++ b/kar/src/main/feature/feature.xml
@@ -39,7 +39,7 @@
spiflyshell-compatmvn:org.apache.unomi/unomi-wab/${project.version}/cfg/unomicfg
- mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}/cfg/elasticsearchcfg
+ mvn:org.apache.unomi/unomi-persistence-elasticsearch-9-core/${project.version}/cfg/elasticsearchcfgmvn:org.apache.unomi/unomi-services/${project.version}/cfg/servicescfgmvn:org.apache.unomi/unomi-services/${project.version}/cfg/thirdpartycfgmvn:org.apache.unomi/unomi-services/${project.version}/cfg/clustercfg
@@ -82,7 +82,9 @@
mvn:org.apache.unomi/unomi-scripting/${project.version}mvn:org.apache.unomi/unomi-metrics/${project.version}mvn:org.apache.unomi/unomi-persistence-spi/${project.version}
- mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}
+
+ mvn:org.apache.unomi/unomi-persistence-elasticsearch-9-core/${project.version}
+ mvn:org.apache.unomi/unomi-persistence-elasticsearch-9-conditions/${project.version}mvn:org.apache.unomi/unomi-services/${project.version}mvn:org.apache.unomi/unomi-json-schema-services/${project.version}mvn:org.apache.unomi/unomi-json-schema-rest/${project.version}
diff --git a/lifecycle-watcher/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/lifecycle-watcher/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index c7eb443c60..47c7351e43 100644
--- a/lifecycle-watcher/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/lifecycle-watcher/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -44,7 +44,8 @@
-
+
+
diff --git a/metrics/pom.xml b/metrics/pom.xml
index aed1b61b95..c84b45d614 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -62,12 +62,6 @@
unomi-commonprovided
-
- org.apache.unomi
- unomi-persistence-spi
- provided
-
-
com.fasterxml.jackson.corejackson-core
diff --git a/metrics/src/main/java/org/apache/unomi/metrics/commands/ViewCommand.java b/metrics/src/main/java/org/apache/unomi/metrics/commands/ViewCommand.java
index b3cefe84c1..1d7b74737f 100644
--- a/metrics/src/main/java/org/apache/unomi/metrics/commands/ViewCommand.java
+++ b/metrics/src/main/java/org/apache/unomi/metrics/commands/ViewCommand.java
@@ -18,10 +18,10 @@
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.apache.unomi.metrics.Metric;
-import org.apache.unomi.persistence.spi.CustomObjectMapper;
@Command(scope = "metrics", name = "view", description = "This will display all the data for a single metric ")
public class ViewCommand extends MetricsCommandSupport{
@@ -40,7 +40,7 @@ protected Object doExecute() throws Exception {
// the caller values easier to read.
DefaultPrettyPrinter defaultPrettyPrinter = new DefaultPrettyPrinter();
defaultPrettyPrinter = defaultPrettyPrinter.withArrayIndenter(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE);
- String jsonMetric = CustomObjectMapper.getObjectMapper().writer(defaultPrettyPrinter).writeValueAsString(metric);
+ String jsonMetric = new ObjectMapper().writer(defaultPrettyPrinter).writeValueAsString(metric);
System.out.println(jsonMetric);
return null;
}
diff --git a/package/pom.xml b/package/pom.xml
index 97cb7bb953..4a53de4b27 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -149,7 +149,7 @@
org.elasticsearch.pluginreindex-client
- ${elasticsearch.version}
+ ${old.elasticsearch.version}${project.build.directory}/assembly/elasticsearch/modules/reindex
@@ -173,7 +173,7 @@
org.apache.unomi
- unomi-persistence-elasticsearch-core
+ unomi-persistence-elasticsearch-9-coreelasticsearchcfgcfg${project.build.directory}/assembly/etc
@@ -318,7 +318,7 @@
unomi-karunomi-router-karaf-featureunomi-groovy-actions
- unomi-rest-ui
+
17
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 0e3cd9e5c6..3d2e57927b 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -99,18 +99,12 @@ org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH
org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
-# These monthlyIndex properties are now deprecated, please use rollover equivalent.
-org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
-org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_REPLICAS:-0}
-org.apache.unomi.elasticsearch.monthlyIndex.indexMappingTotalFieldsLimit=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_MAPPINGTOTALFIELDSLIMIT:-1000}
-org.apache.unomi.elasticsearch.monthlyIndex.indexMaxDocValueFieldsSearch=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_MAXDOCVALUEFIELDSSEARCH:-1000}
-org.apache.unomi.elasticsearch.monthlyIndex.itemsMonthlyIndexedOverride=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_ITEMSMONTHLYINDEXED:-event,session}
-# New rollover properties (it overrides monthlyIndex values)
-org.apache.unomi.elasticsearch.rollover.nbShards=${env:UNOMI_ELASTICSEARCH_ROLLOVER_SHARDS}
-org.apache.unomi.elasticsearch.rollover.nbReplicas=${env:UNOMI_ELASTICSEARCH_ROLLOVER_REPLICAS}
-org.apache.unomi.elasticsearch.rollover.indexMappingTotalFieldsLimit=${env:UNOMI_ELASTICSEARCH_ROLLOVER_MAPPINGTOTALFIELDSLIMIT}
-org.apache.unomi.elasticsearch.rollover.indexMaxDocValueFieldsSearch=${env:UNOMI_ELASTICSEARCH_ROLLOVER_MAXDOCVALUEFIELDSSEARCH}
-org.apache.unomi.elasticsearch.rollover.indices=${env:UNOMI_ELASTICSEARCH_ROLLOVER_INDICES}
+# Rollover properties
+org.apache.unomi.elasticsearch.rollover.nbShards=${env:UNOMI_ELASTICSEARCH_ROLLOVER_SHARDS:-5}
+org.apache.unomi.elasticsearch.rollover.nbReplicas=${env:UNOMI_ELASTICSEARCH_ROLLOVER_REPLICAS:-0}
+org.apache.unomi.elasticsearch.rollover.indexMappingTotalFieldsLimit=${env:UNOMI_ELASTICSEARCH_ROLLOVER_MAPPINGTOTALFIELDSLIMIT:-1000}
+org.apache.unomi.elasticsearch.rollover.indexMaxDocValueFieldsSearch=${env:UNOMI_ELASTICSEARCH_ROLLOVER_MAXDOCVALUEFIELDSSEARCH:-1000}
+org.apache.unomi.elasticsearch.rollover.indices=${env:UNOMI_ELASTICSEARCH_ROLLOVER_INDICES:-event,session}
# Rollover configuration
org.apache.unomi.elasticsearch.rollover.maxSize=${env:UNOMI_ELASTICSEARCH_ROLLOVER_MAXSIZE:-30gb}
@@ -146,8 +140,8 @@ org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSE
# The values used here are the default values of the API
org.apache.unomi.elasticsearch.bulkProcessor.concurrentRequests=${env:UNOMI_ELASTICSEARCH_BULK_CONCURRENTREQUESTS:-1}
org.apache.unomi.elasticsearch.bulkProcessor.bulkActions=${env:UNOMI_ELASTICSEARCH_BULK_ACTIONS:-1000}
-org.apache.unomi.elasticsearch.bulkProcessor.bulkSize=${env:UNOMI_ELASTICSEARCH_BULK_SIZE:-5MB}
-org.apache.unomi.elasticsearch.bulkProcessor.flushInterval=${env:UNOMI_ELASTICSEARCH_BULK_FLUSHINTERVAL:-5s}
+org.apache.unomi.elasticsearch.bulkProcessor.bulkSize=${env:UNOMI_ELASTICSEARCH_BULK_SIZE:-5}
+org.apache.unomi.elasticsearch.bulkProcessor.flushInterval=${env:UNOMI_ELASTICSEARCH_BULK_FLUSHINTERVAL:-5}
org.apache.unomi.elasticsearch.bulkProcessor.backoffPolicy=${env:UNOMI_ELASTICSEARCH_BULK_BACKOFFPOLICY:-exponential}
# Errors
org.apache.unomi.elasticsearch.throwExceptions=${env:UNOMI_ELASTICSEARCH_THROW_EXCEPTIONS:-false}
diff --git a/persistence-elasticsearch-9/conditions/pom.xml b/persistence-elasticsearch-9/conditions/pom.xml
new file mode 100644
index 0000000000..0c41219274
--- /dev/null
+++ b/persistence-elasticsearch-9/conditions/pom.xml
@@ -0,0 +1,198 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.unomi
+ unomi-persistence-elasticsearch-9
+ 3.0.0-SNAPSHOT
+
+
+ unomi-persistence-elasticsearch-9-conditions
+ Apache Unomi :: Persistence :: ElasticSearch :: Conditions
+ Conditions ElasticSearch persistence implementation for the Apache Unomi Context Server
+ bundle
+
+
+
+
+ org.apache.unomi
+ unomi-bom
+ ${project.version}
+ pom
+ import
+
+
+
+
+
+ org.osgi
+ osgi.core
+ provided
+
+
+ org.apache.unomi
+ unomi-api
+ ${project.version}
+ provided
+
+
+ org.apache.unomi
+ unomi-common
+ ${project.version}
+ provided
+
+
+ org.apache.unomi
+ unomi-persistence-spi
+ ${project.version}
+ provided
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ commons-collections
+ commons-collections
+
+
+ commons-io
+ commons-io
+
+
+ co.elastic.clients
+ elasticsearch-java
+ provided
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ provided
+
+
+ org.apache.unomi
+ unomi-metrics
+ ${project.version}
+ provided
+
+
+
+ junit
+ junit
+ test
+
+
+ com.hazelcast
+ hazelcast-all
+ 3.12.8
+ provided
+
+
+
+ org.apache.unomi
+ unomi-scripting
+ ${project.version}
+ provided
+
+
+
+ org.apache.unomi
+ unomi-persistence-elasticsearch-9-core
+ ${project.version}
+ provided
+
+
+
+ joda-time
+ joda-time
+ 2.12.7
+
+
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ true
+
+
+
+ co.elastic.clients.elasticsearch._types,
+ co.elastic.clients.elasticsearch._types.query_dsl,
+ co.elastic.clients.elasticsearch._types.query_dsl.Query,
+ co.elastic.clients.util,
+ android.os;resolution:=optional,
+ com.conversantmedia.util.concurrent;resolution:=optional,
+ com.google.appengine.api;resolution:=optional,
+ com.google.appengine.api.utils;resolution:=optional,
+ com.google.apphosting.api;resolution:=optional,
+ jakarta.enterprise.context.spi;resolution:=optional,
+ jakarta.enterprise.inject.spi;resolution:=optional,
+ jdk.net;resolution:=optional,
+ org.apache.avalon.framework.logger;resolution:=optional,
+ org.apache.log;resolution:=optional,
+ org.apache.log4j,
+ org.brotli.dec;resolution:=optional,
+ org.conscrypt;resolution:=optional,
+ org.glassfish.hk2.osgiresourcelocator;resolution:=optional,
+ org.ietf.jgss;resolution:=optional,
+ org.joda.convert;resolution:=optional,
+ org.reactivestreams;resolution:=optional,
+ software.amazon.awssdk.auth.credentials;resolution:=optional,
+ software.amazon.awssdk.core.async;resolution:=optional,
+ software.amazon.awssdk.http;resolution:=optional,
+ software.amazon.awssdk.http.async;resolution:=optional,
+ software.amazon.awssdk.http.auth.aws.signer;resolution:=optional,
+ software.amazon.awssdk.http.auth.spi.signer;resolution:=optional,
+ software.amazon.awssdk.identity.spi;resolution:=optional,
+ software.amazon.awssdk.regions;resolution:=optional,
+ software.amazon.awssdk.utils;resolution:=optional,
+ software.amazon.awssdk.utils.builder;resolution:=optional,
+ sun.misc;resolution:=optional,
+ sun.nio.ch;resolution:=optional,
+ io.opentelemetry.sdk.autoconfigure;resolution:=optional,
+ jakarta.json.bind;resolution:=optional,
+ jakarta.json.bind.spi;resolution:=optional,
+ *
+
+ *;scope=compile|runtime
+ true
+
+
+
+
+
+
+
+
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/GeoLocationByPointSessionConditionESQueryBuilder.java b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/GeoLocationByPointSessionConditionESQueryBuilder.java
similarity index 63%
rename from plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/GeoLocationByPointSessionConditionESQueryBuilder.java
rename to persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/GeoLocationByPointSessionConditionESQueryBuilder.java
index 21d27b1270..0c832f128d 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/GeoLocationByPointSessionConditionESQueryBuilder.java
+++ b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/GeoLocationByPointSessionConditionESQueryBuilder.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.unomi.plugins.baseplugin.conditions;
+package org.apache.unomi.persistence.elasticsearch9.querybuilders.advanced;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.unomi.api.conditions.Condition;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilder;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilderDispatcher;
import java.util.Map;
public class GeoLocationByPointSessionConditionESQueryBuilder implements ConditionESQueryBuilder {
@Override
- public QueryBuilder buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
+ public Query buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
String type = (String) condition.getParameter("type");
String name = condition.getParameter("name") == null ? "location" : (String) condition.getParameter("name");
@@ -37,9 +36,7 @@ public QueryBuilder buildQuery(Condition condition, Map context,
String distance = condition.getParameter("distance").toString();
if(circleLatitude != null && circleLongitude != null && distance != null) {
- return QueryBuilders.geoDistanceQuery(name)
- .point(circleLatitude, circleLongitude)
- .distance(distance);
+ return Query.of(q -> q.geoDistance(g -> g.field(name).location(l -> l.latlon(latlong -> latlong.lat(circleLatitude).lon(circleLongitude))).distance(distance)));
}
} else if("rectangle".equals(type)) {
Double rectLatitudeNE = (Double) condition.getParameter("rectLatitudeNE");
@@ -48,8 +45,18 @@ public QueryBuilder buildQuery(Condition condition, Map context,
Double rectLongitudeSW = (Double) condition.getParameter("rectLongitudeSW");
if(rectLatitudeNE != null && rectLongitudeNE != null && rectLatitudeSW != null && rectLongitudeSW != null) {
- return QueryBuilders.geoBoundingBoxQuery(name)
- .setCorners(rectLatitudeNE, rectLongitudeNE,rectLatitudeSW, rectLongitudeSW);
+ return Query.of(q -> q.geoBoundingBox(g -> g
+ .field(name)
+ .boundingBox(b -> b
+ .coords(c -> c
+ .top(rectLatitudeNE)
+ .left(rectLongitudeNE)
+ .bottom(rectLatitudeSW)
+ .right(rectLongitudeSW)
+ )
+ )
+ )
+ );
}
}
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/IdsConditionESQueryBuilder.java b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/IdsConditionESQueryBuilder.java
similarity index 64%
rename from plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/IdsConditionESQueryBuilder.java
rename to persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/IdsConditionESQueryBuilder.java
index d535c1b650..adb5f848f9 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/IdsConditionESQueryBuilder.java
+++ b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/IdsConditionESQueryBuilder.java
@@ -14,15 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.unomi.plugins.baseplugin.conditions;
+package org.apache.unomi.persistence.elasticsearch9.querybuilders.advanced;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.unomi.api.conditions.Condition;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilder;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilderDispatcher;
import java.util.Collection;
import java.util.Map;
@@ -35,8 +32,9 @@ public void setMaximumIdsQueryCount(int maximumIdsQueryCount) {
this.maximumIdsQueryCount = maximumIdsQueryCount;
}
+
@Override
- public QueryBuilder buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
+ public Query buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
Collection ids = (Collection) condition.getParameter("ids");
Boolean match = (Boolean) condition.getParameter("match");
@@ -45,13 +43,11 @@ public QueryBuilder buildQuery(Condition condition, Map context,
throw new UnsupportedOperationException("Too many profiles");
}
- IdsQueryBuilder idsQueryBuilder = QueryBuilders.idsQuery().addIds(ids.toArray(new String[0]));
+ Query idsQuery = Query.of(q -> q.ids(i -> i.values(ids.stream().toList())));
if (match) {
- return idsQueryBuilder;
+ return idsQuery;
} else {
- BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
- boolQuery.mustNot(idsQueryBuilder);
- return boolQuery;
+ return Query.of(q -> q.bool(b -> b.mustNot(idsQuery)));
}
}
}
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/PastEventConditionESQueryBuilder.java
similarity index 89%
rename from plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
rename to persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/PastEventConditionESQueryBuilder.java
index 6c40f9f874..935eda6fe6 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/persistence-elasticsearch-9/conditions/src/main/java/org/apache/unomi/persistence/elasticsearch9/querybuilders/advanced/PastEventConditionESQueryBuilder.java
@@ -15,26 +15,30 @@
* limitations under the License.
*/
-package org.apache.unomi.plugins.baseplugin.conditions;
+package org.apache.unomi.persistence.elasticsearch9.querybuilders.advanced;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.DefinitionsService;
import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.api.utils.ConditionBuilder;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilder;
+import org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilderDispatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
+import org.apache.unomi.persistence.spi.conditions.ConditionContextHelper;
+import org.apache.unomi.persistence.spi.conditions.PastEventConditionPersistenceQueryBuilder;
import org.apache.unomi.scripting.ScriptExecutor;
-import org.elasticsearch.index.query.*;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
import java.util.*;
import java.util.stream.Collectors;
-public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder {
+public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder, PastEventConditionPersistenceQueryBuilder {
private DefinitionsService definitionsService;
private PersistenceService persistenceService;
@@ -45,6 +49,8 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
private int aggregateQueryBucketSize = 5000;
private boolean pastEventsDisablePartitions = false;
+ private final DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.dateTime();
+
public void setDefinitionsService(DefinitionsService definitionsService) {
this.definitionsService = definitionsService;
}
@@ -74,7 +80,7 @@ public void setSegmentService(SegmentService segmentService) {
}
@Override
- public QueryBuilder buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
+ public Query buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
boolean eventsOccurred = getStrategyFromOperator((String) condition.getParameter("operator"));
int minimumEventCount = !eventsOccurred || condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
int maximumEventCount = !eventsOccurred || condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
@@ -117,7 +123,7 @@ public long count(Condition condition, Map context, ConditionESQ
}
}
- protected static boolean getStrategyFromOperator(String operator) {
+ public boolean getStrategyFromOperator(String operator) {
if (operator != null && !operator.equals("eventsOccurred") && !operator.equals("eventsNotOccurred")) {
throw new UnsupportedOperationException("Unsupported operator: " + operator + ", please use either 'eventsOccurred' or 'eventsNotOccurred'");
}
@@ -198,7 +204,7 @@ private Set getProfileIdsMatchingEventCount(Condition eventCondition, in
}
}
- protected static Condition getEventCondition(Condition condition, Map context, String profileId,
+ public Condition getEventCondition(Condition condition, Map context, String profileId,
DefinitionsService definitionsService, ScriptExecutor scriptExecutor) {
Condition eventCondition;
try {
@@ -227,8 +233,24 @@ protected static Condition getEventCondition(Condition condition, Map
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.unomi.persistence.elasticsearch9.ConditionESQueryBuilder
+ org.apache.unomi.persistence.spi.conditions.PastEventConditionPersistenceQueryBuilder
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch-9/core/pom.xml
similarity index 82%
rename from persistence-elasticsearch/core/pom.xml
rename to persistence-elasticsearch-9/core/pom.xml
index e9c990b3d7..f3eaec7bf8 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch-9/core/pom.xml
@@ -20,14 +20,16 @@
4.0.0org.apache.unomi
- unomi-persistence-elasticsearch
+ unomi-persistence-elasticsearch-93.0.0-SNAPSHOT
- unomi-persistence-elasticsearch-core
- Apache Unomi :: Persistence :: ElasticSearch :: Core
- Core ElasticSearch persistence implementation for the Apache Unomi Context Server
+
+ unomi-persistence-elasticsearch-9-core
+ Apache Unomi :: Persistence :: ElasticSearch 9 :: Core
+ Core ElasticSearch 9 persistence implementation for the Apache Unomi Context Serverbundle
+
@@ -78,6 +80,14 @@
provided
+
+ co.elastic.clients
+ elasticsearch-java
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ com.hazelcasthazelcast-all
@@ -94,6 +104,11 @@
jackson-databindprovided
+
+ com.fasterxml.jackson.core
+ jackson-core
+ provided
+ org.apache.commonscommons-lang3
@@ -106,10 +121,10 @@
- org.elasticsearch.client
- elasticsearch-rest-high-level-client
+ joda-time
+ joda-time
+ provided
-
org.apache.logging.log4j
@@ -152,16 +167,6 @@
junittest
-
- org.elasticsearch.test
- framework
- test
-
-
- org.elasticsearch.plugin
- transport-netty4-client
- test
- org.apache.lucenelucene-test-framework
@@ -246,6 +251,7 @@
org.apache.unomi.metrics,
org.apache.unomi.persistence.spi.aggregate,
org.apache.unomi.persistence.spi,
+ org.apache.unomi.persistence.spi.conditions,
org.codehaus.stax2;resolution:=optional,
org.elasticsearch.*;resolution:=optional,
org.ietf.jgss;resolution:=optional,
@@ -261,13 +267,34 @@
org.xml.sax.ext;resolution:=optional,
org.xml.sax.helpers;resolution:=optional,
org.zeromq;resolution:=optional,
+ org.conscrypt;resolution:=optional,
+ org.glassfish.hk2.osgiresourcelocator;resolution:=optional,
+ org.brotli.dec;resolution:=optional,
+ io.opentelemetry.opentelemetry-api;resolution:=optional,
+ io.opentelemetry.sdk.autoconfigure;resolution:=optional,
+ io.opentelemetry.opentelemetry-context;resolution:=optional,
+ io.opentelemetry.opentelemetry-sdk;resolution:=optional,
+ io.opentelemetry.opentelemetry-sdk-extension-autoconfigure;resolution:=optional,
+ org.apache.httpcomponents.client5.httpclient5;resolution:=optional,
+ org.apache.httpcomponents.core5.httpcore5;resolution:=optional,
+ org.apache.httpcomponents.core5.httpcore5-h2;resolution:=optional,
+ jakarta.json.bind;resolution:=optional,
+ jakarta.json.bind.spi;resolution:=optional,
+ jakarta.json.spi;resolution:=optional,
+ jakarta.json.stream;resolution:=optional,
+ org.eclipse.parsson.parsson;resolution:=optional,
+ org.eclipse.yasson;resolution:=optional,
+ org.apache.http.impl.nio.client;resolution:=optional,
*
- org.elasticsearch.*;version="${elasticsearch.version}",
- org.elasticsearch.index.query.*;version="${elasticsearch.version}",
org.apache.lucene.search.join.*;version="${lucene.version}",
- org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}"
+ org.apache.unomi.persistence.elasticsearch9;version="${project.version}",
+ co.elastic.clients.elasticsearch;version="${elasticsearch.version}",
+ co.elastic.clients.elasticsearch._types.query_dsl.Query;version="${elasticsearch.version}",
+ co.elastic.clients.elasticsearch._types.query_dsl;version="${elasticsearch.version}",
+ co.elastic.clients.util;version="${elasticsearch.version}",
+ co.elastic.clients.elasticsearch._types;version="${elasticsearch.version}"
*;scope=compile|runtimetrue
@@ -302,4 +329,4 @@
-
+
\ No newline at end of file
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilder.java
similarity index 82%
rename from persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
rename to persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilder.java
index 7a100d05a6..0a9c8ca35e 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilder.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.unomi.persistence.elasticsearch.conditions;
+package org.apache.unomi.persistence.elasticsearch9;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.unomi.api.conditions.Condition;
-import org.elasticsearch.index.query.QueryBuilder;
import java.util.Map;
public interface ConditionESQueryBuilder {
- QueryBuilder buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher);
+ Query buildQuery(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher);
default long count(Condition condition, Map context, ConditionESQueryBuilderDispatcher dispatcher) {
throw new UnsupportedOperationException();
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilderDispatcher.java
similarity index 85%
rename from persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
rename to persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilderDispatcher.java
index 010811549d..f334955597 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ConditionESQueryBuilderDispatcher.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.unomi.persistence.elasticsearch.conditions;
+package org.apache.unomi.persistence.elasticsearch9;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.persistence.spi.conditions.ConditionContextHelper;
import org.apache.unomi.scripting.ScriptExecutor;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,16 +54,18 @@ public String getQuery(Condition condition) {
return "{\"query\": " + getQueryBuilder(condition).toString() + "}";
}
- public QueryBuilder getQueryBuilder(Condition condition) {
- return QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery()).filter(buildFilter(condition));
+ public Query getQueryBuilder(Condition condition) {
+ Query.Builder qb = new Query.Builder();
+ return qb.bool(b -> b.must(Query.of(q -> q.matchAll(m -> m))).filter(buildFilter(condition))).build();
+
}
- public QueryBuilder buildFilter(Condition condition) {
- return buildFilter(condition, new HashMap());
+ public Query buildFilter(Condition condition) {
+ return buildFilter(condition, new HashMap<>());
}
- public QueryBuilder buildFilter(Condition condition, Map context) {
- if(condition == null || condition.getConditionType() == null) {
+ public Query buildFilter(Condition condition, Map context) {
+ if (condition == null || condition.getConditionType() == null) {
throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
}
@@ -89,7 +91,7 @@ public QueryBuilder buildFilter(Condition condition, Map context
LOGGER.debug("No matching query builder for condition {} and context {}", condition, context);
}
- return QueryBuilders.matchAllQuery();
+ return Query.of(q -> q.matchAll(m -> m));
}
public long count(Condition condition) {
@@ -97,7 +99,7 @@ public long count(Condition condition) {
}
public long count(Condition condition, Map context) {
- if(condition == null || condition.getConditionType() == null) {
+ if (condition == null || condition.getConditionType() == null) {
throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESCustomObjectMapper.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESCustomObjectMapper.java
similarity index 77%
rename from persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESCustomObjectMapper.java
rename to persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESCustomObjectMapper.java
index 0048398f91..17f0f9732a 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESCustomObjectMapper.java
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESCustomObjectMapper.java
@@ -14,16 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.unomi.persistence.elasticsearch;
+package org.apache.unomi.persistence.elasticsearch9;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Item;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
+import java.util.Map;
+
/**
- * This CustomObjectMapper is used to avoid the version parameter to be registered in ES
- * @author dgaillard
+ * This CustomObjectMapper is used to avoid the version parameter to be registered in Elasticsearch
*/
public class ESCustomObjectMapper extends CustomObjectMapper {
@@ -31,8 +34,11 @@ public class ESCustomObjectMapper extends CustomObjectMapper {
public ESCustomObjectMapper() {
super();
+ super.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
this.addMixIn(Item.class, ESItemMixIn.class);
this.addMixIn(Event.class, ESEventMixIn.class);
+ this.configOverride(Map.class)
+ .setInclude(JsonInclude.Value.construct(JsonInclude.Include.ALWAYS, JsonInclude.Include.ALWAYS));
}
public static ObjectMapper getObjectMapper() {
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESEventMixIn.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESEventMixIn.java
similarity index 95%
rename from persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESEventMixIn.java
rename to persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESEventMixIn.java
index 76fb9393cf..8dad1f89b0 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESEventMixIn.java
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESEventMixIn.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.unomi.persistence.elasticsearch;
+package org.apache.unomi.persistence.elasticsearch9;
import com.fasterxml.jackson.annotation.JsonIgnore;
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESItemMixIn.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESItemMixIn.java
similarity index 93%
rename from persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESItemMixIn.java
rename to persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESItemMixIn.java
index c20d8ef9dd..e72a225ddc 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ESItemMixIn.java
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ESItemMixIn.java
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.unomi.persistence.elasticsearch;
+package org.apache.unomi.persistence.elasticsearch9;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* This mixin is used in ESCustomObjectMapper to avoid the version parameter to be registered in ES
- * @author dgaillard
*/
public abstract class ESItemMixIn {
diff --git a/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ElasticSearchPersistenceServiceImpl.java
new file mode 100644
index 0000000000..6089dd3831
--- /dev/null
+++ b/persistence-elasticsearch-9/core/src/main/java/org/apache/unomi/persistence/elasticsearch9/ElasticSearchPersistenceServiceImpl.java
@@ -0,0 +1,2686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.persistence.elasticsearch9;
+
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
+import co.elastic.clients.elasticsearch._types.*;
+import co.elastic.clients.elasticsearch._types.aggregations.*;
+import co.elastic.clients.elasticsearch._types.analysis.CustomAnalyzer;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch._types.query_dsl.*;
+import co.elastic.clients.elasticsearch.core.*;
+import co.elastic.clients.elasticsearch.core.CountRequest;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.UpdateAction;
+import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.TotalHits;
+import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
+import co.elastic.clients.elasticsearch.ilm.*;
+import co.elastic.clients.elasticsearch.indices.*;
+import co.elastic.clients.elasticsearch.indices.Alias;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexTemplateRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.elasticsearch.indices.get_alias.IndexAliases;
+import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
+import co.elastic.clients.elasticsearch.indices.put_index_template.IndexTemplateMapping;
+import co.elastic.clients.elasticsearch.tasks.GetTasksRequest;
+import co.elastic.clients.elasticsearch.tasks.GetTasksResponse;
+import co.elastic.clients.json.JsonData;
+import co.elastic.clients.json.JsonpMapper;
+import co.elastic.clients.transport.BackoffPolicy;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.transport.rest_client.RestClientOptions;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.json.stream.JsonGenerator;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.log4j.Level;
+import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.query.DateRange;
+import org.apache.unomi.api.query.IpRange;
+import org.apache.unomi.api.query.NumericRange;
+import org.apache.unomi.metrics.MetricAdapter;
+import org.apache.unomi.metrics.MetricsService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.persistence.spi.aggregate.*;
+import org.apache.unomi.persistence.spi.aggregate.DateRangeAggregate;
+import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
+import org.apache.unomi.persistence.spi.conditions.ConditionContextHelper;
+import org.apache.unomi.persistence.spi.conditions.ConditionEvaluator;
+import org.apache.unomi.persistence.spi.conditions.ConditionEvaluatorDispatcher;
+import org.elasticsearch.client.*;
+
+import org.osgi.framework.*;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.*;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
+
+ public static final String SEQ_NO = "seq_no";
+ public static final String PRIMARY_TERM = "primary_term";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
+ private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy";
+
+ private boolean throwExceptions = false;
+ private ElasticsearchClient esClient;
+ private BulkIngester bulkIngester;
+ private String elasticSearchAddresses;
+ private final List elasticSearchAddressList = new ArrayList<>();
+ private String indexPrefix;
+ private String numberOfShards;
+ private String numberOfReplicas;
+ private String indexMappingTotalFieldsLimit;
+ private String indexMaxDocValueFieldsSearch;
+ private String[] fatalIllegalStateErrors;
+ private BundleContext bundleContext;
+ private final Map mappings = new HashMap();
+ private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
+ private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
+ private Map routingByType;
+
+ private Integer defaultQueryLimit = 10;
+ private final Integer removeByQueryTimeoutInMinutes = 10;
+ private Integer taskWaitingTimeout = 3600000;
+ private Integer taskWaitingPollingInterval = 1000;
+
+ private String bulkProcessorConcurrentRequests = "1";
+ private String bulkProcessorBulkActions = "1000";
+ private Long bulkProcessorBulkSize = 5L;
+ private Long bulkProcessorFlushIntervalInSeconds = 5L;
+ private String bulkProcessorBackoffPolicy = "exponential";
+
+ // Rollover configuration
+ private String sessionLatestIndex;
+ private List rolloverIndices;
+ private String rolloverMaxSize;
+ private String rolloverMaxAge;
+ private String rolloverMaxDocs;
+ private String rolloverIndexNumberOfShards;
+ private String rolloverIndexNumberOfReplicas;
+ private String rolloverIndexMappingTotalFieldsLimit;
+ private String rolloverIndexMaxDocValueFieldsSearch;
+
+ private String minimalElasticSearchVersion = "9.0.3";
+ private String maximalElasticSearchVersion = "10.0.0";
+
+ // authentication props
+ private String username;
+ private String password;
+ private boolean sslEnable = false;
+ private boolean sslTrustAllCertificates = false;
+
+ private int aggregateQueryBucketSize = 5000;
+
+ private MetricsService metricsService;
+ private boolean useBatchingForSave = false;
+ private boolean useBatchingForUpdate = true;
+ private String logLevelRestClient = "ERROR";
+ private boolean alwaysOverwrite = true;
+ private boolean aggQueryThrowOnMissingDocs = false;
+ private Integer aggQueryMaxResponseSizeHttp = null;
+ private Integer clientSocketTimeout = null;
+ private Map itemTypeToRefreshPolicy = new HashMap<>();
+
+ private final Map>> knownMappings = new HashMap<>();
+
+ private static final Map itemTypeIndexNameMap = new HashMap<>();
+ private static final Collection systemItems = Arrays.asList("actionType", "campaign", "campaignevent", "goal", "userList",
+ "propertyType", "scope", "conditionType", "rule", "scoring", "segment", "groovyAction", "topic", "patch", "jsonSchema",
+ "importConfig", "exportConfig", "rulestats");
+
+ static {
+ for (String systemItem : systemItems) {
+ itemTypeIndexNameMap.put(systemItem, "systemItems");
+ }
+
+ itemTypeIndexNameMap.put("profile", "profile");
+ itemTypeIndexNameMap.put("persona", "profile");
+ }
+
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
+
+ public void setElasticSearchAddresses(String elasticSearchAddresses) {
+ this.elasticSearchAddresses = elasticSearchAddresses;
+ String[] elasticSearchAddressesArray = elasticSearchAddresses.split(",");
+ elasticSearchAddressList.clear();
+ for (String elasticSearchAddress : elasticSearchAddressesArray) {
+ elasticSearchAddressList.add(elasticSearchAddress.trim());
+ }
+ }
+
+ public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) throws IOException {
+ if (!itemTypeToRefreshPolicy.isEmpty()) {
+ this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(itemTypeToRefreshPolicy,
+ new TypeReference>() {
+ });
+ }
+ }
+
+ public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
+ this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(",")).map(i -> i.trim()).filter(i -> !i.isEmpty())
+ .toArray(String[]::new);
+ }
+
+ public void setAggQueryMaxResponseSizeHttp(String aggQueryMaxResponseSizeHttp) {
+ if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) {
+ this.aggQueryMaxResponseSizeHttp = Integer.parseInt(aggQueryMaxResponseSizeHttp);
+ }
+ }
+
+ public void setIndexPrefix(String indexPrefix) {
+ this.indexPrefix = indexPrefix;
+ }
+
+ public void setNumberOfShards(String numberOfShards) {
+ this.numberOfShards = numberOfShards;
+ }
+
+ public void setNumberOfReplicas(String numberOfReplicas) {
+ this.numberOfReplicas = numberOfReplicas;
+ }
+
+ public void setIndexMappingTotalFieldsLimit(String indexMappingTotalFieldsLimit) {
+ this.indexMappingTotalFieldsLimit = indexMappingTotalFieldsLimit;
+ }
+
+ public void setIndexMaxDocValueFieldsSearch(String indexMaxDocValueFieldsSearch) {
+ this.indexMaxDocValueFieldsSearch = indexMaxDocValueFieldsSearch;
+ }
+
+ public void setDefaultQueryLimit(Integer defaultQueryLimit) {
+ this.defaultQueryLimit = defaultQueryLimit;
+ }
+
+ public void setRoutingByType(Map routingByType) {
+ this.routingByType = routingByType;
+ }
+
+ public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
+ this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
+ }
+
+ public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
+ this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
+ }
+
+ public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) {
+ this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
+ }
+
+ public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
+ this.bulkProcessorBulkActions = bulkProcessorBulkActions;
+ }
+
+ public void setBulkProcessorBulkSize(Long bulkProcessorBulkSize) {
+ this.bulkProcessorBulkSize = bulkProcessorBulkSize;
+ }
+
+ public void setBulkProcessorFlushIntervalInSeconds(Long bulkProcessorFlushIntervalInSeconds) {
+ this.bulkProcessorFlushIntervalInSeconds = bulkProcessorFlushIntervalInSeconds;
+ }
+
+ public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) {
+ this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
+ }
+
+ public void setRolloverIndices(String rolloverIndices) {
+ this.rolloverIndices = StringUtils.isNotEmpty(rolloverIndices) ? Arrays.asList(rolloverIndices.split(",").clone()) : null;
+ }
+
+ public void setRolloverMaxSize(String rolloverMaxSize) {
+ this.rolloverMaxSize = rolloverMaxSize;
+ }
+
+ public void setRolloverMaxAge(String rolloverMaxAge) {
+ this.rolloverMaxAge = rolloverMaxAge;
+ }
+
+ public void setRolloverMaxDocs(String rolloverMaxDocs) {
+ this.rolloverMaxDocs = rolloverMaxDocs;
+ }
+
+ public void setRolloverIndexNumberOfShards(String rolloverIndexNumberOfShards) {
+ this.rolloverIndexNumberOfShards = rolloverIndexNumberOfShards;
+ }
+
+ public void setRolloverIndexNumberOfReplicas(String rolloverIndexNumberOfReplicas) {
+ this.rolloverIndexNumberOfReplicas = rolloverIndexNumberOfReplicas;
+ }
+
+ public void setRolloverIndexMappingTotalFieldsLimit(String rolloverIndexMappingTotalFieldsLimit) {
+ this.rolloverIndexMappingTotalFieldsLimit = rolloverIndexMappingTotalFieldsLimit;
+ }
+
+ public void setRolloverIndexMaxDocValueFieldsSearch(String rolloverIndexMaxDocValueFieldsSearch) {
+ this.rolloverIndexMaxDocValueFieldsSearch = rolloverIndexMaxDocValueFieldsSearch;
+ }
+
+ public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
+ this.minimalElasticSearchVersion = minimalElasticSearchVersion;
+ }
+
+ public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
+ this.maximalElasticSearchVersion = maximalElasticSearchVersion;
+ }
+
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+ this.aggregateQueryBucketSize = aggregateQueryBucketSize;
+ }
+
+ public void setClientSocketTimeout(String clientSocketTimeout) {
+ if (StringUtils.isNumeric(clientSocketTimeout)) {
+ this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout);
+ }
+ }
+
+ public void setMetricsService(MetricsService metricsService) {
+ this.metricsService = metricsService;
+ }
+
+ public void setUseBatchingForSave(boolean useBatchingForSave) {
+ this.useBatchingForSave = useBatchingForSave;
+ }
+
+ public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
+ this.useBatchingForUpdate = useBatchingForUpdate;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public void setSslEnable(boolean sslEnable) {
+ this.sslEnable = sslEnable;
+ }
+
+ public void setSslTrustAllCertificates(boolean sslTrustAllCertificates) {
+ this.sslTrustAllCertificates = sslTrustAllCertificates;
+ }
+
+ public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
+ this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
+ }
+
+ public void setThrowExceptions(boolean throwExceptions) {
+ this.throwExceptions = throwExceptions;
+ }
+
+ public void setAlwaysOverwrite(boolean alwaysOverwrite) {
+ this.alwaysOverwrite = alwaysOverwrite;
+ }
+
+ public void setLogLevelRestClient(String logLevelRestClient) {
+ this.logLevelRestClient = logLevelRestClient;
+ }
+
+ public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+ if (StringUtils.isNumeric(taskWaitingTimeout)) {
+ this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+ }
+ }
+
+ public void setTaskWaitingPollingInterval(String taskWaitingPollingInterval) {
+ if (StringUtils.isNumeric(taskWaitingPollingInterval)) {
+ this.taskWaitingPollingInterval = Integer.parseInt(taskWaitingPollingInterval);
+ }
+ }
+
+ /**
+ * Check if the current cluster version is in the expected range
+ *
+ * @return true if the version of the current elasticsearch is not in the expected range
+ */
+ private boolean versionIsNotCompatible() throws IOException {
+ InfoResponse info = esClient.info();
+ String currentVersion = info.version().number();
+
+ return compareVersions(currentVersion, minimalElasticSearchVersion) < 0
+ || compareVersions(currentVersion, maximalElasticSearchVersion) >= 0;
+ }
+
+ /**
+ * Compare to semantic versions
+ *
+ * @param version1 First version
+ * @param version2 Second vrsion
+ * @return positive if version1 > version2, 0 if equals, negative if version1 < version2
+ */
+ private static int compareVersions(String version1, String version2) {
+ String[] parts1 = version1.split("\\.");
+ String[] parts2 = version2.split("\\.");
+
+ int length = Math.max(parts1.length, parts2.length);
+
+ for (int i = 0; i < length; i++) {
+ int part1 = i < parts1.length ? Integer.parseInt(parts1[i]) : 0;
+ int part2 = i < parts2.length ? Integer.parseInt(parts2[i]) : 0;
+
+ if (part1 != part2) {
+ return part1 - part2;
+ }
+ }
+
+ return 0;
+ }
+
+ public void start() throws Exception {
+
+ // Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter
+ try {
+ Level lvl = Level.toLevel(logLevelRestClient, Level.ERROR);
+ //TODO ensure this is necessary
+ org.apache.log4j.Logger.getLogger("org.elasticsearch.client.RestClient").setLevel(lvl);
+ } catch (Exception e) {
+ // Never fail because of the set of the logger
+ }
+
+ // on startup
+ new InClassLoaderExecute<>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ public Object execute(Object... args) throws Exception {
+
+ buildClient();
+
+ if (versionIsNotCompatible()) {
+ throw new Exception(
+ "ElasticSearch version is not within [" + minimalElasticSearchVersion + "," + maximalElasticSearchVersion
+ + "), aborting startup !");
+ }
+
+ registerRolloverLifecyclePolicy();
+
+ loadPredefinedMappings(bundleContext, false);
+ loadPainlessScripts(bundleContext);
+
+ // load predefined mappings and condition dispatchers of any bundles that were started before this one.
+ for (Bundle existingBundle : bundleContext.getBundles()) {
+ if (existingBundle.getBundleContext() != null) {
+ loadPredefinedMappings(existingBundle.getBundleContext(), false);
+ loadPainlessScripts(existingBundle.getBundleContext());
+ }
+ }
+
+ // Wait for green
+ LOGGER.info("Waiting for GREEN cluster status...");
+ esClient.cluster().health(builder -> builder.waitForStatus(HealthStatus.Green));
+ LOGGER.info("Cluster status is GREEN");
+
+ // We keep in memory the latest available session index to be able to load session using direct GET access on ES
+ if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
+ LOGGER.info("Sessions are using rollover indices, loading latest session index available ...");
+ GetAliasResponse getAliasResponse = esClient.indices().getAlias(builder -> builder.name(getIndex(Session.ITEM_TYPE)));
+ Map aliases = getAliasResponse.aliases();
+ if (!aliases.isEmpty()) {
+ sessionLatestIndex = new TreeSet<>(aliases.keySet()).last();
+ LOGGER.info("Latest available session index found is: {}", sessionLatestIndex);
+ } else {
+ throw new IllegalStateException("No index found for sessions");
+ }
+ }
+
+ return true;
+ }
+ }.executeInClassLoader();
+
+ bundleContext.addBundleListener(this);
+
+ LOGGER.info("{} service started successfully.", this.getClass().getName());
+ }
+
+ private List getHosts() {
+ List hosts = new ArrayList<>();
+ for (String elasticSearchAddress : elasticSearchAddressList) {
+ String[] elasticSearchAddressParts = elasticSearchAddress.split(":");
+ String elasticSearchHostName = elasticSearchAddressParts[0];
+ int elasticSearchPort = Integer.parseInt(elasticSearchAddressParts[1]);
+ hosts.add(new HttpHost(elasticSearchHostName, elasticSearchPort, sslEnable ? "https" : "http"));
+ }
+ return hosts;
+ }
+
+ private void buildClient() throws NoSuchFieldException, IllegalAccessException {
+ final SSLContext sslContext;
+ try {
+ sslContext = SSLContext.getInstance("SSL");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (sslTrustAllCertificates) {
+ try {
+ sslContext.init(null, new TrustManager[] { new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ public void checkClientTrusted(X509Certificate[] certs, String authType) {
+ }
+
+ public void checkServerTrusted(X509Certificate[] certs, String authType) {
+ }
+ } }, new SecureRandom());
+ } catch (KeyManagementException e) {
+ LOGGER.error("Error creating SSL Context for trust all certificates", e);
+ }
+ }
+
+ esClient = ElasticsearchClientFactory.builder().hosts(getHosts()).socketTimeout(clientSocketTimeout).sslContext(sslContext)
+ .usernameAndPassword(username, password).build();
+
+ buildBulkIngester();
+ LOGGER.info("Connecting to ElasticSearch persistence backend using index prefix {}...", indexPrefix);
+ }
+
+ public BulkIngester buildBulkIngester() {
+ if (bulkIngester != null) {
+ return bulkIngester;
+ }
+ BulkListener listener = new BulkListener() {
+ @Override public void beforeBulk(long executionId, BulkRequest request, List strings) {
+ LOGGER.debug("Before Bulk");
+ }
+
+ @Override public void afterBulk(long executionId, BulkRequest request, List strings, BulkResponse response) {
+ LOGGER.debug("After Bulk");
+ }
+
+ @Override public void afterBulk(long executionId, BulkRequest request, List strings, Throwable failure) {
+ LOGGER.error("After Bulk (failure)", failure);
+
+ }
+ };
+
+ BulkIngester.Builder ingesterBuilder = new BulkIngester.Builder().client(esClient).maxOperations(100)
+ .flushInterval(1, TimeUnit.SECONDS).listener(listener);
+
+ if (bulkProcessorConcurrentRequests != null) {
+ int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests);
+ if (concurrentRequests > 1) {
+ ingesterBuilder.maxConcurrentRequests(concurrentRequests);
+ }
+ }
+ if (bulkProcessorBulkActions != null) {
+ int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
+ ingesterBuilder.maxOperations(bulkActions);
+ }
+ if (bulkProcessorBulkSize != null) {
+ // Default is 5MB
+ ingesterBuilder.maxSize(bulkProcessorBulkSize * 1024 * 1024);
+ }
+
+ if (bulkProcessorFlushIntervalInSeconds != null) {
+ ingesterBuilder.flushInterval(bulkProcessorFlushIntervalInSeconds, TimeUnit.SECONDS);
+ } else {
+ // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
+ ingesterBuilder.flushInterval(5, TimeUnit.SECONDS);
+ }
+ if (bulkProcessorBackoffPolicy != null) {
+ String backoffPolicyStr = bulkProcessorBackoffPolicy;
+ if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
+ backoffPolicyStr = backoffPolicyStr.toLowerCase();
+ if ("nobackoff".equals(backoffPolicyStr)) {
+ ingesterBuilder.backoffPolicy(BackoffPolicy.noBackoff());
+ } else if (backoffPolicyStr.startsWith("constant(")) {
+ int paramStartPos = backoffPolicyStr.indexOf("constant(" + "constant(".length());
+ int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
+ int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
+ Long delay = Long.valueOf(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos));
+
+ int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
+ // Delay is in ms
+ ingesterBuilder.backoffPolicy(BackoffPolicy.constantBackoff(delay != null ? delay : 5000, maxNumberOfRetries));
+ } else if (backoffPolicyStr.startsWith("exponential")) {
+ if (!backoffPolicyStr.contains("(")) {
+ ingesterBuilder.backoffPolicy(BackoffPolicy.exponentialBackoff());
+ } else {
+ // we detected parameters, must process them.
+ int paramStartPos = backoffPolicyStr.indexOf("exponential(" + "exponential(".length());
+ int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
+ int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
+ Long delay = Long.valueOf(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos));
+ int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
+ ingesterBuilder.backoffPolicy(BackoffPolicy.exponentialBackoff(delay != null ? delay : 5000, maxNumberOfRetries));
+ }
+ }
+ }
+ }
+
+ bulkIngester = ingesterBuilder.build();
+ return bulkIngester;
+ }
+
+ public void stop() {
+ new InClassLoaderExecute<>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ protected Object execute(Object... args) throws IOException {
+ LOGGER.info("Closing ElasticSearch persistence backend...");
+ if (esClient != null) {
+ esClient.close();
+ }
+ return null;
+ }
+ }.catchingExecuteInClassLoader(true);
+
+ bundleContext.removeBundleListener(this);
+ }
+
+ public void bindConditionEvaluator(ServiceReference conditionEvaluatorServiceReference) {
+ ConditionEvaluator conditionEvaluator = bundleContext.getService(conditionEvaluatorServiceReference);
+ conditionEvaluatorDispatcher.addEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString(),
+ conditionEvaluator);
+ }
+
+ public void unbindConditionEvaluator(ServiceReference conditionEvaluatorServiceReference) {
+ if (conditionEvaluatorServiceReference == null) {
+ return;
+ }
+ conditionEvaluatorDispatcher.removeEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString());
+ }
+
+ public void bindConditionESQueryBuilder(ServiceReference conditionESQueryBuilderServiceReference) {
+ ConditionESQueryBuilder conditionESQueryBuilder = bundleContext.getService(conditionESQueryBuilderServiceReference);
+ conditionESQueryBuilderDispatcher.addQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString(),
+ conditionESQueryBuilder);
+ }
+
+ public void unbindConditionESQueryBuilder(ServiceReference conditionESQueryBuilderServiceReference) {
+ if (conditionESQueryBuilderServiceReference == null) {
+ return;
+ }
+ conditionESQueryBuilderDispatcher.removeQueryBuilder(
+ conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString());
+ }
+
+ @Override public void bundleChanged(BundleEvent event) {
+ switch (event.getType()) {
+ case BundleEvent.STARTING:
+ loadPredefinedMappings(event.getBundle().getBundleContext(), true);
+ loadPainlessScripts(event.getBundle().getBundleContext());
+ break;
+ }
+ }
+
+ private void loadPredefinedMappings(BundleContext bundleContext, boolean forceUpdateMapping) {
+ Enumeration predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
+ if (predefinedMappings == null) {
+ return;
+ }
+ while (predefinedMappings.hasMoreElements()) {
+ URL predefinedMappingURL = predefinedMappings.nextElement();
+ LOGGER.info("Found mapping at {}, loading... ", predefinedMappingURL);
+ try {
+ final String path = predefinedMappingURL.getPath();
+ String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
+ String mappingSource = loadMappingFile(predefinedMappingURL);
+
+ mappings.put(name, mappingSource);
+
+ if (!createIndex(name)) {
+ LOGGER.info("Found index for type {}", name);
+ if (forceUpdateMapping) {
+ LOGGER.info("Updating mapping for {}", name);
+ createMapping(name, mappingSource);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error while loading mapping definition {}", predefinedMappingURL, e);
+ }
+ }
+ }
+
+ private void loadPainlessScripts(BundleContext bundleContext) {
+ Enumeration scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true);
+ if (scriptsURL == null) {
+ return;
+ }
+
+ Map scriptsById = new HashMap<>();
+ while (scriptsURL.hasMoreElements()) {
+ URL scriptURL = scriptsURL.nextElement();
+ LOGGER.info("Found painless script at {}, loading... ", scriptURL);
+ try (InputStream in = scriptURL.openStream()) {
+ String script = IOUtils.toString(in, StandardCharsets.UTF_8);
+ String scriptId = FilenameUtils.getBaseName(scriptURL.getPath());
+ scriptsById.put(scriptId, script);
+ } catch (Exception e) {
+ LOGGER.error("Error while loading painless script {}", scriptURL, e);
+ }
+
+ }
+
+ storeScripts(scriptsById);
+ }
+
+ private String loadMappingFile(URL predefinedMappingURL) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream()));
+
+ StringBuilder content = new StringBuilder();
+ String l;
+ while ((l = reader.readLine()) != null) {
+ content.append(l);
+ }
+ return content.toString();
+ }
+
+ @Override public String getName() {
+ return "elasticsearch9";
+ }
+
+ @Override public List getAllItems(final Class clazz) {
+ return getAllItems(clazz, 0, -1, null).getList();
+ }
+
+ @Override public long getAllItemsCount(String itemType) {
+ return queryCount(Query.of(q -> q.matchAll(m -> m)), itemType);
+ }
+
+ @Override public PartialList getAllItems(final Class clazz, int offset, int size, String sortBy) {
+ return getAllItems(clazz, offset, size, sortBy, null);
+ }
+
+ @Override public PartialList getAllItems(final Class clazz, int offset, int size, String sortBy,
+ String scrollTimeValidity) {
+ long startTime = System.currentTimeMillis();
+ try {
+ return query(Query.of(q -> q.matchAll(m -> m)), sortBy, clazz, offset, size, null, scrollTimeValidity);
+ } finally {
+ if (metricsService != null && metricsService.isActivated()) {
+ metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
+ }
+ }
+ }
+
+ @Override public T load(final String itemId, final Class clazz) {
+ return load(itemId, clazz, null);
+ }
+
+ @Override @Deprecated public T load(final String itemId, final Date dateHint, final Class clazz) {
+ return load(itemId, clazz, null);
+ }
+
+ @Override @Deprecated public CustomItem loadCustomItem(final String itemId, final Date dateHint, String customItemType) {
+ return load(itemId, CustomItem.class, customItemType);
+ }
+
+ @Override public CustomItem loadCustomItem(final String itemId, String customItemType) {
+ return load(itemId, CustomItem.class, customItemType);
+ }
+
+ private T load(final String itemId, final Class clazz, final String customItemType) {
+ if (StringUtils.isEmpty(itemId)) {
+ return null;
+ }
+
+ return new InClassLoaderExecute(metricsService, this.getClass().getName() + ".loadItem", this.bundleContext,
+ this.fatalIllegalStateErrors, throwExceptions) {
+ protected T execute(Object... args) throws Exception {
+ try {
+ final String itemType = customItemType != null ? customItemType : Item.getItemType(clazz);
+ String documentId = getDocumentIDForItemType(itemId, itemType);
+
+ boolean sessionSpecialDirectAccess = sessionLatestIndex != null && Session.ITEM_TYPE.equals(itemType);
+ if (!sessionSpecialDirectAccess && isItemTypeRollingOver(itemType)) {
+ return new MetricAdapter(metricsService, ".loadItemWithQuery") {
+ @Override public T execute(Object... args) throws Exception {
+ Query query = Query.of(q -> q.ids(builder -> builder.values(documentId)));
+ if (customItemType == null) {
+ PartialList r = query(query, null, clazz, 0, 1, null, null);
+ if (r.size() > 0) {
+ return r.get(0);
+ }
+ } else {
+ PartialList r = query(query, null, customItemType, 0, 1, null, null);
+ if (r.size() > 0) {
+ return (T) r.get(0);
+ }
+ }
+ return null;
+ }
+ }.execute();
+ } else {
+ // Special handling for session we check the latest available index directly to speed up session loading
+ GetRequest getRequest = GetRequest.of(
+ builder -> builder.index(sessionSpecialDirectAccess ? sessionLatestIndex : getIndex(itemType))
+ .id(documentId));
+ GetResponse response = esClient.get(getRequest, clazz);
+ if (response.found()) {
+ T value = response.source();
+ setMetadata(value, response.id(), response.version() != null ? response.version() : 0L,
+ response.seqNo() != null ? response.seqNo() : 0L,
+ response.primaryTerm() != null ? response.primaryTerm() : 0L, response.index());
+ return value;
+ } else {
+ return null;
+ }
+ }
+ } catch (ElasticsearchException e) {
+ if (e.status() == 404 && e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) {
+ // The index does not exist
+ return null;
+ }
+ return null;
+ } catch (Exception ex) {
+ throw new Exception(
+ "Error loading itemType=" + clazz.getName() + " customItemType=" + customItemType + " itemId=" + itemId, ex);
+ }
+ }
+ }.catchingExecuteInClassLoader(true);
+
+ }
+
+ private void setMetadata(Item item, String itemId, long version, long seqNo, long primaryTerm, String index) {
+ if (!systemItems.contains(item.getItemType()) && item.getItemId() == null) {
+ item.setItemId(itemId);
+ }
+ item.setVersion(version);
+ item.setSystemMetadata(SEQ_NO, seqNo);
+ item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+ item.setSystemMetadata("index", index);
+ }
+
+ @Override public boolean isConsistent(Item item) {
+ return getRefreshPolicy(item.getItemType()) != Refresh.False;
+ }
+
+ @Override public boolean save(final Item item) {
+ return save(item, useBatchingForSave, alwaysOverwrite);
+ }
+
+ @Override public boolean save(final Item item, final boolean useBatching) {
+ return save(item, useBatching, alwaysOverwrite);
+ }
+
+ @Override public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
+ final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
+ final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
+
+ Boolean result = new InClassLoaderExecute(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext,
+ this.fatalIllegalStateErrors, throwExceptions) {
+ protected Boolean execute(Object... args) throws Exception {
+ try {
+ String itemType = item.getItemType();
+ if (item instanceof CustomItem) {
+ itemType = ((CustomItem) item).getCustomItemType();
+ }
+ String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
+ String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
+
+ Long seqNo;
+ Long primaryTerm;
+ OpType opType = null;
+ String routing;
+ if (!alwaysOverwrite) {
+ seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+ primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
+ opType = seqNo == null && primaryTerm == null ? OpType.Create : null;
+ } else {
+ primaryTerm = null;
+ seqNo = null;
+ }
+
+ if (routingByType.containsKey(itemType)) {
+ routing = routingByType.get(itemType);
+ } else {
+ routing = null;
+ }
+
+ try {
+ if (bulkIngester == null || !useBatching) {
+ IndexRequest.Builder