blob: f5cc2730d8b59d60a111ec6fe460b9fe885e2849 [file] [log] [blame]
// Copyright (C) 2014 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkState;
import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
import static org.apache.commons.codec.binary.Base64.decodeBase64;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.FieldDef.FillArgs;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.index.Schema;
import com.google.gerrit.server.index.Schema.Values;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gwtorm.protobuf.ProtobufCodec;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.http.JestHttpClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.Delete;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.IndicesExists;
abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected static <T> List<T> decodeProtos(JsonObject doc, String fieldName,
ProtobufCodec<T> codec) {
JsonArray field = doc.getAsJsonArray(fieldName);
if (field == null) {
return null;
}
return FluentIterable.from(field)
.transform(i -> codec.decode(decodeBase64(i.toString())))
.toList();
}
private final Schema<V> schema;
private final FillArgs fillArgs;
private final SitePaths sitePaths;
protected final boolean refresh;
protected final String indexName;
protected final JestHttpClient client;
protected final Gson gson;
protected final ElasticQueryBuilder queryBuilder;
AbstractElasticIndex(@GerritServerConfig Config cfg,
FillArgs fillArgs,
SitePaths sitePaths,
Schema<V> schema,
String indexName) {
this.fillArgs = fillArgs;
this.sitePaths = sitePaths;
this.schema = schema;
this.gson = new GsonBuilder()
.setFieldNamingPolicy(LOWER_CASE_WITH_UNDERSCORES).create();
this.queryBuilder = new ElasticQueryBuilder();
String protocol = getRequiredConfigOption(cfg, "protocol");
String hostname = getRequiredConfigOption(cfg, "hostname");
String port = getRequiredConfigOption(cfg, "port");
this.indexName = String.format("%s%s%04d",
Strings.nullToEmpty(cfg.getString("index", null, "prefix")),
indexName,
schema.getVersion());
// By default Elasticsearch has a 1s delay before changes are available in
// the index. Setting refresh(true) on calls to the index makes the index
// refresh immediately.
//
// Discovery should be disabled during test mode to prevent spurious
// connection failures caused by the client starting up and being ready
// before the test node.
//
// This setting should only be set to true during testing, and is not
// documented.
this.refresh = cfg.getBoolean("index", "elasticsearch", "test", false);
String url = buildUrl(protocol, hostname, port);
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig
.Builder(url)
.multiThreaded(true)
.discoveryEnabled(!refresh)
.discoveryFrequency(1L, TimeUnit.MINUTES)
.build());
client = (JestHttpClient) factory.getObject();
}
@Override
public Schema<V> getSchema() {
return schema;
}
@Override
public void close() {
client.shutdownClient();
}
@Override
public void markReady(boolean ready) throws IOException {
IndexUtils.setReady(sitePaths, indexName, schema.getVersion(), ready);
}
@Override
public void delete(K c) throws IOException {
Bulk bulk = addActions(new Bulk.Builder(), c).refresh(refresh).build();
JestResult result = client.execute(bulk);
if (!result.isSucceeded()) {
throw new IOException(String.format(
"Failed to delete change %s in index %s: %s", c, indexName,
result.getErrorMessage()));
}
}
@Override
public void deleteAll() throws IOException {
// Delete the index, if it exists.
JestResult result = client.execute(
new IndicesExists.Builder(indexName).build());
if (result.isSucceeded()) {
result = client.execute(
new DeleteIndex.Builder(indexName).build());
if (!result.isSucceeded()) {
throw new IOException(String.format(
"Failed to delete index %s: %s", indexName,
result.getErrorMessage()));
}
}
// Recreate the index.
result = client.execute(
new CreateIndex.Builder(indexName).settings(getMappings()).build());
if (!result.isSucceeded()) {
String error = String.format("Failed to create index %s: %s",
indexName, result.getErrorMessage());
throw new IOException(error);
}
}
protected abstract Bulk.Builder addActions(Bulk.Builder builder, K c);
protected abstract String getMappings();
protected abstract String getId(V v);
protected Delete delete(String type, K c) {
String id = c.toString();
return new Delete.Builder(id)
.index(indexName)
.type(type)
.build();
}
protected io.searchbox.core.Index insert(String type, V v) throws IOException {
String id = getId(v);
String doc = toDoc(v);
return new io.searchbox.core.Index.Builder(doc)
.index(indexName)
.type(type)
.id(id)
.build();
}
private String toDoc(V v) throws IOException {
XContentBuilder builder = jsonBuilder().startObject();
for (Values<V> values : schema.buildFields(v, fillArgs)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
builder.array(name, values.getValues());
} else {
Object element = Iterables.getOnlyElement(values.getValues(), "");
if (!(element instanceof String) || !((String) element).isEmpty()) {
builder.field(name, element);
}
}
}
return builder.endObject().string();
}
private String getRequiredConfigOption(Config cfg, String name) {
String option = cfg.getString("index", null, name);
checkState(!Strings.isNullOrEmpty(option), "index." + name + " must be supplied");
return option;
}
private String buildUrl(String protocol, String hostname, String port) {
try {
return new URL(protocol, hostname, Integer.parseInt(port), "").toString();
} catch (MalformedURLException | NumberFormatException e) {
throw new RuntimeException(
"Cannot build url to Elasticsearch from values: protocol=" + protocol
+ " hostname=" + hostname + " port=" + port, e);
}
}
}