diff --git oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java index 99c6bc5..3324f5d 100644 --- oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java +++ oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch; import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.plugins.index.IndexConstants; import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -63,11 +64,15 @@ public class ElasticsearchIndexDefinition extends IndexDefinition { public final int bulkRetries; public final long bulkRetriesBackoff; private final String indexPrefix; + private final String remoteAlias; public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath, String indexPrefix) { super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath); + boolean isReindex = defn.getBoolean(IndexConstants.REINDEX_PROPERTY_NAME); + String indexSuffix = "-" + (getReindexCount() + (isReindex ? 1 : 0)); this.indexPrefix = indexPrefix != null ? indexPrefix : ""; - this.remoteIndexName = setupIndexName(); + this.remoteAlias = setupAlias(); + this.remoteIndexName = getESSafeIndexName(this.remoteAlias + indexSuffix); this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT); this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT); this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT); @@ -76,17 +81,27 @@ public class ElasticsearchIndexDefinition extends IndexDefinition { } /** + * Returns the index alias on the Elasticsearch cluster. This alias should be used for any index related operations + * instead of accessing the index directly. + * @return the Elasticsearch index alias + */ + public String getRemoteIndexAlias() { + return remoteAlias; + } + + /** * Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned - * from {@code getIndexName}. + * from {@code getIndexName}. The index name shouldn't be used for index read or updates. Alias obtained from {@link #getRemoteIndexAlias()} + * should be used for such purposes. * @return the Elasticsearch index identifier */ public String getRemoteIndexName() { return remoteIndexName; } - private String setupIndexName() { + private String setupAlias() { // TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process - return getESSafeIndexName(indexPrefix + "." + getIndexPath() + "-" + getReindexCount()); + return getESSafeIndexName(indexPrefix + "." + getIndexPath()); } /** diff --git oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java index add20d3..59215d4 100644 --- oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java +++ oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java @@ -21,6 +21,9 @@ import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexD import org.apache.jackrabbit.oak.plugins.index.search.FieldNames; import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -28,9 +31,13 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.GetAliasesResponse; +import org.elasticsearch.client.IndicesClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -44,8 +51,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId; @@ -92,7 +100,7 @@ class ElasticsearchIndexWriter implements FulltextIndexWriter> aliases = aliasesResponse.getAliases(); + IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); + for (String oldIndexName : aliases.keySet()) { + IndicesAliasesRequest.AliasActions removeAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE); + removeAction.index(oldIndexName).alias(indexDefinition.getRemoteIndexAlias()); + indicesAliasesRequest.addAliasAction(removeAction); + } + IndicesAliasesRequest.AliasActions addAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD); + addAction.index(indexName).alias(indexDefinition.getRemoteIndexAlias()); + indicesAliasesRequest.addAliasAction(addAction); + AcknowledgedResponse updateAliasResponse = indicesClient.updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT); + checkResponseAcknowledgement(updateAliasResponse, "Update alias call not acknowledged for alias " + + indexDefinition.getRemoteIndexAlias()); + LOG.info("Updated alias {} to index {}. Response acknowledged: {}", indexDefinition.getRemoteIndexAlias(), + indexName, updateAliasResponse.isAcknowledged()); + deleteOldIndices(indicesClient, aliases.keySet()); + } + + private void checkResponseAcknowledgement(AcknowledgedResponse response, String exceptionMessage) { + if (!response.isAcknowledged()) { + throw new IllegalStateException(exceptionMessage); + } + } + + private void deleteOldIndices(IndicesClient indicesClient, Set indices) throws IOException { + if (indices.size() == 0) + return; + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(); + for (String oldIndexName : indices) { + deleteIndexRequest.indices(oldIndexName); + } + AcknowledgedResponse deleteIndexResponse = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT); + checkResponseAcknowledgement(deleteIndexResponse, "Delete index call not acknowledged for indices " + indices); + LOG.info("Deleted indices {}. Response acknowledged: {}", indices.toString(), deleteIndexResponse.isAcknowledged()); + } + + private CreateIndexRequest constructCreateIndexRequest(String indexName) throws IOException { + CreateIndexRequest request = new CreateIndexRequest(indexName); // provision settings request.settings(Settings.builder() @@ -179,11 +239,7 @@ class ElasticsearchIndexWriter implements FulltextIndexWriter