| // Copyright (C) 2014 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gerrit.elasticsearch; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.collect.ImmutableList.toImmutableList; |
| import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ArrayListMultimap; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ListMultimap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Streams; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.common.io.BaseEncoding; |
| import com.google.common.io.CharStreams; |
| import com.google.gerrit.common.Nullable; |
| import com.google.gerrit.elasticsearch.ElasticMapping.Mapping; |
| import com.google.gerrit.elasticsearch.builders.QueryBuilder; |
| import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder; |
| import com.google.gerrit.elasticsearch.bulk.DeleteRequest; |
| import com.google.gerrit.entities.converter.ProtoConverter; |
| import com.google.gerrit.exceptions.StorageException; |
| import com.google.gerrit.index.FieldType; |
| import com.google.gerrit.index.Index; |
| import com.google.gerrit.index.QueryOptions; |
| import com.google.gerrit.index.Schema; |
| import com.google.gerrit.index.query.DataSource; |
| import com.google.gerrit.index.query.FieldBundle; |
| import com.google.gerrit.index.query.HasCardinality; |
| import com.google.gerrit.index.query.ListResultSet; |
| import com.google.gerrit.index.query.Predicate; |
| import com.google.gerrit.index.query.QueryParseException; |
| import com.google.gerrit.index.query.ResultSet; |
| import com.google.gerrit.proto.Protos; |
| import com.google.gerrit.server.config.SitePaths; |
| import com.google.gerrit.server.index.IndexUtils; |
| import com.google.gerrit.server.index.options.AutoFlush; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| import com.google.gson.JsonArray; |
| import com.google.gson.JsonElement; |
| import com.google.gson.JsonObject; |
| import com.google.gson.JsonParser; |
| import com.google.protobuf.MessageLite; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.Reader; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URLEncoder; |
| import java.sql.Timestamp; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.function.Function; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.StatusLine; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.nio.entity.NStringEntity; |
| import org.apache.http.util.EntityUtils; |
| import org.elasticsearch.client.Request; |
| import org.elasticsearch.client.Response; |
| |
| abstract class AbstractElasticIndex<K, V> implements Index<K, V> { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| |
| protected static final String BULK = "_bulk"; |
| protected static final String MAPPINGS = "mappings"; |
| protected static final String ORDER = "order"; |
| protected static final String DESC_SORT_ORDER = "desc"; |
| protected static final String ASC_SORT_ORDER = "asc"; |
| protected static final String UNMAPPED_TYPE = "unmapped_type"; |
| protected static final String SEARCH = "_search"; |
| protected static final String SETTINGS = "settings"; |
| |
| static byte[] decodeBase64(String base64String) { |
| return BaseEncoding.base64().decode(base64String); |
| } |
| |
| protected static <T> List<T> decodeProtos( |
| JsonObject doc, String fieldName, ProtoConverter<?, T> converter) { |
| JsonArray field = doc.getAsJsonArray(fieldName); |
| if (field == null) { |
| return null; |
| } |
| return Streams.stream(field) |
| .map(JsonElement::getAsString) |
| .map(AbstractElasticIndex::decodeBase64) |
| .map(bytes -> parseProtoFrom(bytes, converter)) |
| .collect(toImmutableList()); |
| } |
| |
| protected static <P extends MessageLite, T> T parseProtoFrom( |
| byte[] bytes, ProtoConverter<P, T> converter) { |
| P message = Protos.parseUnchecked(converter.getParser(), bytes); |
| return converter.fromProto(message); |
| } |
| |
| static String getContent(Response response) throws IOException { |
| HttpEntity responseEntity = response.getEntity(); |
| String content = ""; |
| if (responseEntity != null) { |
| InputStream contentStream = responseEntity.getContent(); |
| try (Reader reader = new InputStreamReader(contentStream, UTF_8)) { |
| content = CharStreams.toString(reader); |
| } |
| } |
| return content; |
| } |
| |
| private final ElasticConfiguration config; |
| private final Schema<V> schema; |
| private final SitePaths sitePaths; |
| private final String indexNameRaw; |
| private final Map<String, String> refreshParam; |
| |
| protected final ElasticRestClientProvider client; |
| protected final String indexName; |
| protected final Gson gson; |
| protected final ElasticQueryBuilder queryBuilder; |
| |
| AbstractElasticIndex( |
| ElasticConfiguration config, |
| SitePaths sitePaths, |
| Schema<V> schema, |
| ElasticRestClientProvider client, |
| String indexName, |
| AutoFlush autoFlush) { |
| this.config = config; |
| this.sitePaths = sitePaths; |
| this.schema = schema; |
| this.gson = new GsonBuilder().setFieldNamingPolicy(LOWER_CASE_WITH_UNDERSCORES).create(); |
| this.queryBuilder = new ElasticQueryBuilder(); |
| this.indexName = config.getIndexName(indexName, schema.getVersion()); |
| this.indexNameRaw = indexName; |
| this.client = client; |
| this.refreshParam = |
| Map.of( |
| "refresh", |
| autoFlush == AutoFlush.ENABLED ? Boolean.TRUE.toString() : Boolean.FALSE.toString()); |
| } |
| |
| @Override |
| public void insert(V obj) { |
| replace(obj); |
| } |
| |
| @Override |
| public Schema<V> getSchema() { |
| return schema; |
| } |
| |
| @Override |
| public void close() { |
| // Do nothing. Client is closed by the provider. |
| } |
| |
| @Override |
| public void markReady(boolean ready) { |
| IndexUtils.setReady(sitePaths, indexNameRaw, schema.getVersion(), ready); |
| } |
| |
| @Override |
| public void delete(K id) { |
| String uri = getURI(BULK); |
| Response response = postRequestWithRefreshParam(uri, getDeleteActions(id)); |
| int statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode != HttpStatus.SC_OK) { |
| throw new StorageException( |
| String.format("Failed to delete %s from index %s: %s", id, indexName, statusCode)); |
| } |
| } |
| |
| @Override |
| public void deleteAll() { |
| // Delete the index, if it exists. |
| String endpoint = indexName + client.adapter().indicesExistParams(); |
| Response response = performRequest("HEAD", endpoint); |
| int statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode == HttpStatus.SC_OK) { |
| response = performRequest("DELETE", indexName); |
| statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode != HttpStatus.SC_OK) { |
| throw new StorageException( |
| String.format("Failed to delete index %s: %s", indexName, statusCode)); |
| } |
| } |
| |
| // Recreate the index. |
| String indexCreationFields = concatJsonString(getSettings(), getMappings()); |
| response = performRequest("PUT", indexName, indexCreationFields); |
| statusCode = response.getStatusLine().getStatusCode(); |
| if (statusCode != HttpStatus.SC_OK) { |
| String error = String.format("Failed to create index %s: %s", indexName, statusCode); |
| throw new StorageException(error); |
| } |
| } |
| |
| protected abstract String getDeleteActions(K id); |
| |
| protected abstract String getMappings(); |
| |
| private String getSettings() { |
| return gson.toJson(ImmutableMap.of(SETTINGS, ElasticSetting.createSetting(config))); |
| } |
| |
| protected abstract String getId(V v); |
| |
| protected String getMappingsForSingleType(Mapping mapping) { |
| return getMappingsFor(mapping); |
| } |
| |
| protected String getMappingsFor(Mapping mapping) { |
| JsonObject mappings = new JsonObject(); |
| |
| mappings.add(MAPPINGS, gson.toJsonTree(mapping)); |
| return gson.toJson(mappings); |
| } |
| |
| protected String getDeleteRequest(K id) { |
| return new DeleteRequest(id.toString(), indexName).toString(); |
| } |
| |
| protected abstract V fromDocument(JsonObject doc, Set<String> fields); |
| |
| protected FieldBundle toFieldBundle(JsonObject doc) { |
| ListMultimap<String, Object> rawFields = ArrayListMultimap.create(); |
| for (Map.Entry<String, JsonElement> element : |
| doc.get(client.adapter().rawFieldsKey()).getAsJsonObject().entrySet()) { |
| checkArgument( |
| getSchema().hasField(element.getKey()), "Unrecognized field " + element.getKey()); |
| FieldType<?> type = getSchema().getSchemaField(element.getKey()).getType(); |
| Iterable<JsonElement> innerItems = |
| element.getValue().isJsonArray() |
| ? element.getValue().getAsJsonArray() |
| : Collections.singleton(element.getValue()); |
| for (JsonElement inner : innerItems) { |
| if (type == FieldType.EXACT || type == FieldType.FULL_TEXT || type == FieldType.PREFIX) { |
| rawFields.put(element.getKey(), inner.getAsString()); |
| } else if (type == FieldType.INTEGER || type == FieldType.INTEGER_RANGE) { |
| rawFields.put(element.getKey(), inner.getAsInt()); |
| } else if (type == FieldType.LONG) { |
| rawFields.put(element.getKey(), inner.getAsLong()); |
| } else if (type == FieldType.TIMESTAMP) { |
| rawFields.put(element.getKey(), new Timestamp(inner.getAsLong())); |
| } else if (type == FieldType.STORED_ONLY) { |
| rawFields.put(element.getKey(), decodeBase64(inner.getAsString())); |
| } else { |
| throw FieldType.badFieldType(type); |
| } |
| } |
| } |
| return new FieldBundle(rawFields); |
| } |
| |
| protected boolean hasErrors(Response response) { |
| try { |
| String contentType = response.getEntity().getContentType().getValue(); |
| Preconditions.checkState( |
| contentType.equals(ContentType.APPLICATION_JSON.toString()), |
| String.format("Expected %s, but was: %s", ContentType.APPLICATION_JSON, contentType)); |
| String responseStr = EntityUtils.toString(response.getEntity()); |
| JsonObject responseJson = (JsonObject) new JsonParser().parse(responseStr); |
| return responseJson.get("errors").getAsBoolean(); |
| } catch (IOException e) { |
| throw new StorageException(e); |
| } |
| } |
| |
| protected String toAction(String type, String id, String action) { |
| JsonObject properties = new JsonObject(); |
| properties.addProperty("_id", id); |
| properties.addProperty("_index", indexName); |
| properties.addProperty("_type", type); |
| |
| JsonObject jsonAction = new JsonObject(); |
| jsonAction.add(action, properties); |
| return jsonAction.toString() + System.lineSeparator(); |
| } |
| |
| protected void addNamedElement(String name, JsonObject element, JsonArray array) { |
| JsonObject arrayElement = new JsonObject(); |
| arrayElement.add(name, element); |
| array.add(arrayElement); |
| } |
| |
| protected String getSearch(SearchSourceBuilder searchSource, JsonArray sortArray) { |
| JsonObject search = new JsonParser().parse(searchSource.toString()).getAsJsonObject(); |
| search.add("sort", sortArray); |
| return gson.toJson(search); |
| } |
| |
| protected JsonArray getSortArray(String idFieldName) { |
| JsonObject properties = new JsonObject(); |
| properties.addProperty(ORDER, ASC_SORT_ORDER); |
| |
| JsonArray sortArray = new JsonArray(); |
| addNamedElement(idFieldName, properties, sortArray); |
| return sortArray; |
| } |
| |
| protected String getURI(String request) { |
| try { |
| return URLEncoder.encode(indexName, UTF_8.toString()) + "/" + request; |
| } catch (UnsupportedEncodingException e) { |
| throw new StorageException(e); |
| } |
| } |
| |
| protected Response postRequestWithRefreshParam(String uri, Object payload) { |
| return performRequest("POST", uri, payload, refreshParam); |
| } |
| |
| private String concatJsonString(String target, String addition) { |
| return target.substring(0, target.length() - 1) + "," + addition.substring(1); |
| } |
| |
| private Response performRequest(String method, String uri) { |
| return performRequest(method, uri, null); |
| } |
| |
| private Response performRequest(String method, String uri, @Nullable Object payload) { |
| return performRequest(method, uri, payload, Collections.emptyMap()); |
| } |
| |
| private Response performRequest( |
| String method, String uri, @Nullable Object payload, Map<String, String> params) { |
| Request request = new Request(method, uri.startsWith("/") ? uri : "/" + uri); |
| if (payload != null) { |
| String payloadStr = payload instanceof String ? (String) payload : payload.toString(); |
| request.setEntity(new NStringEntity(payloadStr, ContentType.APPLICATION_JSON)); |
| } |
| for (Map.Entry<String, String> entry : params.entrySet()) { |
| request.addParameter(entry.getKey(), entry.getValue()); |
| } |
| try { |
| return client.get().performRequest(request); |
| } catch (IOException e) { |
| throw new StorageException(e); |
| } |
| } |
| |
| protected class ElasticQuerySource implements DataSource<V> { |
| private final QueryOptions opts; |
| private final Predicate<V> predicate; |
| private final String search; |
| |
| ElasticQuerySource(Predicate<V> p, QueryOptions opts, JsonArray sortArray) |
| throws QueryParseException { |
| this.opts = opts; |
| this.predicate = p; |
| QueryBuilder qb = queryBuilder.toQueryBuilder(p); |
| SearchSourceBuilder searchSource = |
| new SearchSourceBuilder(client.adapter()) |
| .query(qb) |
| .size(opts.pageSize()) |
| .fields(Lists.newArrayList(opts.fields())) |
| .trackTotalHits(false); |
| searchSource = |
| opts.searchAfter() != null |
| ? searchSource.searchAfter((JsonArray) opts.searchAfter()) |
| : searchSource.from(opts.start()); |
| search = getSearch(searchSource, sortArray); |
| } |
| |
| @Override |
| public int getCardinality() { |
| if (predicate instanceof HasCardinality) { |
| return ((HasCardinality) predicate).getCardinality(); |
| } |
| return 10; |
| } |
| |
| @Override |
| public ResultSet<V> read() { |
| return readImpl(doc -> AbstractElasticIndex.this.fromDocument(doc, opts.fields())); |
| } |
| |
| @Override |
| public ResultSet<FieldBundle> readRaw() { |
| return readImpl(AbstractElasticIndex.this::toFieldBundle); |
| } |
| |
| private <T> ResultSet<T> readImpl(Function<JsonObject, T> mapper) { |
| try { |
| String uri = getURI(SEARCH); |
| JsonArray searchAfter = null; |
| Response response = |
| performRequest(HttpPost.METHOD_NAME, uri, search, Collections.emptyMap()); |
| StatusLine statusLine = response.getStatusLine(); |
| if (statusLine.getStatusCode() == HttpStatus.SC_OK) { |
| String content = getContent(response); |
| JsonObject obj = |
| new JsonParser().parse(content).getAsJsonObject().getAsJsonObject("hits"); |
| if (obj.get("hits") != null) { |
| JsonArray json = obj.getAsJsonArray("hits"); |
| ImmutableList.Builder<T> results = ImmutableList.builderWithExpectedSize(json.size()); |
| JsonObject hit = null; |
| for (int i = 0; i < json.size(); i++) { |
| hit = json.get(i).getAsJsonObject(); |
| T mapperResult = mapper.apply(hit); |
| if (mapperResult != null) { |
| results.add(mapperResult); |
| } |
| } |
| if (hit != null && hit.get("sort") != null) { |
| searchAfter = hit.getAsJsonArray("sort"); |
| } |
| JsonArray finalSearchAfter = searchAfter; |
| return new ListResultSet<T>(results.build()) { |
| @Override |
| public Object searchAfter() { |
| return finalSearchAfter; |
| } |
| }; |
| } |
| } else { |
| logger.atSevere().log("%s", statusLine.getReasonPhrase()); |
| } |
| return new ListResultSet<>(ImmutableList.of()); |
| } catch (IOException e) { |
| throw new StorageException(e); |
| } |
| } |
| } |
| } |